一.什么是消息队列
1.简介
在介绍消息队列之前,应该先了解什么是
AMQP(Advanced Message Queuing Protocol, 高级消息队列协议,点击查看)
消息(Message)是指在应用间
传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象;而
消息队列(Message Queue)是一种
应用间的
通信方式,消息发送后可以
立即返回,由
消息系统来确保消息的
可靠传递,
消息发布者只管把消息发布到
MQ 中而不用管谁来取,
消息使用者只管从 MQ 中取消息而不管是谁发布的,这样发布者和使用者都不用知道对方的存在,它是典型的
生产者-消费者模型,生产者不断向消息队列生产消息,消费者不断的从队列获取消息。因为消息的生产和消费都是
异步的,并且只关心消息的发送和接收,没有业务逻辑的浸入,这样就实现了生产者和消费者的
解耦
2.总结
(1).消息队列是队列结构的
中间件
(2).消息发送后,不需要立即处理,而是由消息系统来处理(3).消息处理是
消息使用者(消费者)按顺序处理的
3.结构图
二.为什么要使用消息队列
消息队列是一种应用间的异步协作机制,是分布式系统中的重要的组件,主要目的是为了解决应用
藕合,
异步消息,
流城削锋, 冗余,扩展性,排序保证等问题,实现
高性能,
高并发,
可伸缩和
最终一致性架构,下面举例说明
业务解耦
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知,商品配送等业务;在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,或者单独拆分出来作为一个独立的系统,比如生成相应单据为订单系统,扣减库存为库存系统,发放红包独立为红包系统、发短信通知为短信系统,商品配送为配送系统等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信或商品配送之类的消息时,执行相应的业务系统逻辑,这样各个业务系统相互独立,就很方便进行分离部署,防止某一系统故障引起的连锁故障
流量削峰
流量削峰一般在秒杀或者团抢活动中广泛使用
(1).由来
主要是还是来自于互联网的业务场景,例如:春节火车票抢购,大量的用户需要同一时间去抢购;以及双11秒杀, 短时间上亿的用户涌入,瞬间流量巨大(高并发),比如:200万人准备在凌晨12:00准备抢购一件商品,但是商品的数量缺是有限的100-500件左右。这样真实能购买到该件商品的用户也只有几百人左右, 但是从业务上来说,秒杀活动是希望更多的人来参与,也就是抢购之前希望有越来越多的人来看购买商品。但是,在抢购时间达到后,用户开始真正下单时,秒杀的服务器后端却不希望同时有几百万人同时发起抢购请求。因为服务器的处理资源是有限的,所以出现峰值的时候,很容易导致服务器宕机,用户无法访问的情况出现。这就好比出行的时候存在早高峰和晚高峰的问题,为了解决这个问题,出行就有了错峰限行的解决方案。同理,在线上的秒杀等业务场景,也需要类似的解决方案,需要平安度过同时抢购带来的流量峰值的问题,这就是流量削峰的由来。
(2).怎样来实现流量削峰方案
削峰从本质上来说就是更多地延缓用户请求,以及层层过滤用户的访问需求,遵从“最后落地到数据库的请求数要尽量少”的原则。
1).消息队列解决削峰
要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去。
消息队列中间件主要解决应用耦合,异步消息, 流量削锋等问题;常用消息队列系统:目前在生产环境,使用较多的消息队列有 ActiveMQ、RabbitMQ、 ZeroMQ、Kafka、MetaMQ、RocketMQ 等。
在这里,消息队列就像“水库”一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的。
2).流量削峰漏斗:层层削峰
针对秒杀场景还有一种方法,就是对请求进行分层过滤,从而过滤掉一些无效的请求。分层过滤其实就是采用“漏斗”式设计来处理请求的,如下图所示:
这样就像漏斗一样,尽量把数据量和请求量一层一层地过滤和减少了
I.分层过滤的核心思想
通过在不同的层次尽可能地过滤掉无效请求
通过CDN过滤掉大量的图片,静态资源的请求
再通过类似Redis这样的分布式缓存,过滤请求等就是典型的在上游拦截读请求
II.分层过滤的基本原则
对写数据进行基于时间的合理分片,过滤掉过期的失效请求
对写请求做限流保护,将超出系统承载能力的请求过滤掉
涉及到的读数据不做强一致性校验,减少因为一致性校验产生瓶颈的问题
对写数据进行强一致性校验,只保留最后有效的数据
最终,让“漏斗”最末端(数据库)的才是有效请求,例如:当用户真实达到订单和支付的流程,这个是需要数据强一致性的。
(3).总结
1).对于秒杀这样的高并发场景业务,最基本的原则就是将请求拦截在系统上游,降低下游压力。如果不在前端拦截很可能造成数据库(mysql、oracle等)读写锁冲突,甚至导致死锁,最终还有可能出现雪崩等场景。
2).划分好动静资源,静态资源使用CDN进行服务分发
3).充分利用缓存(redis等),增加QPS,从而加大整个集群的吞吐量
4).高峰值流量是压垮系统很重要的原因,所以需要RabbitMQ等消息队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去
在
异步处理
用户注册后,需要发送注册邮件和注册短信
三.RabbitMQ介绍
RabbitMQ是一个由
erlang语言开发的,实现了AMQP协议的标准的开源消息代理和队列服务器(
消息队列中间件)
1.常见的消息队列中间件
2.RabbitMQ特性
可靠性(Reliability)RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认
灵活的路由(Flexible Routing)在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange
消息集群(Clustering)多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
高可用(Highly Available Queues)队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用
多种协议(Multi-protocol)RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等
多语言客户端(Many Clients)RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby ,PHP等
管理界面(Management UI)RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面
跟踪机制(Tracing)如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么
插件机制(Plugin System)RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件
3.RabbitMQ工作原理
内部实际上也是 AMQP 中的基本概念
上图各个模块的说明:
Message: 消息,消息是不具名的,它由消息头和消息体组成,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
Publisher: 消息的生产者,也是一个向交换器发布消息的客户端应用程序
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
Broker: 接收和分发消息的应用,表示消息队列服务器实体,RabbitMQ Server就是Message Broker
Virtual host: 虚拟主机(共享相同的身份认证和加密环境的独立服务器域),表示一批交换器、消息队列和相关对象,类似于mysql的数据库,当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange、queue等.每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制,vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /
Connection: publisher、consumer和broker之间的网络连接,比如:TCP连接,断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题
Channel: 管道,多路复用连接中的一条独立的双向数据流通道,信道是建立在真实的TCP连接内地虚拟连接(逻辑连接),如果应用程序支持多线程,通常每个多线程创建单独的channel进行通讯, 因为AMQP 方法中包含了channel id帮助客户端和broker识别channel,所以channel之间是完全隔离的,AMQP 命令都是通过管道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过管道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低,所以引入了管道的概念,目的是为了减少操作系统建立TCP 连接的开销,以复用一条 TCP 连接
Exchange: 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列,message到达broker的第一站,根据分发规则,匹配查询表中的路由键(routing key),分发消息到queue中去,常用的类型有:直连交换机-direct (point-to-point), 主题交换机-topic (publish-subscribe),扇型交换机-fanout (multicast), 头交换机-headers(amq.match (and amq.headers in RabbitMQ))
Queue: 消息队列,用来保存消息直到发送给消费者,它是消息的容器,也是消息的终点,一个消息可投入一个或多个队列,消息最终被送到这里等待消费者连接到这个队列将其取走
Binding: 绑定,消息队列(queue)和交换器(exchange)之间的虚拟连接, binding中可以包含routing key, Binding信息被保存到exchange中的查询表中,用于message的分发依据,一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表
四.RabbitMQ安装与启动
见linux下,docker-compose 安装nignx,php,mysql,redis,rabbitmq,mongo
端口说明:安装完rabbitmq后,有几个常见端口:
4369: epmd(Erlang Port Mapper Daemon),erlang服务端口
5672: client端通信端口
15672:http Api客户端,管理UI(仅在启用了管理插件的情况下使用)
25672:用于节点间通信(Erlang分发服务器端口)
五.RabbitMQ几个重要特性概念讲解
队列模式-简单队列模式,工作队列模式
ACK&NACK消费确认机制&重回队列机制
消息持久化
公平调度(限流机制)
幂等性
return机制
消息的可靠性投递
下面是RabbitMQ和消息所涉及到的一些
术语
- 生产(Producing)的意思就是发送:发送消息的程序就是一个生产者(producer),一般用"P"来表示:
- 队列(queue)就是存在于RabbitMQ中邮箱的名称:虽然消息的传输经过了RabbitMQ和你的应用程序,但是它只能被存储于队列当中。实质上队列就是个巨大的消息缓冲区,它的大小只受主机内存和硬盘限制。多个生产者(producers)可以把消息发送给同一个队列,同样,多个消费者(consumers)也能够从同一个队列(queue)中获取数据。队列可以绘制成这样(图上是队列的名称):
- 消费(Consuming)和接收(receiving)是同一个意思:一个消费者(consumer)就是一个等待获取消息的程序。把它绘制为"C":
以php框架yii2为参照
简单队列模式(simple queue)
发送
单个消息的生产者,以及
接收消息并将其
打印出来的消费者。将忽略RabbitMQ API中的一些细节。 在下图中,“P”是生产者,“C”是消费者。中间的框是一个队列(保存消息的地方)
(1).生产者发布消息步骤
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 创建连接
$connection = new AMQPStreamConnection($host, $port, $user, $pass, $v_host="/");
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化(声明队列)
$channel->queue_declare($queue_name, false, true, false, false);
//消息内容
$data = "this is message2";
// 声明消息,并持久化(创建消息)
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到队列里(发布消息)
$channel->basic_publish($mes, '', $queue_name);
//关闭通道和连接
$channel->close();
$connection->close();
上面声明队列方法queue_declare()参数详解
(2).消费者消费消息步骤
//核心代码
basic_consume($queue = '', $consumer_tag = '', $no_local = false,$no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
上面消费消息方法basic_consume()参数详解
(3).具体代码展示
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
],
]
生产者代码
<?php
/**
* 生产者生产消息
*/
namespace console\controllers\simple;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
//消息
$data = "this is message2";
// 声明消息,并持久化
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到队列里
$channel->basic_publish($mes, '', $queue_name);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者代码
<?php
/**
* 消费者消费消息
*/
namespace console\controllers\simple;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
};
$channel->basic_consume($queue_name, "", false, true, false, false,$callback);
// 监控
while ($channel->is_open()){
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
工作队列模式(worker queue)
创建一个
工作队列(Work Queue),它会发送一些
耗时的任务给
多个工作者(Worker),工作队列(又称:
任务队列——Task Queues)是为了
避免等待一些占用大量资源、时间的操作,当把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理,当运行多个工作者(workers),任务就会在它们之间
共享。这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务,使用工作队列的一个
好处就是它能够
并行的处理队列。如果堆积了很多任务,只需要添加
更多的工作者(workers)就可以了,这就是所谓的
循环调度,扩展很简单
(1).具体代码展示
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
"name2" => "task_queue", // 队列名称
],
]
生产者代码
<?php
/**
* 生产者生产消息
*/
namespace console\controllers\worker;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 生产多条消息
for ($i = 0; $i <= 10; ++$i) {
//消息
$data = "this is " . $i. " message";
// 声明消息,并持久化
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到队列里
$channel->basic_publish($mes, '', $queue_name);
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者代码
当生产者生产了多条费时的消息时,一个消费者不能满足需要,可以添加多个消费者处理生产者的消息,多个消费者之间采用
轮询的方式获取队列的消息,并把该消息发送给对应的用户
比如:可以对一个队列的消息开多个消费者,这里我们开了两个消费者,里面的代码都是一致的
<?php
/**
* 消费者消费消息
*/
namespace console\controllers\worker;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
};
$channel->basic_consume($queue_name, "", false, true, false, false,$callback);
// 监控
while ($channel->is_open()){
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
<?php
/**
* 消费者2消费消息
*/
namespace console\controllers\worker;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Consumer2Controller extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
};
$channel->basic_consume($queue_name, "", false, true, false, false,$callback);
// 监控
while ($channel->is_open()){
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
ACK消费确认机制&NACK&重回队列机制
ACK消费确认机制
当处理一个
比较耗时得任务的时候,想知道消费者(consumers)
运行到一半就挂掉时,正在处理的消息/发送给当前工作者的消息会怎样,当消息在队列中
没有进行持久化操作时,消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中
移除。这种情况,只要把一个
工作者(worker)停止,正在处理的消息就会
丢失。同时,所有发送到这个工作者的还没有处理的消息
都会丢失。所以,如果不想消息丢失,当一个工作者(worker)挂掉了,希望任务会重新发送给其他的工作者(worker),RabbitMQ就提供了
消息响应(
acknowledgments)。消费者会通过一个
ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会
释放并删除这条消息。如果消费者(consumer)
挂掉了,
没有发送响应,RabbitMQ就会认为消息
没有被完全处理,然后
重新发送给
其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。消息是没有超时这个概念的,当工作者与它断开连的时候,RabbitMQ会重新发送消息,这样在处理一个耗时非常长的消息任务的时候就不会出问题了。在该讲解中,将使用手动消息确认,通过为
no_ack参数传递
false,一旦有任务完成,使用d.ack()(false)向RabbitMQ服务器发送消费完成的确认(这个确认消息是单次传递的)
//核心代码
basic_consume($queue = '', $consumer_tag = '', $no_local = false,
$no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
// 消费ack
$msg->ack();
};// 第四个参数: 需要ack确认,这里我们在callback手动确认
$channel->basic_consume($queue_name, "", false,
false, false, false,$callback);
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
"name2" => "task_queue", // 队列名称
"name3" => "task_ack", // 队列名称
],
]
生产者代码
<?php
/**
* 生产者生产消息
*/
namespace console\controllers\ack;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 生产多条消息
for ($i = 0; $i <= 10; ++$i) {
//消息
$data = "this is " . $i. " message";
// 声明消息,并持久化
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
// 把消息推到队列里
$channel->basic_publish($mes, '', $queue_name);
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者代码
<?php
/**
* 消费者消费消息
*/
namespace console\controllers\ack;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 队列名称
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
// 消费ack
$msg->ack();
};
// 第二个参数:同一时刻服务器只会发送1条消息给消费者
$channel->basic_qos(null, 1, null);
// 第四个参数: 需要ack确认,这里我们在callback手动确认
$channel->basic_consume($queue_name, "", false, false, false, false,$callback);
// 监控
while ($channel->is_open()){
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
NACK&重回队列机制
当设置了方法
basic_consume中
$no_ack = false时,使用手工 ACK 方式,除了ACK外,其实还有 NACK 方式,当手工 AcK 时,会发送给Broker(
服务器)一个应答,代表消息处理成功,Broker就可回送响应给生产端 .NACK 则表示消息处理失败,如果设设置了重回队列, Broker 端就会将没有成功处理的消息重新发送
通俗来讲:
手工ACK:消费成功了,向发起者确认
NACK:消费失败,让生产者重新发
一般在实际应用中,都会关闭重回队列,也就是设置为false
使用方式
消费端消费时.如果由于业务异常,可以手工 NACK 记录日志,然后进行补偿
API :basic_nack($delivery_tag, $multiple = false, $requeue = false)
如果由于服务器宕机等严里问题,就需要手工 ACK 保障消费端消费成功
API :basic_ack($delivery_tag, $multiple = false)
4.消息持久化
如果没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息,为了确保消息不会丢失,有两个事情是需要注意的:必须把“队列”和“消息”设为持久化首先,为了不让队列消失,需要把队列声明为持久化(durable),但这里面会有一定的问题,它会返回一个错误,可以使用一个快捷的解决方法——用不同名字的队列,例如task_queue
代码如上面生产者/消费者所示:
// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
消息持久化配置: "
delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT
5.公平调度(限流机制)
为什么要限流
当工作者处理消息时,会出现这么一个问题:比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松,然而RabbitMQ并不知道这些,它仍然一如既往的派发消息,这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应,它盲目的把第n-th条消息发给第n-th个消费者
假设还有这样的场景:RabbitMQ服务器有上万条未处理的消息,随便打开一个消费者Client ,会造成巨量的消息瞬间全部推送过来,然而单个客户端无法同时处理这么多数据,此时很有可能导致服务器崩溃,严重的可能导致线上的故障
还有一些其他的场景:比如说单个 生产端一分钟产生了几百条数据,但是单个消费端 一分钟可能只能处理 60 条,这个时候生产端-消费端肯定是不平衡的,通常生产端是没办法做限制的,所以消费端肯定需要做一些限流措施,否则如果超出最大负载,可能导致 消费端 性能下降,服务器卡顿甚至崩溃等一系列严重后果
RabbitMQ 提供了一种
qos (服务质质量保证)功能,即在
非自动确认消息的前提下,如果一定数目的消息 ( 通过基于
生产者或者
channel设置
Qos 的值)
未被确认前,不消费新的消息不能设置自动签收功能( auto_ack = false ),如果消息未被确认,就不会到达
消费端 ,目的就是给 生产端
减压
这是可以设置预取计数值为1,告诉RabbitMQ一次只向一个worker发送一条消息,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息
如上面ACK消费确认机制中消费者代码:
// 第二个参数:同一时刻服务器只会发送1条消息给消费者
//basic_qos($prefetch_size, $prefetch_count, $a_global)
$channel->basic_qos(null, 1, null);
参数说明:
$prefetch_size:单条消息的大小限制, 通常设置为 0 ,表示不做限制
$prefetch_count:一次最多能处理多少条消息
$a_global:是否将上面设置: true 应用于 channel 级别, false 代表消费者级别
$prefetch_size,$a_global这两项, RabbitMQ 没有实现,暂且不研究.$prefetch_count 在auto_ack = f alse 的情况下生效,即在自动应答的情况下该值无效
6.幂等性概念
一句话概括:
用户对于同一操作发起的一次请求或者多次请求的结果是一致的
比如:对一个SQL执行100次1000次,我们可以借鉴数据库的乐观锁机制:比如我们执行一条更新库存的SQL语句:update T_reps set count = count -1, version = version +1
where version = 1,数据库的乐观锁在执行更新操作前一先去数据库查询此version ,然后执行更新语句,以此version作为条件,如果执行更新时有其他人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操作了,通过这种
乐观锁的机制来保障幕等性
消费端 - 幂等性保障
在海量订单产生的业务高峰期,如何避免
消息的重复消费问题?在业务高峰期:容易产生
消息重复消费问题,当消费端消费完消息时,在给生产者端返回ack时由于
网络中断,导致生产端
未收到确认信息,该条消息就会
重新发送并
被消费端消费,但实际上该消费端已成功消费了该条消息,这就造成了重复消费.而
消费端实现
幂等性,就意味着:消息不会被多次消费,即使收到了很多一样的消息
业界主流的幂等性操作解决方案:
(1)唯一Id + 指纹码 机制,核心:利用数据库主键去重
唯一Id: 业务表主键
指纹码: 为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间截+业务编号或者标志位(具体视业务场景而定 )
select count(1) from t_order where id = 唯一Id + 指纹码
优势:实现简单
弊端:高并发下有数据库写入的性能瓶颈
解决方案:根据ID进行分库分表进行算法路由
(2)利用Redis的原子性去实现
第一:是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
7.return机制
Return Listener用于处理一些不可路由的消息,也是生产阶段添加的一个监听
消息生产者通过指定一个Exchange和Routing Key,把消息送达到某一个队列中去,然后消费者监听队列,进行消费处理操作
但是在某些情况下,如果在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果需要监听这种不可达的消息,就要使用Return Listener
在API中有一个关键的配置项 Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么Broker(服务器)会自动删除该消息
8.消息的可靠性投递
(1).什么是生产端的可靠性投递?
保障消息的成功发出
保障MQ节点的成功接收
发送端收到MQ节点(Broker)确认应答
完善的消息进行补偿机制(在大厂一般都不会加事务,都是进行补偿操作)
在实际生产中,很难保障前三点的完全可靠,比如在
极端的环境中,生产者发送消息失败了,发送端在接受确认应答时突然发生网络闪断等等情况,很难保障可靠性投递,所以就需要有第四点完善的
消息补偿机制
(2).解决方案
方案一 消息信息落库,对消息状态进行打标(常见方案)
将消息持久化到 DB中并设置状态值,收到 Consumer 的应答就改变当前记录的状态
再轮询重新发送没接收到应答的消息,注意这里要设置重试次数
方案实现流程
比如下单成功
步骤1
对订单数据入ORDER_DB 订单库,并对因此生成的业务消息入 MSG_DB 消息库,此处由于采用了两个数据库,需要两次持久化操作,为了保证数据的一致性,有人可能就想采用分布式事务,但在大厂实践中,基本都是采用补偿机制
这里一定要保证步骤1中消息都存储成功了,没有出现任问异常情况,然后生产端再进行消息发送.如果失败了就进行快速失败机制
对业务数据和消息入库完毕就进入步骤2
步骤2
发送消息到MQ服务上,如果一切正常无误消费者监听到该消息,进入步骤3
步骤3
生产端有一个 confi rm Listener ,异步监听 Broker(服务端) 回送的响应,从而判断消息是否投递成功
步骤4
如果成功,去数据库查询该消息.并将消息状态更新为 1
步骤5
如果出现意外情况,消费者未接收到或者Listener 接收确认时发生网络闪断,导致生产端的Listener 就永远收不到这条消息的 confi rm应答了,也就是说这条消息的状态就一直为0 了,这时候就需要用到分布式定时任务来从 MSG_DB 数据库抓取那些超时了还未被消费的消息,重新发送一遍。此时需要设置一个规则,比如说消息在入库时候设置一个临界值 timeout , 5 分钟之后如果还是0的状态那就需要把消息抽取出来。这里使用的是分布式定时任务,去定时抓取 MSG_DB中距离消息创建时间超过 5 分钟的且状态为0 的消息
步骤6
把抓取出来的消息进行重新投递( Retry Send ) ,也就是从第二步开始继续往下走
步骤7
当然有些消息可能就是由于一些实际的问题无法路由到 Broker ,比如Routing Key设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重复次数做限制,比如限制 3 次,如果投递次数大于3次,那么就将消息状态更新为 2 ,表示这个消息最终投递失败,然后通过补偿机制,人工去处理,实际生产中.这种情况还是比较少的,但是不能没有这个补偿机制,要不然就做不到可靠性了
缺点
在第一步需要更新或者插入操作数据库2次
优化
不需要消息进行持久化 只需要业务持久化
方案二 消息的延迟投递,做二次确认,回调检查(不常用,大厂在用的高并发方案)
方案实现流程
步骤1
(上游服务: Upstream service )业务入库,然后send 消息到broker,这两步是有先后顺序的
步骤2
进行消息延迟发送到新的队列(延迟时间为 5 分钟:业务决定)
步骤3
(下游服务: Downstream service )监听到消息然后处理消息
步骤4
下游服务 send confirm生成新的消息到 broker (这里是一个新的队列 )
步骤5
callback service 去监听这个消息,并且入库,如果监听到,表示这个消息已经消费成功
步骤6
callback service 去检查 步骤2投递的延迟消息是否 在msgDB里面是否消费成功,不存在或者消费失败就会 Resend command
如果在第 1 , 2 , 4 步失败,如果成功broker 会给一个 confirm ,失败当然没有,这是消息可靠性投递的里要保障
9.注意
关于队列大小:如果所有的工作者都处理繁忙状态,队列就会被填满,需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略
六.RabbitMQ几种常见的交换器模式
1.消息模型基本介绍
前面的教程中,讲的是
发送消息到队列并从中取出消息,现在介绍RabbitMQ中
完整的消息模型
简单的概括一下之前讲的:
发布者(producer)是发布消息的应用程序
队列(queue)用于消息存储的缓冲
消费者(consumer)是接收消息的应用程序
RabbitMQ消息模型的
核心理念是:发布者(producer)
不会直接发送任何消息给
队列,事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。发布者(producer)只需要
把消息发送给一个交换机(exchange),交换机非常简单,它一边从发布者方
接收消息,一边把消息
推送到队列,交换机
必须知道如何处理它接收到的消息,是应该
路由到
指定的队列还是是
多个队列,或者是直接
忽略消息,这些规则是通过交换机类型(exchange type)来定义的
有几个可供选择的交换器类型:direct, topic, headers和fanout
direct(直连/定向交换器)
消息与一个特定的路由键完全匹配
topic(主题交换器)
使用通配符*,#,让路由键和某种模式进行匹配
headers(头交换器)
不处理路由键,而是根据发送的消息内容中的 headers 属性进行匹配
fanout(扇型交换器)
发布/ 订阅模式可以理解为广播模式:即exchange会将消息转发到所有绑定到这个exchange的队列上,这种类型在发送消息,queue bind时,都将忽略route key,也就是说不需要设置 route key
Routing Key(路由键): 生产者将消息发送给交换器,一般都会指定一个Routing Key,用来指定这个消息的路由规则,而这个Routing Key需要与交换器类型和绑定键(Binding Key)联合使用才能生效
Binding(绑定):它是Exchange与Queue之间的虚拟连接,通俗的讲就是交换器和队列之间的联系(这个队列(Queue)对这个交换机(Exchange)的消息感兴趣),实现了根据不同的Routing Key(路由规则),交换机将消息路由(发送)到对应的Queue上
2.交换器核心方法
//试探性申请一个交换器,若该交换器存在,则跳过,不存在则创建
exchange_declare($exchange,$type,$passive = false,$durable = false,$auto_delete = true,$internal = false,$nowait = false,$arguments = array(),$ticket = null)
参数名
默认值
解释
$exchange
交换器名称
$type
交换器类型:
’’ 默认交换机 匿名交换器 未显示声明类型都是该类型
fanout 扇形交换器 会发送消息到它所知道的所有队列,每个消费者获取的消息都是一致的
headers 头部交换器
direct 直连交换器,该交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配
topic 主题交换器 该交换机会对路由键正则匹配,必须是*(一个单词)、#(多个单词,以.分割) ,eg:user.key .abc.* 类型的key
$passive
false
只判断不创建(一般用于判断交换器是否存在)
true:
1.如果exchange已存在则直接连接并且不检查配置比如已存在的exchange是fanout,新需要建立的是direct,也不会报错;
2.如果exchange不存在则直接报错
false:
1.如果exchange不存在则创建新的exchange
2.如果exchange已存在则判断配置是否相同,如果配置不相同则直接报错,比如已存在的exchange是fanout,新需要建立的是direct,会报错。
$durable
false
设置是否持久化,设置为true,表示持久化,反之非持久化,持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息
$auto_delete
true
设置是否自动删除,设置为true时,表示自动删除。自动删除的前提:至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑
$internal
false
设置是否为内置的,设置为true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器,只能通过交换器路由到这个交换器
$nowait
false
如果为true,表示不等待服务器回执信息,函数将返回null,可以提高访问速度
$arguments
array()
其他结构化参数
$ticket
null
3.fanout模式(广播模式)
广播模式可以理解为:发布/订阅模式,即exchange会将消息转发到所有绑定到这个exchange的队列上。针对这种广播模式,在发送消息,queue bind时,都将忽略route key,也就是说不需要设置 route key
案例一
一个生产者生产消息并发布消息到交换器上,多个消费者订阅该交换器,并与之队列绑定,消费消息
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"exchange_name" => [
"name1" => "exch", // 交换器名称
],
]
生产者
<?php
/**
* 交换器fanout(广播)模式: 生产者生产消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 声明一个数据
$data = "this is a exchange message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 发布消息到交换器
$channel->basic_publish($msg, $exchangeName);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者(多个)
消费者1
<?php
/**
* 交换器fanout(广播)模式: 消费者消费消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
// $queueName = $rabbitMqConfig["queue_name"]["name4"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 声明队列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交换机与队列绑定
$channel->queue_bind($queueName, $exchangeName);
// 消息回调处理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者2
<?php
/**
* 交换器fanout(广播)模式: 消费者消费消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Consumer2Controller extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
// $queueName = $rabbitMqConfig["queue_name"]["name4"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 声明队列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交换机与队列绑定
$channel->queue_bind($queueName, $exchangeName);
// 消息回调处理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
案例二
举个实际应用的场景:比方说用户注册(注销,更改姓名等)新浪,同时需要开通微博、博客、邮箱等,如果不采用队列,按照常规的线性处理,可能注册用户会特别的慢,因为在注册的时候,需要调各种其他服务器接口,如果服务很多的话,可能客户端就超时了。如果采用普通的队列,可能在处理上也会特别的慢(不是最佳方案),如果采用订阅模式,则是最优的选择
处理过程如下:
用户提交username、pwd…等之类的基本信息,将数据提交register.php中
register.php对数据进行校验,符合注册要求,生成uid,并将和基本信息json后,发布一条消息到对应的交换器中,同时直接显示用户注册成功
exchange中的多个队列,如(queue.process、queue.boke、queue.weibo、queue.email)都订阅了这个消息,根据各业务自身的逻辑来处理
总结:
1.不申明队列,因为发布/订阅模式下,是可以随时添加新的订阅队列
2.exchange的Type指定为fanout(广播模式)
3.队列不需要指定route key,绑定exchange
代码如下:
生产者
<?php
/**
* 交换器fanout(广播)模式: 生产者生产消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public function actionIndex()
{
/*
用户注册逻辑
*/
//发送消息逻辑
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = "register";
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 声明一个数据
$data = "{uid:xxx,reg_time:xxx}";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 发布消息到交换器
$channel->basic_publish($msg, $exchangeName);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者(多个)
可以创建多个不同类型的消费者(开通微博、博客、邮箱)等逻辑功能的消费者
<?php
/**
* 交换器fanout(广播)模式: 消费者消费消息
*/
namespace console\controllers\exchange\fanout;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = "register";
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
// 声明一个匿名队列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交换机与队列绑定
$channel->queue_bind($queueName, $exchangeName);
// 消息回调处理
$callback = function ($meg) {
//处理逻辑
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
4.Direct模式
Direct交换器将消息投递到路由参数
完全匹配的队列中
直接上代码
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
"name2" => "task_queue", // 队列名称
"name3" => "task_ack", // 队列名称
"name4" => "exchange_fanout_1", // 队列名称
],
"exchange_name" => [
"name1" => "exch", // 交换器名称
"name2" => "exch_direct_log", // 交换器名称
],
"routing_key" => [
"info_key" => "info", // 路由键
"error_key" => "error", // 路由键
"warn_key" => "warn", // 路由键
],
】
生产者
<?php
/**
* 交换器direct(routing_key-更详细的bind模式)模式: 生产者生产消息
*/
namespace console\controllers\exchange\direct;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation=false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
$routingKey = $rabbitMqConfig["routing_key"]["error_key"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 声明一个数据
$data = "this is a ". $routingKey . " message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 发布消息到交换器, 并和路由键匹配
$channel->basic_publish($msg, $exchangeName, $routingKey);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者
<?php
/**
* 交换器direct(routing_key-更详细的bind模式)模式: 消费者消费消息
*/
namespace console\controllers\exchange\direct;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerWarnController extends Controller
{
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
$routingKey = $rabbitMqConfig["routing_key"]["warn_key"]; //路由键可以修改为其他key,与生产者bind的关联
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 声明一个匿名队列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交换机与队列绑定,并指定routing_key
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 消息回调处理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
5.topic模式
发送到topic交换器的消息不可以携带随意routing_key,它的routing_key必须是一个由
.分隔开的
词语列表,这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇,以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",词语的个数可以随意,但是
不要超过255字节。binding key也必须拥有同样的格式,topic交换器背后的逻辑跟direct交换机很相似 : 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列,但是它的binding key和routing_key有两个特殊应用方式:
- (星号) 用来表示一个单词
(井号) 用来表示任意数量(零个或多个)单词
下边用图说明:
这个例子里,发送的所有消息都是用来描述小动物的,发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开,路由键里的第一个单词
描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类,所以它看起来是这样的: <celerity>.<colour>.<species>。
创建了三个绑定:Q1的绑定键为 *.orange.*,Q2的绑定键为 ..rabbit 和 lazy.# 。
Q1-->绑定的是
中间带 orange 带 3 个单词的字符串 (.orange.)
Q2-->绑定的是
最后一个单词是 rabbit 的 3 个单词 (..rabbit)
第一个单词是 lazy 的多个单词 (lazy.#)
这三个绑定键被可以总结为:
Q1 对所有的桔黄色动物都感兴趣
Q2 则是对所有的兔子和所有懒惰的动物感兴趣
一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列,携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。
注意:
如果违反约定,发送了一个携带有一个单词或者四个单词("orange" or "quick.orange.male.rabbit")的消息时,发送的消息不会投递给任何一个队列,而且会丢
失掉
但是另一方面,即使 "lazy.orange.male.rabbit" 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。
topic交换机是很强大的,它可以表现出跟其他交换机类似的行为 当一个队列的binding key为 "#"(井号) 的时候,这个队列将会无视消息的routing key,接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在binding key中出现的时候,此时Topic交换机就拥有的direct交换机的行为
代码如下:
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
"name2" => "task_queue", // 队列名称
"name3" => "task_ack", // 队列名称
"name4" => "exchange_fanout_1", // 队列名称
],
"exchange_name" => [
"name1" => "exch", // 交换器名称
"name2" => "exch_direct_log", // 交换器名称
"name3" => "exch_topic_log", // 交换器名称
],
"routing_key" => [
"info_key" => "info", // 路由键
"error_key" => "error", // 路由键
"warn_key" => "warn", // 路由键
"all_key" => "#", // 所有的路由键
"user_info" => "user.info", // 路由键
"user_warn" => "user.warn", // 路由键
"user_all" => "user.*", // 匹配以user.开头的路由键
],
]
】
生产者
<?php
/**
* 交换器topic(通配符)模式: 生产者生产消息
*/
namespace console\controllers\exchange\topic;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
$routingKey = $rabbitMqConfig["routing_key"]["user_info"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
// 声明一个数据
$data = "this is a ". $routingKey . " message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 发布消息到交换器, 并和路由键匹配
$channel->basic_publish($msg, $exchangeName, $routingKey);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者
<?php
/**
* 交换器topic(通配符)模式: 消费者消费消息
*
*/
namespace console\controllers\exchange\topic;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
$routingKey = $rabbitMqConfig["routing_key"]["user_all"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
// 声明队列
list($queueName) = $channel->queue_declare('', false, false, true, false);
// 交换机与队列绑定,并指定routing_key
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 消息回调处理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
七.死信队列,延时队列
1.死信队列
死信( Dead Letter )是RabbitMQ 中的一种
消息机制,当在消费消息时,如果队列里的消息出现以下情况:
消息被拒绝
消息在队列的存活时间超过设置的 TTL 时间
TTL (Time To Live),即生存时间
RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
RabbitMQ 支持为每个队列设号消息的超时时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会被自动清除
消息队列的消息数量已经超过最大队列长度
那么该消息将成为“死信”,“死信”消息会被 RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃
RabbitMQ 中有一种交换器叫
DLX,全称为
Dead 一 Letter 一 Exchange ,可以称之为
死信交换器,当消息在一个队列中变成死信( dead message )消息之后,它会被重新发送到另外一个交换器中,这个交换器就是DLX ,
绑定在 DLX上的队列就称之为死信队列,程序就可以监听这个队列中的消息,并做相应的处理,该特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能
消息变成死信有以下几种情况:
消息被拒绝消息
TTL 过期(延迟队列)
队列达到最大长度
2.延时队列
延时队列就是用来存放需要在
指定时间被处理的元素的队列.一般可以利用
死信队列的特性来
实现延迟队列:只要给消息设置一个过期时间,消息过期就会自动进入死信队列,消费者只要监听死信队列就可以实现延迟队列了
应用场景
订单在十分钟之内未支付则自动取消
账单在一周内未支付,则自动结算
某个时间下发一条通知
用户注册成功后,如果三天内没有登陆则进行短信提醒
用户发起退款,如果三天内没有得到处理则通知相关运营人员
下面通过一个案例来更进一步了解死信队列,延时队列
案例1
订单在一段时间内未支付则自动取消,步骤:
(1).创建订单操作
(2).订单创建成功后,订单相关数据json处理
(3).构建rabbitmq消息队列,并设置消息过期时间为60秒,把订单相关json数据发布到交换器, 并和路由键匹配,生产者生产消息60秒之后,消息会进入到死信队列,消费者监听死信队列,处理订单
rabbitmq配置
"rabbitMq" => [
"base" => [
'host' => '192.168.0.5', // host地址
'port' => 5672, // 端口
"user" => "user", // 账户
'pass' => 123456, // 密码
"v_host" => "order", // 对应Virtual Hosts
],
"queue_name" => [
"name1" => "goods", // 队列名称
"name2" => "task_queue", // 队列名称
"name3" => "task_ack", // 队列名称
"name4" => "exchange_fanout_1", // 队列名称
"name5" => "queue_pay", // 订单支付队列
],
"exchange_name" => [
"name1" => "exch", // 交换器名称
"name2" => "exch_direct_log", // 交换器名称
"name3" => "exch_topic_log", // 交换器名称
"name4" => "exch_pay", // 订单支付, 交换器名称
],
"routing_key" => [
"info_key" => "info", // 路由键
"error_key" => "error", // 路由键
"warn_key" => "warn", // 路由键
"order_key" => "order_pay", // 订单支付
"all_key" => "#", // 所有的路由键
"user_info" => "user.info", // 路由键
"user_warn" => "user.warn", // 路由键
"user_all" => "user.*", // 匹配以user.开头的路由键
],
"dead_letter" => [ // 死信队列
"exchange_name" => [ // 死信队列交换机名称
"pay" => "dead_exch_pay"
],
"queue_name" => [ // 死信队列名称
"pay" => "dead_queue_pay"
],
"routing_key" => [ // 死信队列routing名称
"pay" => "dead_routing_key_pay"
]
]
]
生产者
<?php
/**
* 死信队列,延时队列: 生产者推送消息到队列,模拟订单支付
*/
namespace console\controllers\exchange\dead;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$deadConfig = Yii::$app->params["rabbitMq"]["dead_letter"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["exchange_name"]["name4"];
$queueName = $rabbitMqConfig["queue_name"]["name5"];
$routingKey = $rabbitMqConfig["routing_key"]["order_key"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器, 交换机类型: routing_key-更详细的bind模式
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 消息队列的额外参数
$args = new AMQPTable([
'x-message-ttl' => 2000, // 消息的过期时间
"x-dead-letter-exchange" => $deadConfig["exchange_name"]["pay"], // 死信队列交换机名称
"x-dead-letter-routing-key" => $deadConfig["routing_key"]["pay"] // 死信队列routing名称
]);
// 声明队列
$channel->queue_declare($queueName, false, true, false, false, false, $args);
// 交换机和队列绑定
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 声明死信交换机
$channel->exchange_declare($deadConfig["exchange_name"]["pay"], AMQPExchangeType::DIRECT, false, false, false);
// 声明死信队列
$channel->queue_declare($deadConfig["queue_name"]["pay"], false, true, false, false, false);
// 死信交换机和队列绑定
$channel->queue_bind($deadConfig["queue_name"]["pay"], $deadConfig["exchange_name"]["pay"], $deadConfig["routing_key"]["pay"]);
// 声明一个数据:里面可以是用户订单相关json数据
$data = "this is a dead message";
// 初始化消息并持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
// 发布消息到交换器, 并和路由键匹配
$channel->basic_publish($msg, $exchangeName, $routingKey);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者
<?php
/**
* 死信队列,延时队列: 模拟订单支付,消费者消费消息
*
*/
namespace console\controllers\exchange\dead;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$config = $rabbitMqConfig["base"];
$exchangeName = $rabbitMqConfig["dead_letter"]["exchange_name"]["pay"];
$queueName = $rabbitMqConfig["dead_letter"]["queue_name"]["pay"];
$routingKey = $rabbitMqConfig["dead_letter"]["routing_key"]["pay"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明对应的交换器
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
// 交换机与队列绑定,并指定routing_key
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 消息回调处理
$callback = function ($meg) {
//处理订单相关数据
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
问题
通过上面的案例就可以实现死信队列,延时队列操作,上面看上去似乎没什么问题,实测一下就会发现
消息不会“如期死亡”。当先生产一个TTL为60s的消息,再生产一个TTL为5s的消息,第二个消息并不会再5s后过期进入死信队列,而是需要等到第一个消息TTL到期后,与第一个消息一同进入死信队列,
这是因为RabbitMQ 只会判断队列中的第一个消息是否过期
那么怎么来解决这个问题呢?
通过 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来解决
此插件的原理是将消息在交换机处暂存储在mnesia(一个分布式数据系统)表中,延迟投递到队列中,等到消息到期再投递到队列当中
rabbitmq_delayed_message_exchange插件安装
(1).下载地址:https://www.rabbitmq.com/community-plugins.html
注意: 要下载与rabbitmq相对应的版本
(2).把下载的插件放到指定位置
下载的文件为zip格式,将zip格式解压,插件格式为ez,将文件复制到插件目录:
- Linux
/usr/lib/rabbitmq/lib/rabbitmq_server-xxx/plugins
rabbitmq-plugins list
- Windows
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.17\plugins
(3).启动插件
- Linux
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- Windows
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启动信息:
(4).查看
重构代码
生产者
生产者实现的关键点:
1.在声明交换机时不在是direct类型,而是x-delayed-message类型,这是由插件提供的类型
2.交换机要增加"x-delayed-type": "direct"参数设置
3.发布消息时,要在 Headers 中设置x-delay参数,来控制消息从交换机过期时间
<?php
/**
* 死信队列,延时队列插件使用: 模拟订单支付
*/
namespace console\controllers\exchange\delay;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PublisherController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$delayConfig = Yii::$app->params["rabbitMq"]["delay"];
$config = $rabbitMqConfig["base"];
// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器, 交换机类型: 延时插件名称(x-delayed-message)
$channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
// 消息队列的额外参数
$args = new AMQPTable(["x-delayed-type" => "direct"]);
// 声明队列
$channel->queue_declare($delayConfig["queue_name"]["pay"], false, true, false, false, false, $args);
// 交换机和队列绑定
$channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
// 声明一个数据
$data = "this is a dead message";
// 初始化消息并持久化
$arr = [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT,
"application_headers" => new AMQPTable([
"x-delayed" => 2000 // 过期时间
])
];
$msg = new AMQPMessage($data, $arr);
// 发布消息到交换器, 并和路由键匹配
$channel->basic_publish($msg, $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
//关闭通道和连接
$channel->close();
$connection->close();
}
}
消费者
没有啥特别的修改
<?php
/**
* 死信队列,延时队列插件使用: 模拟订单支付
*
*/
namespace console\controllers\exchange\delay;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ConsumerController extends Controller
{
public $enableCsrfValidation = false;
public function actionIndex()
{
//rabbitmq相关配置
$rabbitMqConfig = Yii::$app->params["rabbitMq"];
$delayConfig = Yii::$app->params["rabbitMq"]["delay"];
$config = $rabbitMqConfig["base"];// 创建连接
$connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
// 创建channel
$channel = $connection->channel();
// 声明并初始化交换器, 交换机类型: 延时插件名称(x-delayed-message)
$channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
// 交换机和队列绑定
$channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
// 消息回调处理
$callback = function ($meg) {
echo "revince: " . $meg->body. "\n";
$meg->ack();
};
// 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
$channel->basic_qos(null, 1, null);
// 消费者消费消息: 第四个参数: 需要ack确认
$channel->basic_consume($delayConfig["queue_name"]["pay"], '', false, false, false, false, $callback);
// 监控
while ($channel->is_open()) {
$channel->wait();
}
//关闭通道和连接
$channel->close();
$connection->close();
}
}
版权归原作者 zhoupenghui168 所有, 如有侵权,请联系我们删除。