一、php程序中使用rabbitmq消息队列的简单示例
1. php中rabbitmq扩展程序的安装使用
php下要使用rabbitmq,首先需要安装rabbitmq扩展:ampq,脚本如下:
yum install librabbitmq-devel.x86_64
wget http://pecl.php.net/get/amqp-1.4.0.tgz
tar zxvf amqp-1.4.0.tgz
cd amqp-1.4.0
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp
make && make install
2. rabbitmq队列的应用感受
rabbitmq的队列感觉不大好理解,甚至觉得有点扯蛋。消息队列那不就是队列和队列里面的消息么,但是rabbitmq里还整些什么交换机、什么路由,也真是头大。在了解了一下之后觉得其实可以这样理解,rabbitmq这个消息队列的设计思想是这样的,分为两端,一端是写,一端是读。写的时候是写到交换机的,此时实际根本不是队列,你可以把交换机理解为rabbitmq里放消息的一个集合,写入的时候会把消息全部写到各个集合中。有个重要的东西是路由,每一个消息写进服务端的时候需要指定绑定路由,为什么需要这样。这就是它的设计思路。先看看读取端吧。
队列是读取端的概念,读取端要读取前先建一个队列,建立好后需要绑定到交换机。正常情况下我们一个队列绑定到一个交换机,然后从交换机源源不断地读取消息。但有时我们有可能有这种情况,会将交换机里的消息再进行分类。这就是路由起的作用,于是读取端时可以读取一个交换机里的内容,也可以读取一个交换机里指定路由的内容。其实确实有点麻烦的。 也就是说它是队列中的队列。完全可以让其扁平化,就一层队列,那样一个交换机就是一个队列了。rabbitmq的设计思路就没有beanstalk先进。
3. PHP程序中调用rabbitmq的示例代码
在PHP里调用rabbitmq的示例代码。下面是生产方producer.php里的代码。
a. 生产方producer.php代码:
<?php
#kermit:2016-10
//连接参数配置 并连接rabbitmq
$config = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
$conn = new AMQPConnection($config);
if (!$conn->connect()) {
die("连接rabbitmq失败!\n");
}
print_r($conn); #打印连接
//建立信道
$channel = new AMQPChannel($conn);
$msg = json_encode(array('test-time' => date('Y-m-d H:i:s')));
//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName('school');
$ex->setType(AMQP_EX_TYPE_DIRECT); // direct类型
$ex->setFlags(AMQP_DURABLE); // 持久化
$ex->declare();
// 消息发布
$channel->startTransaction();
if ($msg) $ex->publish($msg, 'students-update');
$channel->commitTransaction();
print_r($msg);
echo "\n-----ok----\n";
b.下面是消费方consumer.php的代码:
<?php
#kermit:2016-10
//连接参数配置 并连接rabbitmq
$config = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
$conn = new AMQPConnection($config);
if (!$conn->connect()) {
die("连接rabbitmq失败!\n");
}
//建立信道
$channel = new AMQPChannel($conn);
// 创建队列
$q = new AMQPQueue($channel);
$q->setName('students');
$q->setFlags(AMQP_DURABLE); // 持久化
// 绑定交换机与队列,并指定路由键
$q->bind('school', 'students-update');
// 消息获取
$ret = $q->get(AMQP_AUTOACK);
print_r($ret);
if ($ret) {
echo "\nget data:\n";
var_dump(json_decode($ret->getBody(), true));
}
$conn->disconnect();
c. 生产方执行一次命令如下:
[root@kermit rabbitmq]# php -f producer.php
AMQPConnection Object
(
[login] => guest
[password] => guest
[host] => 127.0.0.1
[vhost] => /
[port] => 5672
[read_timeout] => 0
[write_timeout] => 0
[connect_timeout] => 0
[is_connected] => 1
)
{"test-time":"2016-10-12 17:07:36"}
-----ok----
d. 消费方取得消息内容如下:
[root@kermit rabbitmq]# php -f consumer.php
AMQPEnvelope Object
(
[body] => {"test-time":"2016-10-12 17:07:36"}
[content_type] => text/plain
[routing_key] => students-update
[delivery_tag] => 1
[delivery_mode] => 0
[exchange_name] => school
[is_redelivery] => 0
[content_encoding] =>
[type] =>
[timestamp] => 0
[priority] => 0
[expiration] =>
[user_id] =>
[app_id] =>
[message_id] =>
[reply_to] =>
[correlation_id] =>
[headers] => Array
(
)
)
get data:
array(1) {
["test-time"]=>
string(19) "2016-10-12 17:07:36"
}
取完之后,消息就从交换机中消失,没有beanstalk里面的那些状态修改等。上面是进行测试,在消费方真正要操作时是要进行阻塞式操作的。需要设置一个回调函数来操作读取的信息。如下:
$q->consume('Message', AMQP_AUTOACK); //自动ACK应答
//消费回调函数
function Message($queue, $queue) {
print_r($queue->getRoutingKey);
$msg = $queue->getBody();
echo $msg;
}
二、 rabbitmq队列所有可用命令
rabbitmq自带了方便的命令行工具:rabbitmqctl,如本机的rabbitmqctl的安装路径在:/opt/modules/rabbitmq/sbin/rabbitmqctl list_queues
使用方法: /opt/modules/rabbitmq/sbin/rabbitmqctl list_queues status
rabbitmqctl可用的全部命令如下:
stop [<pid_file>] stop_app start_app wait <pid_file> reset force_reset rotate_logs <suffix> join_cluster <clusternode> [--ram] cluster_status change_cluster_node_type disc | ram forget_cluster_node [--offline] update_cluster_nodes clusternode sync_queue queue cancel_sync_queue queue set_cluster_name name add_user <username> <password> #用户账号操作 delete_user <username> change_password <username> <newpassword> clear_password <username> set_user_tags <username> <tag> ... list_users add_vhost <vhostpath> delete_vhost <vhostpath> list_vhosts [<vhostinfoitem> ...] set_permissions [-p <vhostpath>] <user> <conf> <write> <read> clear_permissions [-p <vhostpath>] <username> list_permissions [-p <vhostpath>] list_user_permissions <username> set_parameter [-p <vhostpath>] <component_name> <name> <value> clear_parameter [-p <vhostpath>] <component_name> <key> list_parameters [-p <vhostpath>] set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition> clear_policy [-p <vhostpath>] <name> list_policies [-p <vhostpath>] list_queues [-p <vhostpath>] [<queueinfoitem> ...] #列出队列等信息 list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...] list_bindings [-p <vhostpath>] [<bindinginfoitem> ...] list_connections [<connectioninfoitem> ...] list_channels [<channelinfoitem> ...] list_consumers [-p <vhostpath>] status environment report eval <expr> close_connection <connectionpid> <explanation> trace_on [-p <vhost>] trace_off [-p <vhost>] set_vm_memory_high_watermark <fraction>
本文转载自: https://blog.csdn.net/weixin_47792780/article/details/140149440
版权归原作者 林戈的IT生涯 所有, 如有侵权,请联系我们删除。
版权归原作者 林戈的IT生涯 所有, 如有侵权,请联系我们删除。