0


Thinkphp6使用RabbitMQ消息队列

Thinkphp6连接使用RabbitMQ(不止tp6,其他框架对应改下也一样),使用Docker部署RabbitMQ,在上一篇已经讲了->传送门<-。

部署环境

开始前先进入RabbitMQ的web管理界面,选择Queues菜单,点击底部的Add a new queue,新建一个test的队列。

安装thinkphp6框架

composer create-project topthink/think tp 

安装workerman扩展

composer require topthink/think-worker

安装rabbitmq扩展

composer require workerman/rabbitmq

代码编写

生产者

  • 在app目录下新建workerman目录,并在其下创建Send.php文件,$options数组中的host地址改成你的rabbitmq地址。
<?phpnamespaceapp\workerman;useBunny\Channel;useWorkerman\RabbitMQ\Client;usethink\worker\Server;classSendextendsServer{//websocket地址,一会用于测试。protected$socket='websocket://127.0.0.1:2345';/**
     * 收到信息
     * @param $connection
     * @param $data
     */publicfunctiononMessage($connection,$data){//websocket发送过来的消息$connection->send('我收到你的信息了:'.$data);//rabbitMQ配置$options=['host'=>'127.0.0.1',//rabbitMQ IP'port'=>5672,//rabbitMQ 通讯端口'user'=>'admin',//rabbitMQ 账号'password'=>'123456'//rabbitMQ 密码];(newClient($options))->connect()->then(function(Client $client){return$client->channel();})->then(function(Channel $channel){/**
             * 创建队列(Queue)
             * name: ceshi         // 队列名称
             * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
             * durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
             *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
             * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
             *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
             */return$channel->queueDeclare('ceshi',false,true,false,false)->then(function()use($channel){return$channel;});})->then(function(Channel $channel)use($data){echo"发送消息内容:".$data."\n";/**
             * 发送消息
             * body 发送的数据
             * headers 数据头,建议 ['content_type' => 'text/plain'],这样消费端是springboot注解接收直接是字符串类型
             * exchange 交换器名称
             * routingKey 路由key
             * mandatory
             * immediate
             * @return bool|PromiseInterface|int
             */return$channel->publish($data,['content_type'=>'text/plain'],'','ceshi')->then(function()use($channel){return$channel;});})->then(function(Channel $channel){//echo " [x] Sent 'Hello World!'\n";$client=$channel->getClient();return$channel->close()->then(function()use($client){return$client;});})->then(function(Client $client){$client->disconnect();});}/**
     * 当连接建立时触发的回调函数
     * @param $connection
     */publicfunctiononConnect($connection){}/**
     * 当连接断开时触发的回调函数
     * @param $connection
     */publicfunctiononClose($connection){}/**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */publicfunctiononError($connection,$code,$msg){echo"error $code$msg\n";}/**
     * 每个进程启动
     * @param $worker
     */publicfunctiononWorkerStart($worker){}}
  • 在config/worker_server.php中设置worker_class值为'app\workerman\Send'
  • 启动这个生产者
php think worker:server
方式1:通过tcp发送数据
  • 发送数据 通过在线网址发送数据(websocket方式),->传送门<- 输入【ws://127.0.0.1:2345】后点击发送数据!在这里插入图片描述
  • 前往rabbitMQ控制台查看在这里插入图片描述 至此,生产这一步就走完了,那么如果我不想通过websocket方式,想用tcp方式生产数据怎么办?
方式2:通过tcp发送数据

接口给内置服务器发消息->内置服务去发消息给rabbitMQ

  • 将Send.php中websocket:127.0.0.1改成tcp:127.0.0.1
  • 重启服务
  • 把controller目录中Index.php修改为以下内容
<?phpnamespaceapp\controller;useapp\BaseController;classIndexextendsBaseController{publicfunctionindex(string $msg){//连接本地tcp服务$client=stream_socket_client('tcp://127.0.0.1:2345',$errno,$errmsg,1);//发送字符串fwrite($client,$msg."\n");//断开服务fclose($client);return'OK';}}
  • 用Postman访问对应接口就好,也会有数据进入队列

消费者

同生产者一样新创建一个thinkphp6项目,注意端口别和生产者冲突!这里我设置的是2346端口

  • 在app目录下新建workerman目录,并在其下创建Receive.php文件,$options数组中的host地址改成你的rabbitmq地址。
<?phpnamespaceapp\workerman;useBunny\Channel;useBunny\Message;useWorkerman\RabbitMQ\Client;usethink\worker\Server;classReceiveextendsServer{protected$socket='tcp://127.0.0.1:2346';/**
     * 收到信息
     * @param $connection
     * @param $data
     */publicfunctiononMessage($connection,$data){}/**
     * 当连接建立时触发的回调函数
     * @param $connection
     */publicfunctiononConnect($connection){}/**
     * 当连接断开时触发的回调函数
     * @param $connection
     */publicfunctiononClose($connection){}/**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */publicfunctiononError($connection,$code,$msg){echo"error $code$msg\n";}/**
     * 每个进程启动
     * @param $worker
     */publicfunctiononWorkerStart($worker){//rabbitMQ配置$options=['host'=>'127.0.0.1',//rabbitMQ IP'port'=>5672,//rabbitMQ 通讯端口'user'=>'admin',//rabbitMQ 账号'password'=>'123456'//rabbitMQ 密码];(newClient($options))->connect()->then(function(Client $client){return$client->channel();})->then(function(Channel $channel){/**
             * 创建队列(Queue)
             * name: ceshi         // 队列名称
             * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
             * durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
             *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
             * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
             *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
             */return$channel->queueDeclare('ceshi',false,true,false,false)->then(function()use($channel){return$channel;});})->then(function(Channel $channel){echo' [*] Waiting for messages. To exit press CTRL+C',"\n";$channel->consume(function(Message $message, Channel $channel, Client $client){echo"接收消息内容:",$message->content,"\n";},'ceshi','',false,true);});}}
  • 在config/worker_server.php中设置worker_class值为'app\workerman\Receive',并将端口改为2346
  • 启动这个消费者
php think worker:server

到这里消费者也就结束啦!

使用

接下来我用cmd来启动两个服务,然后用接口发送消息和消费测试!
在这里插入图片描述

部分参考自:
https://www.workerman.net/doc/workerman/components/workerman-rabbitmq.html
https://blog.csdn.net/weixin_47723549/article/details/124493059


本文转载自: https://blog.csdn.net/xiantianga6883/article/details/129328065
版权归原作者 骷大人 所有, 如有侵权,请联系我们删除。

“Thinkphp6使用RabbitMQ消息队列”的评论:

还没有评论