0


laravel使用rabbitmq

laravel使用rabbitmq

1 . 拉取composer包

composer require vladimir-yuldashev/laravel-queue-rabbitmq

2 . 在config/queue.php 文件中配置信息

'rabbitmq'=>['driver'=>'rabbitmq','queue'=>env('RABBITMQ_QUEUE','default'),'connection'=>PhpAmqpLib\Connection\AMQPLazyConnection::class,'hosts'=>[['host'=>env('RABBITMQ_HOST','127.0.0.1'),'port'=>env('RABBITMQ_PORT',5672),'user'=>env('RABBITMQ_USER','guest'),'password'=>env('RABBITMQ_PASSWORD','guest'),'vhost'=>env('RABBITMQ_VHOST','/'),],],'options'=>['ssl_options'=>['cafile'=>env('RABBITMQ_SSL_CAFILE',null),'local_cert'=>env('RABBITMQ_SSL_LOCALCERT',null),'local_key'=>env('RABBITMQ_SSL_LOCALKEY',null),'verify_peer'=>env('RABBITMQ_SSL_VERIFY_PEER',true),'passphrase'=>env('RABBITMQ_SSL_PASSPHRASE',null),],'queue'=>['job'=>\VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,]],/*
             * Set to "horizon" if you wish to use Laravel Horizon.
             */'worker'=>env('RABBITMQ_WORKER','default'),]

3 . 在.env文件配置

.env文件中
# 默认使用rabbitmq队列,后面启动队列监听的时候,如果不指定驱动,就默认用这个名称指定的驱动QUEUE_CONNECTION=rabbitmq
# 使用的队列驱动QUEUE_DRIVER=rabbitmq
# mq的ip地址RABBITMQ_HOST=172.17.0.10# mq的端口RABBITMQ_PORT=5672# mq的账号RABBITMQ_USER=admin
# mq的密码RABBITMQ_PASSWORD=admin
# 默认的虚拟主机RABBITMQ_VHOST=my_vhost
# 默认队列名称RABBITMQ_QUEUE=product

4 . 创建jobs文件

使用laravel的队列监听

php artisan make:job UpdateProduct

5 . 在UpdateProduct.php中编写生产者与消费者

生产者在把消息推送到laravel的事件监听中,初始化生产者配置,创建rabbitmq的所需要绑定的交换机,路由,队列,并且进行绑定。并且监听消费者,当有消息消费时,则从rabbitmq的队列中获取消息,消费成功进行ack

<?php/**
 * php artisan make:job UpdateProduct
 */namespaceApp\Jobs;useApp\Services\RabbitmqService;useIlluminate\Bus\Queueable;useIlluminate\Contracts\Queue\ShouldQueue;useIlluminate\Foundation\Bus\Dispatchable;useIlluminate\Queue\InteractsWithQueue;useIlluminate\Queue\SerializesModels;classUpdateProductimplementsShouldQueue{useDispatchable, InteractsWithQueue, Queueable, SerializesModels;protected$productKey;/**
     * UpdateProduct constructor.
     * @param $data
     * @throws \Exception
     */publicfunction__construct($data){$this->productKey="product::info::{$data->id}";//服务生产者RabbitmqService::push('product','exc_product','pus_product',$data);}/**
     * 服务消费者会走到这里,把消息消费掉
     * @throws \Exception
     */publicfunctionhandle(){RabbitmqService::pop('product',function($message){print_r('消费者消费消息'.PHP_EOL);print_r(PHP_EOL);$key=$this->productKey.':'.date('Y-m-d H:i:s');$input=serialize(json_decode($message,true));$product=app('redis')->set($key,$input);if($product){print_r('消息消费成功');returntrue;}else{print_r('消息消费失败');returnfalse;}});}/**
     * 异常扑获
     * @param \Exception $exception
     */publicfunctionfailed(\Exception$exception){print_r($exception->getMessage());}}

6 . 创建操作rabbitmq的service

封装具体操作rabbitmq的方法

<?phpnamespaceApp\Services;usePhpAmqpLib\Connection\AMQPStreamConnection;usePhpAmqpLib\Message\AMQPMessage;classRabbitmqService{privatestaticfunctiongetConnect(){$config=['host'=>env('RABBITMQ_HOST','127.0.0.1'),'port'=>env('RABBITMQ_PORT',5672),'user'=>env('RABBITMQ_USER','guest'),'password'=>env('RABBITMQ_PASSWORD','guest'),'vhost'=>env('RABBITMQ_VHOST','/'),];returnnewAMQPStreamConnection($config['host'],$config['port'],$config['user'],$config['password'],$config['vhost']);}/**
     * 数据插入到mq队列中(生产者)
     * @param $queue   .队列名称
     * @param $messageBody .消息体
     * @param string $exchange .交换机名称
     * @param string $routing_key .设置路由
     * @throws \Exception
     */publicstaticfunctionpush($queue,$exchange,$routing_key,$messageBody){//获取连接$connection=self::getConnect();//构建通道(mq的数据存储与获取是通过通道进行数据传输的)$channel=$connection->channel();//监听数据,成功$channel->set_ack_handler(function(AMQPMessage$message){dump("数据写入成功");});//监听数据,失败$channel->set_nack_handler(function(AMQPMessage$message){dump("数据写入失败");});//声明一个队列$channel->queue_declare($queue,false,true,false,false);//指定交换机,若是路由的名称不匹配不会把数据放入队列中$channel->exchange_declare($exchange,'direct',false,true,false);//队列和交换器绑定/绑定队列和类型$channel->queue_bind($queue,$exchange,$routing_key);$config=['content_type'=>'text/plain','delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT];//实例化消息推送类$message=newAMQPMessage($messageBody,$config);//消息推送到路由名称为$exchange的队列当中$channel->basic_publish($message,$exchange,$routing_key);//监听写入$channel->wait_for_pending_acks();dump('生产者已操作');//关闭消息推送资源$channel->close();//关闭mq资源$connection->close();}/**
     * 消费者:取出消息进行消费,并返回
     * @param $queue
     * @param $callback
     * @return bool
     * @throws \Exception
     */publicstaticfunctionpop($queue,$callback){print_r('消费者中心'.PHP_EOL);$connection=self::getConnect();//构建消息通道$channel=$connection->channel();//从队列中取出消息,并且消费$message=$channel->basic_get($queue);if(!$message)returnfalse;//消息主题返回给回调函数$res=$callback($message->body);if($res){print_r('ack验证'.PHP_EOL);//ack验证,如果消费失败了,从新获取一次数据再次消费$channel->basic_ack($message->getDeliveryTag());}print_r('ack消费完成'.PHP_EOL);$channel->close();$connection->close();returntrue;}}

7 . 生产者控制器中派遣任务

<?phpnamespaceApp\Http\Controllers\Api\V1;useApp\Http\Controllers\ApiResponse;useApp\Http\Controllers\Controller;useApp\Jobs\CloseOrder;useApp\Jobs\UpdateProduct;useApp\Models\Product;useApp\pool\redisPool;useApp\Services\PublicService;useIlluminate\Http\Request;useApp\Services\ElasticsearchService;classProductControllerextendsController{useApiResponse;/**
     * 推送消息到mq中
     * @param Request $request
     * @return \Illuminate\Http\JsonResponse
     */publicfunctionpushRabbitmq(Request$request){try{$id=$request->input('id');if(!$id)return$this->failed('缺少店铺id');$select='id,name,long_name,shop_id,create_time';$info=Product::query()->selectRaw($select)->where('id',$id)->first();$productJob=newUpdateProduct($info);//派遣$this->dispatch($productJob);return$this->success('操作成功');}catch(\Exception$e){return$this->failed($e->getMessage().$e->getLine());}}}

8 . 测试:

8 . 1 生产者开始推送消息

在这里插入图片描述

8 . 2 消费者监听消息

php artisan queue:work rabbitmq

这里分别在两个不同的服务器中部署同一份代码,分别启动消费者进行消息消费

服务器一

在这里插入图片描述

服务器二

在这里插入图片描述

两个消费者消费消息,并不是轮询的,而是看谁空闲,则由谁来消费,如果都空闲,则随机,如果消费者处理不过来,可以增加多几台服务器,一起进行消息的消费

注: 为何能在其他服务器监听到别的服务器监听到laravel发布的消息?因为在 监听队列的时候,指定了rabbitmq为驱动

在这里插入图片描述


本文转载自: https://blog.csdn.net/weixin_41753567/article/details/126097031
版权归原作者 Dear-xq 所有, 如有侵权,请联系我们删除。

“laravel使用rabbitmq”的评论:

还没有评论