0


php简单实现rabbitMQ消息列队(必须收藏)

业务场景:

  1. 公司是主php做开发的,框架为thinkphp。众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序。首先我想到了phpworkermanswoole,但是这里应上面的标题哈,想将耗时任务交给另一个服务器,同时列队处理。所以这里我想独立部署一个rabbitMQ服务器用于处理列队任务(关于liunx如何搭建rabbitMQ服务,去我主页有详细教程)。
  2. rabbitMQ服务器我们准备好了,建立了一个持久化命名为ceshi的列队,如下:

项目上生产者和消费者的开发我这里全部采用tinkphp6+workerman,为便于管理。这里这么做也是因为发现workerman中对rabbitMQ的文档解释太少了!所以开始踩坑!

1、首先部署好thinkphp6框架。(过程去看thinkphp6手册)

2、安装workerman扩展。(过程去看thinkphp6手册)

3、生产者

配置一个workerman类

创建的Send类代码如下:

  1. <?php
  2. namespace app\workerman;
  3. use Bunny\Channel;
  4. use Workerman\RabbitMQ\Client;
  5. use think\worker\Server;
  6. class Send extends Server
  7. {
  8. //websocket地址,一会用于测试。
  9. protected $socket = 'websocket://127.0.0.1:2345';
  10. /**
  11. * 收到信息
  12. * @param $connection
  13. * @param $data
  14. */
  15. public function onMessage($connection, $data)
  16. {
  17. //websocket发送过来的消息
  18. $connection->send('我收到你的信息了:'.$data);
  19. //rabbitMQ配置
  20. $options = [
  21. 'host'=>'127.0.0.1',//rabbitMQ IP
  22. 'port'=>5672,//rabbitMQ 通讯端口
  23. 'user'=>'admin',//rabbitMQ 账号
  24. 'password'=>'123456'//rabbitMQ 密码
  25. ];
  26. (new Client($options))->connect()->then(function (Client $client) {
  27. return $client->channel();
  28. })->then(function (Channel $channel) {
  29. /**
  30. * 创建队列(Queue)
  31. * name: ceshi // 队列名称
  32. * passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
  33. * durable: true // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
  34. * 设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
  35. * exclusive: false // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
  36. * auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
  37. */
  38. return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
  39. return $channel;
  40. });
  41. })->then(function (Channel $channel) use($data){
  42. echo "发送消息内容:".$data."\n";
  43. /**
  44. * 发送消息
  45. * body 发送的数据
  46. * headers 数据头,建议 ['content_type' => 'text/plain'],这样消费端是springboot注解接收直接是字符串类型
  47. * exchange 交换器名称
  48. * routingKey 路由key
  49. * mandatory
  50. * immediate
  51. * @return bool|PromiseInterface|int
  52. */
  53. return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {
  54. return $channel;
  55. });
  56. })->then(function (Channel $channel) {
  57. //echo " [x] Sent 'Hello World!'\n";
  58. $client = $channel->getClient();
  59. return $channel->close()->then(function () use ($client) {
  60. return $client;
  61. });
  62. })->then(function (Client $client) {
  63. $client->disconnect();
  64. });
  65. }
  66. /**
  67. * 当连接建立时触发的回调函数
  68. * @param $connection
  69. */
  70. public function onConnect($connection)
  71. {
  72. }
  73. /**
  74. * 当连接断开时触发的回调函数
  75. * @param $connection
  76. */
  77. public function onClose($connection)
  78. {
  79. }
  80. /**
  81. * 当客户端的连接上发生错误时触发
  82. * @param $connection
  83. * @param $code
  84. * @param $msg
  85. */
  86. public function onError($connection, $code, $msg)
  87. {
  88. echo "error $code $msg\n";
  89. }
  90. /**
  91. * 每个进程启动
  92. * @param $worker
  93. */
  94. public function onWorkerStart($worker)
  95. {
  96. }
  97. }

上述都OK以后咱们可以项目路径下通过命令启动这个生产者:

  1. php think worker:server

测试发送数据:

通过这个网站:在线模拟websocket请求工具-BeJSON.com

连接【ws://127.0.0.1:2345】后发送数据!

前往rabbitMQ控制台

列队中有一条消息产生并且等待了!

这个时候你可能问,如果我发送数据不想通过ws发送而是接口发送怎么办?

笨思路呗:接口给内置服务器发消息->内置服务去发消息给rabbitMQ

将协议改为tcp

然后重新启动服务

然后去tp6创建一个路由接口

接口代码:

  1. <?php
  2. namespace app\controller;
  3. use app\BaseController;
  4. class Index extends BaseController
  5. {
  6. public function index(string $msg)
  7. {
  8. //连接本地tcp服务
  9. $client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);
  10. //发送字符串
  11. fwrite($client, $msg."\n");
  12. //断开服务
  13. fclose($client);
  14. return 'OK';
  15. }
  16. }

执行结果:

说明接口成功的将数据发送给了本地内置的tcp服务。

同时,内置服务将收到的数据给了rabbitMQ服务列队中。

生产者完成。

4、消费者

同生产者一样新创建一个thinkphp6及安装workerman扩展,注意端口别和生产者冲突!这里我设置的是2346端口

创建的Receive类代码如下:

  1. <?php
  2. namespace app\workerman;
  3. use Bunny\Channel;
  4. use Bunny\Message;
  5. use Workerman\RabbitMQ\Client;
  6. use think\worker\Server;
  7. class Receive extends Server
  8. {
  9. protected $socket = 'tcp://127.0.0.1:2346';
  10. /**
  11. * 收到信息
  12. * @param $connection
  13. * @param $data
  14. */
  15. public function onMessage($connection, $data)
  16. {
  17. }
  18. /**
  19. * 当连接建立时触发的回调函数
  20. * @param $connection
  21. */
  22. public function onConnect($connection)
  23. {
  24. }
  25. /**
  26. * 当连接断开时触发的回调函数
  27. * @param $connection
  28. */
  29. public function onClose($connection)
  30. {
  31. }
  32. /**
  33. * 当客户端的连接上发生错误时触发
  34. * @param $connection
  35. * @param $code
  36. * @param $msg
  37. */
  38. public function onError($connection, $code, $msg)
  39. {
  40. echo "error $code $msg\n";
  41. }
  42. /**
  43. * 每个进程启动
  44. * @param $worker
  45. */
  46. public function onWorkerStart($worker)
  47. {
  48. //rabbitMQ配置
  49. $options = [
  50. 'host'=>'127.0.0.1',//rabbitMQ IP
  51. 'port'=>5672,//rabbitMQ 通讯端口
  52. 'user'=>'admin',//rabbitMQ 账号
  53. 'password'=>'123456'//rabbitMQ 密码
  54. ];
  55. (new Client($options))->connect()->then(function (Client $client) {
  56. return $client->channel();
  57. })->then(function (Channel $channel) {
  58. /**
  59. * 创建队列(Queue)
  60. * name: ceshi // 队列名称
  61. * passive: false // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
  62. * durable: true // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
  63. * 设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
  64. * exclusive: false // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
  65. * auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
  66. */
  67. return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
  68. return $channel;
  69. });
  70. })->then(function (Channel $channel) {
  71. echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
  72. $channel->consume(
  73. function (Message $message, Channel $channel, Client $client) {
  74. echo "接收消息内容:", $message->content, "\n";
  75. },
  76. 'ceshi',
  77. '',
  78. false,
  79. true
  80. );
  81. });
  82. }
  83. }

都OK以后咱们可以项目路径下通过命令启动这个消费者:

  1. php think worker:server

此时应该会自动消费掉rabbitMQ中等待的消息!

到这里消费者也就结束啦!

5、整体测试

接下来我用cmd来启动两个服务,然后用接口发送消息和消费测试!

至于具体怎么灵活应用自行开拓大脑哦~

比如php项目有些业务吃力,可以去做个java的消费端,让java来完成任务~

完美~

记得三连~

万分感谢~

标签: rabbitmq php 分布式

本文转载自: https://blog.csdn.net/weixin_47723549/article/details/124493059
版权归原作者 盘古-阿飞 所有, 如有侵权,请联系我们删除。

“php简单实现rabbitMQ消息列队(必须收藏)”的评论:

还没有评论