在使用 RabbitMQ 之前,你要安装好 RabbitMQ 服务,具体安装方法可以参考 windows下安装RabbitMQ
1、安装扩展
进入TP5 更目录下,输入命令安装:
composer require php-amqplib/php-amqplib
2、自定义命令
TP5 的自定义命令,这里也简单说下。
第一步:
创建命令类文件,新建 application/api/command/Test.php。
<?phpnamespaceapp\api\command;usethink\console\Command;usethink\console\Input;usethink\console\Output;/**
* 自定义命令测试
*/classTestextendsCommand{/**
* 配置
*/protectedfunctionconfigure(){// 设置命令的名称和描述$this->setName('test')->setDescription('这是一个测试命令');}/**
* 执行
*/protectedfunctionexecute(Input$input,Output$output){$output->writeln("测试命令");}}
这个文件定义了一个叫test的命令,备注为 这是一个测试命令,执行命令会输出:test command。
第二步:
配置 command.php文件,在 application/command.php文件中添加命令。
<?phpreturn['app\api\command\Test',];
第三步:
测试命令,在项目根目录下输入命令:
php think test
回车运行之后输出:
test command
到这里,自定义命令就结束了,test命令就自定义成功了。
3、rabbitmq服务端
下来我们自定义 RabbitMQ 启动命令,守护进程运行,启动 rabbirmq 服务端接收消息。
在 application/api/command 目录下,新建 Ramq.php 文件,在执行命令的方法中,调用 RabbitMQ 启动守护进程方法即可。
<?phpnamespaceapp\api\command;usePhpAmqpLib\Connection\AMQPStreamConnection;usethink\console\Command;usethink\console\Input;usethink\console\Output;/**
* RabbitMq 启动命令
*/classRamqextendsCommand{protected$consumerTag='customer';protected$exchange='xcuser';protected$queue='xcmsg';protectedfunctionconfigure(){$this->setName('ramq')->setDescription('rabbitmq');}protectedfunctionexecute(Input$input,Output$output){$output->writeln("消息队列开始");$this->start();// 指令输出$output->writeln('消费队列结束');}/**
* 关闭
*/functionshutdown($channel,$connection){$channel->close();$connection->close();}/**
* 回调处理信息
*/functionprocess_message($message){if($message->body!=='quit'){echo$message->body;}//手动应答$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);if($message->body==='quit'){$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);}}/**
* 启动 守护进程运行
*/publicfunctionstart(){$host='127.0.0.1';$port=5672;$user='guest';$pwd='guest';$vhost='/';$connection=newAMQPStreamConnection($host,$port,$user,$pwd,$vhost);$channel=$connection->channel();$channel->queue_declare($this->queue,false,true,false,false);$channel->exchange_declare($this->exchange,'direct',false,true,false);$channel->queue_bind($this->queue,$this->exchange);$channel->basic_consume($this->queue,$this->consumerTag,false,false,false,false,array($this,'process_message'));register_shutdown_function(array($this,'shutdown'),$channel,$connection);while(count($channel->callbacks)){$channel->wait();}}}
在application/command.php文件中,添加rabbitmq自定义命令。
return['app\api\command\Ramq',// rabbitmq];
4、发送端
最后,我们再写发送消息的控制器,实现消息队列,具体代码如下:
<?phpnamespaceapp\api\controller;usePhpAmqpLib\Connection\AMQPStreamConnection;usePhpAmqpLib\Message\AMQPMessage;usethink\Controller;/**
* 发送端
*/classMessageQueueextendsController{constexchange='xcuser';constqueue='xcmsg';/**
* 发送消息
*/publicfunctionpushMessage($data){$host='127.0.0.1';$port=5672;$user='guest';$pwd='guest';$vhost='/';$connection=newAMQPStreamConnection($host,$port,$user,$pwd,$vhost);$channel=$connection->channel();$channel->exchange_declare(self::exchange,'direct',false,true,false);$channel->queue_declare(self::queue,false,true,false,false);$channel->queue_bind(self::queue,self::exchange);$messageBody=$data;$message=newAMQPMessage($messageBody,array('content_type'=>'text/plain','delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT));$channel->basic_publish($message,self::exchange);$channel->close();$connection->close();echo'ok';}/**
* 执行
*/publicfunctionindex(){$data=json_encode(['msg'=>'测试数据','id'=>'15']);$this->pushMessage($data);}}
5、验证
先执行自定义命令,启动 rabbitmq 守护进程。在项目更目录下打开命令行,输入下面命令:
php think ramq
然后在浏览器访问发送信息的方法,http://你的域名/api/message/index,你发送一次消息,在命令行就会输出一条消息。这样我们就用 RabbitMQ 实现了一个简单的消息队列。
版权归原作者 L小臣 所有, 如有侵权,请联系我们删除。