文章目录
一.什么是MQ
1.1 mq的作用
1.流量消峰: 将同时刻大量请求访问,使用消息队列做缓冲,把一秒内下的订单分散成一段时间来处理.
2.应用解耦:
**3.异步处理: **
1.2 mq的区别
1.Kafka: 大数据的利器
优点: 性能卓越,单机写入 TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms级可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:
功能较为简单,主要支持简单的 MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点: Kafka单机超过 64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消
息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,
但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
2.RocketMQ: 阿里巴巴的开源产品
优点: 单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0丢失,MQ功能较为完善,还是分布式的,扩展性好,支持
10亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java我们可以自己阅读源码,定制自己公司的 MQ
缺点: 支持的客户端语言不多,社区活跃度一般,没有在 MQ 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码
3.RabbitMQ 当前最主流的消息中间件之一
优点: 由于 erlang语言的高并发特性,性能较好;吞吐量到万级,MQ功能比较完备,健壮、稳定、易 用、跨平台、支持多种语言如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持AJAX文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高 https://www.rabbitmq.com/news.html
缺点: 商业版需要收费,学习成本较高
二. RabbitMQ
2.1 四大核心概念
生产者 : 产生数据发送消息的程序是生产者
交换机: 一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定.
队列: 队列是 RabbitMQ内部使用的一种数据结构,队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。
消费者: 消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
2.2 RabbitMQ核心部分
1.简单模式(Hello World!)
2.工作模式(Work queues)
3.发布订阅模式(publish/Subscrible)
4.路由模式(Routing)
5.主题模式(Topics)
6.发布确认模式(Publisher Confirms)
2.3 RabbitMQ工作原理
三.docker安装
# -d后台运行 -p端口映射 -e设置账号密码 --name 给容器起别名
docker run -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3.8-management
注意防火墙放行
ip:15672 访问ui 界面
ip:5672 访问客户端
收工完事
进入到容器中
docker exec -it mq容器名 /bin/bash # 进入rabbit容器
rabbitmqctl list_users # 查看rabbitmq用户列表
rabbitmqctl add_user admin 123# 添加账号
rabbitmqctl set_user_tags admin administrator # 设置用户身份# set_permissions [-p <vhostpath>]<user> <conf> <write> <read> 配置 读 写 权限
rabbitmqctl set_permissions -p "/" admin ".*"".*"".*"
四.代码实现
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build><dependencies><!-- rabbitMQ依赖客户端--><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><!-- 操作文件流的依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies>
五. 工作队列模式
引自尚硅谷定义:
当有多个工作线程时,他们将轮询(轮训)接收队列消息
抽取连接rabbitMq工具类
publicclassRabbitMqUtils{publicstaticChannelgetChannel()throwsIOException,TimeoutException{// 创建连接工厂ConnectionFactory factory =newConnectionFactory();// 工厂IP 连接rabbitMQ的队列
factory.setHost("127.0.0.1");
factory.setPort(5672);// 用户名
factory.setUsername("root");// 密码
factory.setPassword("123456");// 创建连接Connection connection = factory.newConnection();// 获取连接中的信道Channel channel = connection.createChannel();return channel;}}
工作线程01(消费者01)
packagework_queues;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importutils.RabbitMqUtils;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
* @author WangJiaHui
* @description: 工作线程
* @ClassName Work01
* @date 2022/4/6 16:25
*/publicclassWork01{// 队列名称publicstaticfinalString QUEUE_NAME ="hello";// 接收消息publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();// 声明 lambda表达式,取代匿名内部类 DeliverCallback接口需要被实现DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("接收到的消息"+newString(message.getBody(),"UTF-8"));// 手动应答/**
* 1. 消息的标记 tag
* 2. 是否批量应答
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};// 取消消息时的回调CancelCallback cancelCallback = consumerTag ->{System.out.println("消息被中断");};/**
* 1.消费那个队列
* 2.消费成功后是否自动应答,true自动应答
* 3.消费未成功消费的回调
* 4.消费者取消消费回调
*/System.out.println("work01等待接收消息......");// 采用手动应答boolean autoAck =false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);}}
工作线程02(消费者02)
packagework_queues;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importutils.RabbitMqUtils;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
* @author WangJiaHui
* @description:
* @ClassName Work02
* @date 2022/4/6 16:35
*/publicclassWork02{// 队列名称publicstaticfinalString QUEUE_NAME ="hello";// 接收消息publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();// 声明 lambda表达式,取代匿名内部类 DeliverCallback接口需要被实现DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("接收到的消息"+newString(message.getBody(),"UTF-8"));// 手动应答/**
* 1. 消息的标记 tag
* 2. 是否批量应答
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};// 取消消息时的回调CancelCallback cancelCallback = consumerTag ->{System.out.println("消息被中断");};/**
* 1.消费那个队列
* 2.消费成功后是否自动应答,true自动应答
* 3.消费未成功消费的回调
* 4.消费者取消消费回调
*/System.out.println("work02等待接收消息......");// 采用手动应答boolean autoAck =false;
channel.basicConsume(QUEUE_NAME,true, deliverCallback, cancelCallback);}}
生产者01
packagework_queues;importcom.rabbitmq.client.Channel;importutils.RabbitMqUtils;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.Scanner;importjava.util.concurrent.TimeoutException;/**
* @author WangJiaHui
* @description:
* @ClassName Prod01
* @date 2022/4/6 16:39
*/publicclassProd01{// 队列名称publicstaticfinalString QUEUE_NAME ="hello";// 发送大量的消息publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();// 队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 发消息/**
* 1. 发送到哪个交换机
* 2. 路由key值 本次是队列名称
* 3. 其他参数信息
* 4. 发送消息的消息体
*/Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));System.out.println("消息发送完毕: "+ message);}}}
六. 消息应答-消费端
引自尚硅谷:
6.1 自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡.
因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了.
当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。|
6.2 手动应答
手动确认是在获取到消息后,在没有返回回执前,消息会一直存储在队列.当消息发送到工作线程时,该工作线程突然挂掉(如果我们没有对消息接收后进行任何反馈的话该条消息在队列的状态会变成Unacked 直到我们消费端AMQP连接中断后该消息状态又会变成Ready).则不会向队列发出ack确认信息.而使得队列认为该工作线程挂掉.转发给另一个正常工作的线程;手动应答会在工作线程完成任务时向队列发送信息.
6.2.2 消息应答重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ.将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
6.2.3 消息手动应答代码
应答是工作线程回馈消息给队列,确认工作完成的.
七.持久化
7.1 队列持久化
7.2 消息持久化
八.不公平分发
rabbitMQ默认采用轮询分发,但这种策略在面对不同工作线程处理时长,还是采用轮训分发的话就会使这处理速度快的消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活.为避免这种"大锅饭",可以使用不公平分发–按劳分配.
// 设置消费者方不公平分发int prefetchCount=1;
channel.basicQos(prefetchCount);
8.1 预取值
本身消息的发送就是异步发送的,所以在任何时候,channel上肯定不止只有一个消息另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6这个消息刚刚被确认 ACK,RabbitMQ将会感知这个情况到并再发送一条消息。消息应答和 QoS预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理 的消息的数量也会增加,从而增加了消费者的 RAM消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100到 300范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。
// 设置消费者方预取值5条int prefetchCount=5;
channel.basicQos(prefetchCount);
九. 交换机
RabbitMQ消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列. 即便不写交换机(" “),也会走默认交换机,
交换机类型: 直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout),无名类型(” ").
9.1 fanout扇出模式
之前的工作模式,一条信息只能被消费一次,然而扇出模式允许多个消费者接收,即广播(binding相同).
生产者异步确认发布代码:
/**
* @author YuanJie
* @description:
* @ClassName Emitlog
* @date 2022/4/8 21:34
*/publicclassEmitlog{// 交换机名称publicstaticfinalString EXCHANGE_NAME="logs";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();// 开启交换机扇出模式,即广播模式
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner scanner =newScanner(System.in);// 开启发布确认
channel.confirmSelect();/**
* 线程有序的哈希跳跃表 适用于高并发的情况下
* 1.轻松的将消息序号与消息内容相关联
* 2.轻松批量删除某个消息
* 3.支持高并发(多线程安全)
*/ConcurrentSkipListMap<Long,String> outStandingConfirms =newConcurrentSkipListMap<>();// 消息确认成功回调函数ConfirmCallback ackCallback =(deliveryTag,multiple)->{if(multiple){// ^^^ 删除掉已经确认的消息ConcurrentNavigableMap<Long,String> confirmed =
outStandingConfirms.headMap(deliveryTag);
confirmed.clear();}else{
outStandingConfirms.remove(deliveryTag);}System.out.println("确认消息: "+ deliveryTag);};/** 消息确认失败回调函数
* 1. 消息在队列的标识
* 2. 是否批量确认
*/ConfirmCallback nackCallback =(deliveryTag,multiple)->{String s = outStandingConfirms.get(deliveryTag);// ^^^ 打印未确认的消息System.out.println("未确认消息: "+ deliveryTag);};/** 准备消息的监听器 监听哪些消息成功了 哪些失败了
* 1. 监听成功
* 2. 监听失败
*/
channel.addConfirmListener(ackCallback,nackCallback);// 发消息/**
* 1. 发送到哪个交换机
* 2. 路由key值 本次是队列名称
* 3. 其他参数信息
* 4. 发送消息的消息体
*/while(scanner.hasNext()){String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));System.out.println("生产者发出消息: "+ message);// ^^^记录所有发送的消息
outStandingConfirms.put(channel.getNextPublishSeqNo(),message);}}}
消费者01
/**
* @author YuanJie
* @description:
* @ClassName ReceiveLogs01
* @date 2022/4/8 21:19
*/publicclassReceiveLogs01{// 交换机名称publicstaticfinalString EXCHANGE_NAME="logs";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 声明一个队列 临时队列/**
* 生成一个临时队列,队列的名称是随机的
* 当消费者断开与队列的连接时候 队列就自动删除
*/String queue = channel.queueDeclare().getQueue();/**
* 绑定交换机与队列的映射关系
*/
channel.queueBind(queue,EXCHANGE_NAME,"");System.out.println("等待接收消息....");// 接收消息DeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("ReceiveLogs01控制台打印接收到的消息: "+newString(message.getBody(),StandardCharsets.UTF_8));};// 消费者取消消息时回调接口
channel.basicConsume(queue,true,deliverCallback,consumerTag ->{});}}
消费者02
/**
* @author YuanJie
* @description:
* @ClassName ReceiveLogs01
* @date 2022/4/8 21:19
*/publicclassReceiveLogs02{// 交换机名称publicstaticfinalString EXCHANGE_NAME="logs";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();// 声明一个交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 声明一个队列 临时队列/**
* 生成一个临时队列,队列的名称是随机的
* 当消费者断开与队列的连接时候 队列就自动删除
*/String queue = channel.queueDeclare().getQueue();/**
* 绑定交换机与队列的映射关系
*/
channel.queueBind(queue,EXCHANGE_NAME,"");System.out.println("等待接收消息....");// 接收消息DeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("ReceiveLogs02控制台打印接收到的消息: "+newString(message.getBody(),StandardCharsets.UTF_8));};// 消费者取消消息时回调接口
channel.basicConsume(queue,true,deliverCallback,consumerTag ->{});}}
9.2 Direct直接模式
即重新指定routingkey即可.由于一次只能指定一个routingkey,因此一次只能路由一个队列.
十.Topics主题交换机
作用:获取一种类型的数据文件.比如说同类型的日志文件.
发送到类型是 topic交换机的消息的 routing_key不能随意写,必须满足一定的要求,它必须是一个单
词列表,以点号分隔开。*(星号)可以代替一个单词 , #(井号)可以替代零个或多个单词
十一.死信队列
无法被消费的消息
producer将消息投递到 broker或者直接到 queue里了,consumer从 queue取出消息进行消费,但某些时候由于特定的原因导致 queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
11.1 死信的来源
消息 TTL过期
队列达到最大长度(队列满了,无法再添加数据到 mq中)
消息被拒绝(basic.reject或 basic.nack)并且 requeue=false.
11.2 .1 TTL消息过期引起的死信
Producer
importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.ConfirmCallback;importutils.RabbitMqUtils;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.concurrent.ConcurrentNavigableMap;importjava.util.concurrent.ConcurrentSkipListMap;importjava.util.concurrent.TimeoutException;/**
* @author yuanjie
* @description: 死信队列之生产者
* @ClassName prod
* @date 2022/4/10 16:54
*/publicclassProducer{// 正常交换机publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);// 开启发布确认 异步确认
channel.confirmSelect();/**
* 线程有序的哈希跳跃表 适用于高并发的情况下
* 1.轻松的将消息序号与消息内容相关联
* 2.轻松批量删除某个消息
* 3.支持高并发(多线程安全)
*/ConcurrentSkipListMap<Long,String> outStandingConfirms =newConcurrentSkipListMap<>();// 消息确认成功回调函数ConfirmCallback ackCallback =(deliveryTag, multiple)->{if(multiple){// ^^^ 删除掉已经确认的消息ConcurrentNavigableMap<Long,String> confirmed =
outStandingConfirms.headMap(deliveryTag);
confirmed.clear();}else{
outStandingConfirms.remove(deliveryTag);}System.out.println("确认消息: "+ deliveryTag);};/** 消息确认失败回调函数
* 1. 消息在队列的标识
* 2. 是否批量确认
*/ConfirmCallback nackCallback =(deliveryTag,multiple)->{String s = outStandingConfirms.get(deliveryTag);// ^^^ 打印未确认的消息System.out.println("未确认消息: "+ s);};/** 准备消息的监听器 监听哪些消息成功了 哪些失败了
* 1. 监听成功
* 2. 监听失败
*/
channel.addConfirmListener(ackCallback,nackCallback);// 开启死信消息 设置消息过期TTL时间 10s=10000msAMQP.BasicProperties properties =newAMQP.BasicProperties().builder().expiration("10000").build();for(int i=1;i<11;i++){String message ="info"+ i;
channel.basicPublish(NORMAL_EXCHANGE,"key1",properties,message.getBytes(StandardCharsets.UTF_8));// ^^^记录所有发送的消息
outStandingConfirms.put(channel.getNextPublishSeqNo(),message);}}}
Consumer01
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importutils.RabbitMqUtils;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.HashMap;importjava.util.Map;importjava.util.concurrent.TimeoutException;/**
* @author yuanjie
* @description: 死信队列实战
* @ClassName Consumer01
* @date 2022/4/10 16:55
*/publicclassConsumer01{// 正常交换机publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";// 普通队列名称publicstaticfinalString NORMAL_QUEUE ="normal_queue";// 死信交换机publicstaticfinalString DEAD_EXCHANGE ="dead_exchange";// 死信队列名称publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 封装的rabbitMQ客户端Channel channel =RabbitMqUtils.getChannel();// 声明死信和普通交换机,direct类型
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明普通队列Map<String,Object> arguments =newHashMap<>();// 正常队列设置死信交换机 死信 x-dead-letter-exchange 固定写法
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","key2");
channel.queueDeclare(NORMAL_QUEUE,false,false,false, arguments);//// 声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通交换机与队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE,"key1");// 绑定死信的交换机与死信队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE,"key2");System.out.println("等待接收消息.....");// 重写确认信息回调函数接口DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("Consumer01接受的消息: "+newString(message.getBody(),StandardCharsets.UTF_8));};// 重写取消消息时的回调CancelCallback cancelCallback = consumerTag ->{System.out.println("消息被中断");};
channel.basicConsume(NORMAL_QUEUE,true, deliverCallback, cancelCallback);}}
Consumer02
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importutils.RabbitMqUtils;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.concurrent.TimeoutException;/**
* @author WangJiaHui
* @description:
* @ClassName Consumer02
* @date 2022/4/10 17:45
*/publicclassConsumer02{// 死信交换机publicstaticfinalString DEAD_EXCHANGE ="dead_exchange";// 死信队列名称publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 封装的rabbitMQ客户端Channel channel =RabbitMqUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通交换机与队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE,"key2");System.out.println("等待接收消息.....");// 重写确认信息回调函数接口DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("Consumer02接受的消息: "+newString(message.getBody(),StandardCharsets.UTF_8));};// 重写取消消息时的回调CancelCallback cancelCallback = consumerTag ->{System.out.println("消息被中断");};
channel.basicConsume(DEAD_QUEUE,true, deliverCallback, cancelCallback);}}
11.2.2 队列到达最大长度引起的死信
超出队列部分成为死信
在正常消费者中,传递给死信的参数增添
arguments.put("x-max-length",6);
11.2.3 消息被拒绝
在正常消费者中,
publicclassConsumer01{//普通交换机名称 privatestaticfinalString NORMAL_EXCHANGE ="normal_exchange";//死信交换机名称 privatestaticfinalString DEAD_EXCHANGE ="dead_exchange";publicstaticvoidmain(String[] argv)throwsException{Channel channel =RabbitUtils.getChannel();//声明死信和普通交换机 类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//声明死信队列 String deadQueue ="dead-queue";
channel.queueDeclare(deadQueue,false,false,false,null);//死信队列绑定死信交换机与routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE,"key2");//正常队列绑定死信队列信息 Map<String,Object> params =newHashMap<>();//正常队列设置死信交换机 参数key是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常队列设置死信routing-key 参数key是固定值
params.put("x-dead-letter-routing-key","key2");String normalQueue ="normal-queue";
channel.queueDeclare(normalQueue,false,false,false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE,"key1");System.out.println("等待接收消息.....");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),"UTF-8");if(message.equals("info5")){System.out.println("Consumer01接收到消息"+ message +"并拒绝签收该消息");//requeue设置为false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);}else{System.out.println("Consumer01接收到消息"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}};boolean autoAck =false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag ->{});}}
十二. 延迟队列
延时队列是用来存放需要在指定时间被处理的元素的队列。对于数据量极大的,不适合使用轮询定时任务.应采用延迟队列.
注意:延迟队列是排队的有先来先服务的特点! RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。由于队列的先进先出特性,因此只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列所以在考虑使用RabbitMQ来实现延迟任务队列的时候Bb需要确保业务上每个任务的延迟时间是一致的。需要不同的延时的话,需要为每严种不同延迟时间的消息建立单独的消息队列。
12.1 整合boot
基于插件
的延迟队列
再次强调
RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
我们可以用基于插件的延迟队列解决这个问题.
docker版插件下载教程
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--RabbitMQ依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>
spring.rabbitmq.host=198.168.11.11
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
springboot之前.声明队列,交换机在消费者和生产者.但springboot后有专门的配置文件类来声明.
配置文件类代码
/**
* @author yuanjie
* @description:
* @ClassName DelayedQueueConfig
* @date 2022/4/13 17:16
*/@ConfigurationpublicclassDelayedQueueConfig{// exchangepublicstaticfinalString DELAYED_EXCHANGE_NAME="delayed.exchange";// queuepublicstaticfinalString DELAYED_QUEUE_NAME="delayed.queue";// routingKeypublicstaticfinalString DELAYED_ROUTING_KEY="delayed.routingKey";// 声明 queue@Bean("delayedQueue")publicQueuedelayedQueue(){returnQueueBuilder.durable(DELAYED_QUEUE_NAME).build();}// 声明exchange@Bean("delayedExchange")publicCustomExchangedelayedExchange(){Map<String,Object> arguments =newHashMap<>();
arguments.put("x-delayed-type","direct");/**
* 1. exchange name
* 2. exchange type
* 3. is_durable
* 4. is_auto_delete
*/returnnewCustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}// binding@BeanpublicBindingdelayedQueueBingdingDelayedExchange(@Qualifier("delayedQueue")Queue delayedQueue,@Qualifier("delayedExchange")CustomExchange delayedExchange){returnBindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
生产者代码
/**
* @author yuanjie
* @description: 发送延迟消息
* @ClassName SendMsg
* @date 2022/4/12 16:57
*/@Slf4j@RestController@RequestMapping("/ttl")publicclassSendMsgController{@AutowiredprivateRabbitTemplate rabbitTemplate;// 基于插件的消息及延迟时间@GetMapping("/sendDelayMsg/{message}/{delaytime}")publicvoidsendMsg(@PathVariableString message,@PathVariableInteger delaytime){
log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",newDate().toString(),delaytime,message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, message1 ->{// 发送消息的时候 延时时长ms
message1.getMessageProperties().setDelay(delaytime);return message1;});}}
消费者
/**
* @author yuanjie
* @description: Consumer 基于插件的延迟
* @ClassName DelayQueueConsumer
* @date 2022/4/13 17:35
*/@Slf4j@ComponentpublicclassDelayQueueConsumer{// 监听消息@RabbitListener(queues =DelayedQueueConfig.DELAYED_QUEUE_NAME)publicvoidreceiveDelayQueue(Message message){String s =newString(message.getBody());
log.info("当前时间:{},收到死信队列消息: {}",newDate().toString(),s);}}
12.3 总结
引自尚硅谷
延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景
十三.发布确认高级-生产者端
13.1 交换机确认和回退消息
交换机确认场景;rabbitMQ由于异常原因导致重启.生产者投递消息失败.如何恢复投递失败的消息重新发送的问题.
即确认交换机是否存在
回退消息: 如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的.通过设置
mandatory
参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
13.2 回退消息代码
如果两者同时开启,备份交换机优先级高
yml增添
spring.rabbitmq.publisher-confirm-type=correlated
#交换机开启确认消息
spring.rabbitmq.publisher-returns=true
开启消息不可路由,回退消息给生产者
封装交换机回调确认类(当消息发送给交换机失败,我们可以通过该类接收发送失败的消息,存储进内存重发,解决
交换机
异常问题)
/**
* @author WangJiaHui
* @description: 交换机确认回调
* @ClassName CallBackConFirm
* @date 2022/4/14 18:03
*/@Slf4j@ComponentpublicclassCallBackConFirmimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;/**
* ConfirmCallback接口注入RabbitTemplate
*/@PostConstructpublicvoidinit(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);}/**
* 1. 发消息 交换机接收到了 回调
* @param correlationData 保存消息的id及相关信息
* @param b ack应答确认 true or false
* @param s cause 失败的原因 or null
*/@Overridepublicvoidconfirm(CorrelationData correlationData,boolean b,String s){String id = correlationData !=null? correlationData.getId():" ";if(b){
log.info("交换机已经收到了id为:{}的消息",id);}else{
log.info("交换机还未收到id为:{}的消息,由于:{}",id,s);}}// 当消息传递过程中不可达目的地时将消息返回给生产者// 只有不可达目的地时候 才进行回退@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){// 可以在这里做保存内存重发
log.error("消息:{},被交换机:{}回退给生产者,回退的原因:{},路由key:{}",newString(returnedMessage.getMessage().getBody()),
returnedMessage.getExchange(),
returnedMessage.getReplyText(),
returnedMessage.getRoutingKey());}}
配置类
/**
* @author Yuanjie
* @description: 发布确认高级
* @ClassName ConfirmConfig
* @date 2022/4/14 15:49
*/@ConfigurationpublicclassConfirmConfig{// 交换机publicstaticfinalString CONFIRM_EXCHANGE_NAME="confirm_exchange";// 队列publicstaticfinalString CONFIRM_QUEUE_NAME="confirm_queue";//routingKeypublicstaticfinalString CONFIRM_ROUTING_KEY="key1";// 声明交换机@Bean("confirmExchange")publicDirectExchangeconfirmExchange(){returnnewDirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明队列@Bean("confirmQueue")publicQueueconfirmQueue(){returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 绑定@BeanpublicBindingqueueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue,@Qualifier("confirmExchange")DirectExchange confirmExchange){returnBindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}}
生产者(可以修改交换机名称,造成无交换机测试)
/**
* @author WangJiaHui
* @description: 测试确认
* @ClassName ProducerController
* @date 2022/4/14 16:00
*/@RestController@Slf4j@RequestMapping("/confirm")publicclassProducerController{@AutowiredprivateRabbitTemplate rabbitTemplate;// 发消息@GetMapping("sendMessage/{message}")publicvoidsendMessage(@PathVariableString message){// 有参idCorrelationData correlationData =newCorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData
);
log.info("发送消息内容为: {}",message);}}
消费者
/**
* @author WangJiaHui
* @description:
* @ClassName Consumer
* @date 2022/4/14 17:52
*/@Slf4j@ComponentpublicclassConsumer{@RabbitListener(queues =ConfirmConfig.CONFIRM_QUEUE_NAME)publicvoidreceiveConfirmMessage(Message message){String s =newString(message.getBody());
log.info("接收到的队列confirm.queue消息:{}",s);}}
交换机异常情况
消息不可路由情况
13.3 解决方式二 备份交换机结合回退消息
如果两者同时开启,备份交换机优先级高
yml增添
spring.rabbitmq.publisher-confirm-type=correlated
#交换机开启确认消息
spring.rabbitmq.publisher-returns=true
开启消息不可路由,回退消息给生产者
配置类
/**
* @author Yuanjie
* @description: 发布确认高级
* @ClassName ConfirmConfig
* @date 2022/4/14 15:49
*/@ConfigurationpublicclassConfirmConfig{// 交换机publicstaticfinalString CONFIRM_EXCHANGE_NAME="confirm_exchange";// 队列publicstaticfinalString CONFIRM_QUEUE_NAME="confirm_queue";//routingKeypublicstaticfinalString CONFIRM_ROUTING_KEY="key1";// 备份交换机publicstaticfinalString BACKUP_EXCHANGE_NAME="backup_exhacnge";// 备份队列publicstaticfinalString BACKUP_QUEUE_NAME="backup_queue";// 报警队列publicstaticfinalString WARNING_QUEUE_NAME="warning_queue";// 声明交换机@Bean("confirmExchange")publicDirectExchangeconfirmExchange(){//returnExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();}// 声明队列@Bean("confirmQueue")publicQueueconfirmQueue(){returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明备份交换机@Bean("backupExchange")publicFanoutExchangebackupExchange(){returnnewFanoutExchange(BACKUP_EXCHANGE_NAME);}// 声明备份队列@Bean("backupQueue")publicQueuebackupQueue(){returnQueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明报警队列@Bean("warningQueue")publicQueuewarningQueue(){returnQueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 绑定@BeanpublicBindingqueueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue,@Qualifier("confirmExchange")DirectExchange confirmExchange){returnBindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}// 绑定备份@BeanpublicBindingbackupQueueBindingExchange(@Qualifier("backupQueue")Queue backupQueue,@Qualifier("backupExchange")FanoutExchange backupExchange){returnBindingBuilder.bind(backupQueue).to(backupExchange);}// 绑定报警@BeanpublicBindingwarningQueueBindingExchange(@Qualifier("warningQueue")Queue warningQueue,@Qualifier("backupExchange")FanoutExchange backupExchange){returnBindingBuilder.bind(warningQueue).to(backupExchange);}}
以下回调功能会被备份交换机功能覆盖
/**
* @author WangJiaHui
* @description: 交换机确认回调
* @ClassName CallBackConFirm
* @date 2022/4/14 18:03
*/@Slf4j@ComponentpublicclassCallBackConFirmimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;/**
* ConfirmCallback接口注入RabbitTemplate
*/@PostConstructpublicvoidinit(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);}/**
* 1. 发消息 交换机接收到了 回调
* @param correlationData 保存消息的id及相关信息
* @param b ack应答确认 true or false
* @param s cause 失败的原因 or null
*/@Overridepublicvoidconfirm(CorrelationData correlationData,boolean b,String s){String id = correlationData !=null? correlationData.getId():" ";if(b){
log.info("交换机已经收到了id为:{}的消息",id);}else{
log.info("交换机还未收到id为:{}的消息,由于:{}",id,s);}}// 当消息传递过程中不可达目的地时将消息返回给生产者// 只有不可达目的地时候 才进行回退@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
log.error("消息:{},被交换机:{}回退给生产者,回退的原因:{},路由key:{}",newString(returnedMessage.getMessage().getBody()),
returnedMessage.getExchange(),
returnedMessage.getReplyText(),
returnedMessage.getRoutingKey());}}
生产者
/**
* @author WangJiaHui
* @description: 测试确认
* @ClassName ProducerController
* @date 2022/4/14 16:00
*/@RestController@Slf4j@RequestMapping("/confirm")publicclassProducerController{@AutowiredprivateRabbitTemplate rabbitTemplate;// 发消息@GetMapping("sendMessage/{message}")publicvoidsendMessage(@PathVariableString message){// 有参idCorrelationData correlationData =newCorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY+"2", message,correlationData
);
log.info("发送消息内容为: {}",message);}}
报警消费者
/**
* @author WangJiaHui
* @description:
* @ClassName WarningConsumer
* @date 2022/4/14 21:44
*/@Slf4j@ComponentpublicclassWarningConsumer{// 接收报警消息@RabbitListener(queues =ConfirmConfig.WARNING_QUEUE_NAME)publicvoidreceiveWarningMsg(Message message){String msg =newString(message.getBody());
log.error("报警发现不可路由消息: {}",msg);}}
正常消费者
/**
* @author WangJiaHui
* @description:
* @ClassName Consumer
* @date 2022/4/14 17:52
*/@Slf4j@ComponentpublicclassConsumer{@RabbitListener(queues =ConfirmConfig.CONFIRM_QUEUE_NAME)publicvoidreceiveConfirmMessage(Message message){String s =newString(message.getBody());
log.info("接收到的队列confirm.queue消息:{}",s);}}
版权归原作者 呆萌小新@渊洁 所有, 如有侵权,请联系我们删除。