0


TP5简单使用RabbitMQ实现消息队列

在使用 RabbitMQ 之前,你要安装好 RabbitMQ 服务,具体安装方法可以参考 windows下安装RabbitMQ

1、安装扩展

进入TP5 更目录下,输入命令安装:

composer require php-amqplib/php-amqplib

2、自定义命令

TP5 的自定义命令,这里也简单说下。
第一步:
创建命令类文件,新建 application/api/command/Test.php。

<?phpnamespaceapp\api\command;usethink\console\Command;usethink\console\Input;usethink\console\Output;/**
 * 自定义命令测试
 */classTestextendsCommand{/**
     * 配置
     */protectedfunctionconfigure(){// 设置命令的名称和描述$this->setName('test')->setDescription('这是一个测试命令');}/**
     * 执行
     */protectedfunctionexecute(Input$input,Output$output){$output->writeln("测试命令");}}

这个文件定义了一个叫test的命令,备注为 这是一个测试命令,执行命令会输出:test command。
第二步:
配置 command.php文件,在 application/command.php文件中添加命令。

<?phpreturn['app\api\command\Test',];

第三步:
测试命令,在项目根目录下输入命令:

php think test

回车运行之后输出:

test command

到这里,自定义命令就结束了,test命令就自定义成功了。

3、rabbitmq服务端

下来我们自定义 RabbitMQ 启动命令,守护进程运行,启动 rabbirmq 服务端接收消息。
在 application/api/command 目录下,新建 Ramq.php 文件,在执行命令的方法中,调用 RabbitMQ 启动守护进程方法即可。

<?phpnamespaceapp\api\command;usePhpAmqpLib\Connection\AMQPStreamConnection;usethink\console\Command;usethink\console\Input;usethink\console\Output;/**
 * RabbitMq 启动命令
 */classRamqextendsCommand{protected$consumerTag='customer';protected$exchange='xcuser';protected$queue='xcmsg';protectedfunctionconfigure(){$this->setName('ramq')->setDescription('rabbitmq');}protectedfunctionexecute(Input$input,Output$output){$output->writeln("消息队列开始");$this->start();// 指令输出$output->writeln('消费队列结束');}/**
     * 关闭
     */functionshutdown($channel,$connection){$channel->close();$connection->close();}/**
     * 回调处理信息
     */functionprocess_message($message){if($message->body!=='quit'){echo$message->body;}//手动应答$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);if($message->body==='quit'){$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);}}/**
     * 启动 守护进程运行
     */publicfunctionstart(){$host='127.0.0.1';$port=5672;$user='guest';$pwd='guest';$vhost='/';$connection=newAMQPStreamConnection($host,$port,$user,$pwd,$vhost);$channel=$connection->channel();$channel->queue_declare($this->queue,false,true,false,false);$channel->exchange_declare($this->exchange,'direct',false,true,false);$channel->queue_bind($this->queue,$this->exchange);$channel->basic_consume($this->queue,$this->consumerTag,false,false,false,false,array($this,'process_message'));register_shutdown_function(array($this,'shutdown'),$channel,$connection);while(count($channel->callbacks)){$channel->wait();}}}

在application/command.php文件中,添加rabbitmq自定义命令。

return['app\api\command\Ramq',// rabbitmq];

4、发送端

最后,我们再写发送消息的控制器,实现消息队列,具体代码如下:

<?phpnamespaceapp\api\controller;usePhpAmqpLib\Connection\AMQPStreamConnection;usePhpAmqpLib\Message\AMQPMessage;usethink\Controller;/**
 * 发送端
 */classMessageQueueextendsController{constexchange='xcuser';constqueue='xcmsg';/**
     * 发送消息
     */publicfunctionpushMessage($data){$host='127.0.0.1';$port=5672;$user='guest';$pwd='guest';$vhost='/';$connection=newAMQPStreamConnection($host,$port,$user,$pwd,$vhost);$channel=$connection->channel();$channel->exchange_declare(self::exchange,'direct',false,true,false);$channel->queue_declare(self::queue,false,true,false,false);$channel->queue_bind(self::queue,self::exchange);$messageBody=$data;$message=newAMQPMessage($messageBody,array('content_type'=>'text/plain','delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT));$channel->basic_publish($message,self::exchange);$channel->close();$connection->close();echo'ok';}/**
     * 执行
     */publicfunctionindex(){$data=json_encode(['msg'=>'测试数据','id'=>'15']);$this->pushMessage($data);}}

5、验证

先执行自定义命令,启动 rabbitmq 守护进程。在项目更目录下打开命令行,输入下面命令:

php think ramq

然后在浏览器访问发送信息的方法,http://你的域名/api/message/index,你发送一次消息,在命令行就会输出一条消息。这样我们就用 RabbitMQ 实现了一个简单的消息队列。


本文转载自: https://blog.csdn.net/weixin_44888397/article/details/131296203
版权归原作者 L小臣 所有, 如有侵权,请联系我们删除。

“TP5简单使用RabbitMQ实现消息队列”的评论:

还没有评论