0


Hyperf使用RabbitMQ消息队列

Hyperf连接使用RabbitMQ消息中间件

传送门

  1. 使用Docker部署RabbitMQ,->传送门<
  2. 使用Docker部署Hyperf,->传送门-<

部署环境

安装amqp扩展

composer require hyperf/amqp

安装command命令行扩展

composer require hyperf/command

配置参数

假设已经在rabbitmq设置了交换机exchange_test和队列queue_test

新建 /config/autoload/amp.php配置文件,修改地址和用户名密码

<?phpreturn['default'=>['host'=>'127.0.0.1',//rabbitmq服务的地址'port'=>5672,'user'=>'user','password'=>'123456','vhost'=>'/','concurrent'=>['limit'=>1,],'pool'=>['connections'=>1,],'params'=>['insist'=>false,'login_method'=>'AMQPLAIN','login_response'=>null,'locale'=>'en_US','connection_timeout'=>3.0,'read_write_timeout'=>6.0,'context'=>null,'keepalive'=>false,'heartbeat'=>3,'close_on_destruct'=>false,],],'pool2'=>[...]];

生产数据

创建生产者中间件

php bin/hyperf.php gen:amqp-producer DemoProducer

exchange是交换机,routingKey是队列名

<?phpdeclare(strict_types=1);namespaceApp\Amqp\Producers;useHyperf\Amqp\Annotation\Producer;useHyperf\Amqp\Message\ProducerMessage;/**
 * @Producer(exchange="exchange_test", routingKey="queue_test")
 */classDemoProducerextendsProducerMessage{publicfunction__construct($data){//将收到的数据加入队列$this->plyload=$data;}}

创建生产者脚本

php bin/hyperf.php gen:command FooCommand

代码

<?phpdeclare(strict_types=1);namespaceApp\Command;useHyperf\Command\Commandas HyperfCommand;useHyperf\Command\Annotation\Command;useHyperf\Amqp\Producer;useApp\Amqp\Producers\DemoProducer;useHyperf\Utils\ApplicationContext;/**
 * @Command
 */classFooCommandextendsHyperfCommand{/**
     * 执行的命令行
     *
     * @var string
     */protected$name='foo:command';publicfunctionhandle(){//协程代码,创建1000个协程分别处理$wg=new\Hyperf\Utils\WaitGroup();$wg->add(1000);// 计数器加1000for($i=0;$i<1000;$i++){// 创建协程$ico(function()use($wg){//amqp代码,将数据加入生产者队列$message=newDemoProducer(['id'=>$i]);$producer=ApplicationContext::getContainer()->get(Producer::class);$result=$producer->produce($message);// 计数器减一$wg->done();});}// 等待所有协程运行完成$wg->wait();}}

调用命令行,来生产数据

php bin/hyperf.php foo:command

至此,进入rabbitmq后台,对应的队列里就会有数据。

消费数据

创建消费者中间件

php bin/hyperf.php gen:amqp-consumer DemoConsumer

代码解释如上,多的queue也是队列名,num是进程数

<?phpdeclare(strict_types=1);namespaceApp\Amqp\Consumers;useHyperf\Amqp\Annotation\Consumer;useHyperf\Amqp\Message\ConsumerMessage;useHyperf\Amqp\Result;usePhpAmqpLib\Message\AMQPMessage;#[Consumer(exchange:"hyperf",routingKey:"hyperf",queue:"hyperf",nums:1)]classDemoConsumerextendsConsumerMessage{publicfunctionconsumeMessage($data,AMQPMessage$message):string{print_r($data);returnResult::ACK;}}

重启框架会自动调用消费者

php bin/hyperf.php start

在这里插入图片描述
原创码字不易,喜欢请收藏关注

部分参考自:https://www.bilibili.com/video/BV1de4y1E7Ya/?vd_source=36102b089bcd7ff8177499ba833633e0

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/xiantianga6883/article/details/129344700
版权归原作者 骷大人 所有, 如有侵权,请联系我们删除。

“Hyperf使用RabbitMQ消息队列”的评论:

还没有评论