0


php程序中使用rabbitmq消息队列的简单示例 及 rabbitmq队列所有php可用命令

一、php程序中使用rabbitmq消息队列的简单示例

1. php中rabbitmq扩展程序的安装使用

php下要使用rabbitmq,首先需要安装rabbitmq扩展:ampq,脚本如下:
yum install librabbitmq-devel.x86_64
wget http://pecl.php.net/get/amqp-1.4.0.tgz
tar zxvf amqp-1.4.0.tgz  
cd amqp-1.4.0
/usr/local/php/bin/phpize 
./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp
make && make install

2. rabbitmq队列的应用感受

 rabbitmq的队列感觉不大好理解,甚至觉得有点扯蛋。消息队列那不就是队列和队列里面的消息么,但是rabbitmq里还整些什么交换机、什么路由,也真是头大。在了解了一下之后觉得其实可以这样理解,rabbitmq这个消息队列的设计思想是这样的,分为两端,一端是写,一端是读。写的时候是写到交换机的,此时实际根本不是队列,你可以把交换机理解为rabbitmq里放消息的一个集合,写入的时候会把消息全部写到各个集合中。有个重要的东西是路由,每一个消息写进服务端的时候需要指定绑定路由,为什么需要这样。这就是它的设计思路。先看看读取端吧。

队列是读取端的概念,读取端要读取前先建一个队列,建立好后需要绑定到交换机。正常情况下我们一个队列绑定到一个交换机,然后从交换机源源不断地读取消息。但有时我们有可能有这种情况,会将交换机里的消息再进行分类。这就是路由起的作用,于是读取端时可以读取一个交换机里的内容,也可以读取一个交换机里指定路由的内容。其实确实有点麻烦的。 也就是说它是队列中的队列。完全可以让其扁平化,就一层队列,那样一个交换机就是一个队列了。rabbitmq的设计思路就没有beanstalk先进。

3. PHP程序中调用rabbitmq的示例代码

在PHP里调用rabbitmq的示例代码。下面是生产方producer.php里的代码。

a. 生产方producer.php代码:

<?php
#kermit:2016-10

//连接参数配置 并连接rabbitmq
$config = array(
   'host' => '127.0.0.1',
   'port' => '5672',
   'login' => 'guest',
   'password' => 'guest',
   'vhost' => '/'
);
$conn = new AMQPConnection($config);
if (!$conn->connect()) {
    die("连接rabbitmq失败!\n");
}
print_r($conn); #打印连接

//建立信道
$channel = new AMQPChannel($conn);
$msg = json_encode(array('test-time' => date('Y-m-d H:i:s')));

//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName('school');
$ex->setType(AMQP_EX_TYPE_DIRECT);      // direct类型
$ex->setFlags(AMQP_DURABLE);            // 持久化
$ex->declare();

// 消息发布
$channel->startTransaction();
if ($msg) $ex->publish($msg, 'students-update');
$channel->commitTransaction();
print_r($msg);
echo "\n-----ok----\n";

b.下面是消费方consumer.php的代码:

<?php
#kermit:2016-10

//连接参数配置 并连接rabbitmq
$config = array(
   'host' => '127.0.0.1',
   'port' => '5672',
   'login' => 'guest',
   'password' => 'guest',
   'vhost' => '/'
);
$conn = new AMQPConnection($config);
if (!$conn->connect()) {
    die("连接rabbitmq失败!\n");
}

//建立信道
$channel = new AMQPChannel($conn);

// 创建队列
$q = new AMQPQueue($channel);
$q->setName('students');
$q->setFlags(AMQP_DURABLE);             // 持久化

// 绑定交换机与队列,并指定路由键
$q->bind('school', 'students-update');

// 消息获取
$ret = $q->get(AMQP_AUTOACK);
print_r($ret);
if ($ret) {
        echo "\nget data:\n";
        var_dump(json_decode($ret->getBody(), true));
}
$conn->disconnect();

c. 生产方执行一次命令如下:

[root@kermit rabbitmq]# php -f producer.php 
AMQPConnection Object
(
    [login] => guest
    [password] => guest
    [host] => 127.0.0.1
    [vhost] => /
    [port] => 5672
    [read_timeout] => 0
    [write_timeout] => 0
    [connect_timeout] => 0
    [is_connected] => 1
)
{"test-time":"2016-10-12 17:07:36"}
-----ok----

d. 消费方取得消息内容如下:

[root@kermit rabbitmq]# php -f consumer.php 
AMQPEnvelope Object
(
    [body] => {"test-time":"2016-10-12 17:07:36"}
    [content_type] => text/plain
    [routing_key] => students-update
    [delivery_tag] => 1
    [delivery_mode] => 0
    [exchange_name] => school
    [is_redelivery] => 0
    [content_encoding] => 
    [type] => 
    [timestamp] => 0
    [priority] => 0
    [expiration] => 
    [user_id] => 
    [app_id] => 
    [message_id] => 
    [reply_to] => 
    [correlation_id] => 
    [headers] => Array
        (
        )

)

get data:
array(1) {
  ["test-time"]=>
  string(19) "2016-10-12 17:07:36"
}
取完之后,消息就从交换机中消失,没有beanstalk里面的那些状态修改等。上面是进行测试,在消费方真正要操作时是要进行阻塞式操作的。需要设置一个回调函数来操作读取的信息。如下:
$q->consume('Message', AMQP_AUTOACK); //自动ACK应答  
//消费回调函数
function Message($queue, $queue) { 
    print_r($queue->getRoutingKey);
    $msg = $queue->getBody(); 
    echo $msg;
}

二、 rabbitmq队列所有可用命令

rabbitmq自带了方便的命令行工具:rabbitmqctl,如本机的rabbitmqctl的安装路径在:/opt/modules/rabbitmq/sbin/rabbitmqctl list_queues

使用方法: /opt/modules/rabbitmq/sbin/rabbitmqctl list_queues status

rabbitmqctl可用的全部命令如下:

stop [<pid_file>]
stop_app
start_app
wait <pid_file>
reset
force_reset
rotate_logs <suffix>

join_cluster <clusternode> [--ram]
cluster_status
change_cluster_node_type disc | ram
forget_cluster_node [--offline]
update_cluster_nodes clusternode
sync_queue queue
cancel_sync_queue queue
set_cluster_name name

add_user <username> <password>    #用户账号操作
delete_user <username>
change_password <username> <newpassword>
clear_password <username>
set_user_tags <username> <tag> ...
list_users

add_vhost <vhostpath>
delete_vhost <vhostpath>
list_vhosts [<vhostinfoitem> ...]
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
clear_permissions [-p <vhostpath>] <username>
list_permissions [-p <vhostpath>]
list_user_permissions <username>

set_parameter [-p <vhostpath>] <component_name> <name> <value>
clear_parameter [-p <vhostpath>] <component_name> <key>
list_parameters [-p <vhostpath>]

set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern>  <definition>
clear_policy [-p <vhostpath>] <name>
list_policies [-p <vhostpath>]

list_queues [-p <vhostpath>] [<queueinfoitem> ...]    #列出队列等信息
list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
list_connections [<connectioninfoitem> ...]
list_channels [<channelinfoitem> ...]
list_consumers [-p <vhostpath>]
status
environment
report
eval <expr>

close_connection <connectionpid> <explanation>
trace_on [-p <vhost>]
trace_off [-p <vhost>]
set_vm_memory_high_watermark <fraction>

本文转载自: https://blog.csdn.net/weixin_47792780/article/details/140149440
版权归原作者 林戈的IT生涯 所有, 如有侵权,请联系我们删除。

“php程序中使用rabbitmq消息队列的简单示例 及 rabbitmq队列所有php可用命令”的评论:

还没有评论