📒博客首页:崇尚学技术的科班人
🏇小肖来了
🍣今天给大家带来的文章是《RabbitMQ发布确认和交换机基础总结与实战》🍣
🍣这是RabbitMQ的发布确认和交换机基础总结与实战🍣
🍣希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗同时也非常感谢各位小伙伴们的支持💗
🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣源码地址🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣🍣
文章目录
1、发布确认
1.1、发布确认的引出
一个消息的持久化需要经历的步骤:
- 设置要求队列持久化。
- 设置要求队列中的消息必须持久化。
- 发布确认
- 如果缺少了发布确认的话,那么消息在磁盘上持久化之前会发生丢失,从而不能满足消息持久化的目的。
1.2、发布确认的策略
1.2.1、开启发布确认的方法
Channel channel =RabbitmqUtil.getChannel();//开启发布确认
channel.confirmSelect();
- 发布确认默认是没有开启的,如果需要开启需要调用
confirmSelect
,每当需要使用发布确认的时候,都需要调用该方法。
1.2.2、单个确认发布
- 单个确认发布是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布。
- 该确认方式主要通过
waitForConfirms
方法实现,这个方法只有在消息被确认的时候才会返回,如果在指定时间范围内这个消息没有被确认那么它将会抛出异常。 - 这种确认方式的最大的缺点就是:发布速度特别慢。
publicstaticvoidConfirmMessageIndividually()throwsException{Channel channel =RabbitmqUtil.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();long begin =System.currentTimeMillis();for(int i =0; i < MESSAGE_COUNT; i++){String message = i +"";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());// 进行单个发布确认boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功");}}long end =System.currentTimeMillis();System.out.println("单个确认发送"+ MESSAGE_COUNT +"条消息所消耗的时间是"+(end - begin)+"ms");}
1.2.3、批量确认发布
- 先发布一批消息然后一起确认。
- 缺点:当发生故障导致发布出现问题时,不知道那个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的消息而后重新发布消息。
publicstaticvoidConfirmMessageBatch()throwsIOException,TimeoutException,InterruptedException{Channel channel =RabbitmqUtil.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();long begin =System.currentTimeMillis();// 批量处理消息的个数int batchSize =100;for(int i =1; i <= MESSAGE_COUNT; i++){String message = i +"";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());// 进行批量发布确认if(i % batchSize ==0){
channel.waitForConfirms();System.out.println("批量处理消息成功");}}long end =System.currentTimeMillis();System.out.println("批量确认发送"+ MESSAGE_COUNT +"条消息所消耗的时间是"+(end - begin)+"ms");}
1.2.4、异步确认发布
原理
- 异步确认发布是利用
回调函数
来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。
代码
publicstaticvoidConfirmMessageAsync()throwsException{Channel channel =RabbitmqUtil.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();long begin =System.currentTimeMillis();ConfirmCallback ackCallback =(var1,var2)->{System.out.println("已确认的消息"+ var1);};ConfirmCallback nackCallback =(var1,var2)->{System.out.println("未确认的消息"+ var1);};/**
* 1. 确认收到消息的回调函数
* 2. 未确认收到消息的回调函数
*/
channel.addConfirmListener(ackCallback,nackCallback);for(int i =1; i <= MESSAGE_COUNT; i++){String message = i +"";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}long end =System.currentTimeMillis();System.out.println("异步确认发送"+ MESSAGE_COUNT +"条消息所消耗的时间是"+(end - begin)+"ms");}
1.2.5、如何处理异步未确认消息
- 最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说
ConcurrentSkipListMap
publicstaticvoidConfirmMessageAsync()throwsException{Channel channel =RabbitmqUtil.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();ConcurrentSkipListMap<Long,String> map =newConcurrentSkipListMap<>();/**
* 1. 消息标识
* 2. 是否批量处理
*/ConfirmCallback ackCallback =(var1,var2)->{if(var2){ConcurrentNavigableMap<Long,String> longStringConcurrentNavigableMap = map.headMap(var1);
longStringConcurrentNavigableMap.clear();}else{
map.remove(var1);}String message = map.get(var1);System.out.println("已确认的消息是:"+ message +" 已确认的消息tag:"+ var1);};ConfirmCallback nackCallback =(var1,var2)->{// 未确认的消息String s = map.get(var1);System.out.println(s);System.out.println("未确认的消息"+ var1);};/**
* 1. 确认收到消息的回调函数
* 2. 未确认收到消息的回调函数
*/
channel.addConfirmListener(ackCallback,nackCallback);long begin =System.currentTimeMillis();for(int i =1; i <= MESSAGE_COUNT; i++){String message = i +"";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());// 1. 将消息保存到一个线程安全地队列中
map.put(channel.getNextPublishSeqNo(),message);}long end =System.currentTimeMillis();System.out.println("异步确认发送"+ MESSAGE_COUNT +"条消息所消耗的时间是"+(end - begin)+"ms");}
1.2.6、以上3种发布确认的速度对比
- 单个发布确认:同步等待确认,简单,但吞吐量非常有限。
- 批量确认发布:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪一条消息出现了问题。
- 异步确认发布:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.ConfirmCallback;importcom.xiao.utils.RabbitmqUtil;importjava.io.IOException;importjava.util.UUID;importjava.util.concurrent.ConcurrentNavigableMap;importjava.util.concurrent.ConcurrentSkipListMap;importjava.util.concurrent.TimeoutException;publicclassConfirmMessage{publicstaticfinalint MESSAGE_COUNT =1000;publicstaticvoidmain(String[] args)throwsException{// 单个发布确认ConfirmMessageIndividually();// 单个确认发送1000条消息所消耗的时间是680ms// 批量发布确认ConfirmMessageBatch();//批量确认发送1000条消息所消耗的时间是112ms//异步发布确认ConfirmMessageAsync();// 异步确认发送1000条消息所消耗的时间是41ms// 异步确认发送1000条消息所消耗的时间是33ms}publicstaticvoidConfirmMessageIndividually()throwsException{Channel channel =RabbitmqUtil.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();long begin =System.currentTimeMillis();for(int i =0; i < MESSAGE_COUNT; i++){String message = i +"";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());// 进行单个发布确认boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功");}}long end =System.currentTimeMillis();System.out.println("单个确认发送"+ MESSAGE_COUNT +"条消息所消耗的时间是"+(end - begin)+"ms");}publicstaticvoidConfirmMessageBatch()throwsIOException,TimeoutException,InterruptedException{Channel channel =RabbitmqUtil.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();long begin =System.currentTimeMillis();// 批量处理消息的个数int batchSize =100;for(int i =1; i <= MESSAGE_COUNT; i++){String message = i +"";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());// 进行批量发布确认if(i % batchSize ==0){
channel.waitForConfirms();System.out.println("批量处理消息成功");}}long end =System.currentTimeMillis();System.out.println("批量确认发送"+ MESSAGE_COUNT +"条消息所消耗的时间是"+(end - begin)+"ms");}publicstaticvoidConfirmMessageAsync()throwsException{Channel channel =RabbitmqUtil.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();ConcurrentSkipListMap<Long,String> map =newConcurrentSkipListMap<>();/**
* 1. 消息标识
* 2. 是否批量处理
*/ConfirmCallback ackCallback =(var1,var2)->{if(var2){ConcurrentNavigableMap<Long,String> longStringConcurrentNavigableMap = map.headMap(var1);
longStringConcurrentNavigableMap.clear();}else{
map.remove(var1);}String message = map.get(var1);System.out.println("已确认的消息是:"+ message +" 已确认的消息tag:"+ var1);};ConfirmCallback nackCallback =(var1,var2)->{// 未确认的消息String s = map.get(var1);System.out.println(s);System.out.println("未确认的消息"+ var1);};/**
* 1. 确认收到消息的回调函数
* 2. 未确认收到消息的回调函数
*/
channel.addConfirmListener(ackCallback,nackCallback);long begin =System.currentTimeMillis();for(int i =1; i <= MESSAGE_COUNT; i++){String message = i +"";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());// 1. 将消息保存到一个线程安全地队列中
map.put(channel.getNextPublishSeqNo(),message);}long end =System.currentTimeMillis();System.out.println("异步确认发送"+ MESSAGE_COUNT +"条消息所消耗的时间是"+(end - begin)+"ms");}}
2、交换机
- 在这一部分中,我们将做一些完全不同的事情 – 我们将消息传达给多个消费者。这种模式成为“发布/订阅模式”。这里需要使用到交换机。
2.1、Exchanges
2.1.1、概念
- RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。
- 相反,生产者只能将消息发送到交换机。
2.1.2、类型
- 直接(direct)
- 主题(topic)
- 标题(headers)
- 扇出(fanout)
2.1.3、无名exchange
channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
- 我们在发送消息时,
""
表示的就是默认的无名队列。 - 消息能路由发送到队列中其实是由
routingKey
绑定key指定的。
2.2、临时队列
临时队列
:一个具有随即名称的队列。一旦我们断开了连接,队列将被自动删除。
String queueName = channel.queueDeclare().getQueue();
2.3、绑定(binding)
- 绑定其实是交换机和队列之间的桥梁。它能够标识哪个交换机和哪个队列进行了绑定关系。
2.4、Fanout(发布订阅模式)
2.4.1、介绍
- 它是将接收到的所有消息广播到它知道的所有队列中。
- 系统有默认的的
Fanout
交换机类型
2.4.2、实战
- 交换机转发一条消息,其所绑定的所有队列都可以接收到消息。
1. 两个消费者
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;publicclassReceiveLogs01{publicstaticfinalString EXCHANGE_NAME ="logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 声明一个临时队列String queueName = channel.queueDeclare().getQueue();/**
* 1. 队列名称
* 2. 交换机名称
* 3. RoutingKey
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接收消息");DeliverCallback deliverCallback =(var1,var2)->{System.out.println("ReceiveLogs01控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));};
channel.basicConsume(queueName,true,deliverCallback,var1->{});}}
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;publicclassReceiveLogs02{publicstaticfinalString EXCHANGE_NAME ="logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 声明一个临时队列String queueName = channel.queueDeclare().getQueue();/**
* 1. 队列名称
* 2. 交换机名称
* 3. RoutingKey
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接收消息");DeliverCallback deliverCallback =(var1,var2)->{System.out.println("ReceiveLogs02控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));};
channel.basicConsume(queueName,true,deliverCallback,var1->{});}}
2. 一个生产者
importcom.rabbitmq.client.Channel;importcom.xiao.utils.RabbitmqUtil;importjava.util.Scanner;publicclassEmitLog{publicstaticfinalString EXCHANGE_NAME ="logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();Scanner sc =newScanner(System.in);while(sc.hasNext()){String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));System.out.println("成功发送消息:"+ message);}}}
3. 测试结果
生产者
消费者
- 所以在
fanout
模式下,所有队列都可以收到消息。
2.5、Direct(路由模式)
2.5.1、介绍
- 直接交换机和
fanout
交换机的差别在于RoutingKey
的绑定上,它绑定的的多个队列的key
一般是不同的,如果是相同的,那么它表现得就和fanout
有点类似。
2.5.2、实战
队列和交换机的绑定关系
1. 生产者
importcom.rabbitmq.client.Channel;importcom.xiao.utils.RabbitmqUtil;importjava.util.Scanner;publicclassDirectLogs{publicstaticfinalString EXCHANGE_NAME ="direct_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();Scanner sc =newScanner(System.in);while(sc.hasNext()){String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes("UTF-8"));System.out.println("成功发送消息:"+ message);}}}
2. 两个消费者
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;publicclassReceiveLogsDirect01{publicstaticfinalString EXCHANGE_NAME ="direct_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");// 声明队列
channel.queueDeclare("console",false,false,false,null);// 进行绑定
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");System.out.println("等待接收消息");DeliverCallback deliverCallback =(var1, var2)->{System.out.println("ReceiveLogsDirect01控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));};
channel.basicConsume("console",true,deliverCallback,var1->{});}}
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;publicclassReceiveLogsDirect02{publicstaticfinalString EXCHANGE_NAME ="direct_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");// 声明队列
channel.queueDeclare("disk",false,false,false,null);// 进行绑定
channel.queueBind("disk",EXCHANGE_NAME,"error");System.out.println("等待接收消息");DeliverCallback deliverCallback =(var1, var2)->{System.out.println("ReceiveLogsDirect02控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));};
channel.basicConsume("disk",true,deliverCallback,var1->{});}}
3. 测试结果
- 当生产者进行发送消息的时候,它会优先进行
RoutingKey
的比较,然后才会发送给相应的队列。
2.6、Topic
2.6.1、介绍
之前我们
Fanout
可以将所有消息发送到所有队列,
direct
可以将消息发送到某个队列。但我们假设我们当前有
3
个队列,我们想只发送消息到其中的两个队列,那么这就需要
Topic
。
2.6.2、Topic的要求
topic
的RoutingKey
不能随意写,它必须是一个单词列表,以点号隔开。*(星号)
可以代替一个单词。#(井号)
可以代替零个或多个单词
2.6.3、Topic的匹配案例
quick.orange.rabbit
:被队列Q1、Q2接收到lazy.orange.elephant
:被队列Q1、Q2接收到quick.orange.fox
:被队列Q1接收到lazy.brown.fox
:被队列Q2接收到lazy.pink.rabbit
:虽然满足两个绑定但只被队列Q2接收一次。quick.brown.fox
:不匹配任何绑定不会被任何队列接收到会被丢弃。quick.orange.male.rabbit
:是四个单词不匹配任何绑定会被丢弃lazy.orange.male.rabbit
:是四个单词当匹配Q2
- 当一个队列绑定键是
#
,那么这个队列将接收所有数据,就有点像fanout
了。 - 如果队列绑定键当中没有
#
和*
出现,那么该队列绑定类型就是direct
了。
2.6.4、实战
1. 两个消费者
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;publicclassReceiveLogsTopic01{publicstaticfinalString EXCHANGE_NAME ="topic_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);String queueName ="Q1";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");System.out.println("等待接受消息.....");DeliverCallback deliverCallback =(var1, var2)->{System.out.println("ReceiveLogsTopic01控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));System.out.println("接收队列:"+ queueName +" 接受的键:"+ var2.getEnvelope().getRoutingKey());};
channel.basicConsume(queueName,true,deliverCallback,var1->{});}}
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;publicclassReceiveLogsTopic02{publicstaticfinalString EXCHANGE_NAME ="topic_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);String queueName ="Q2";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");System.out.println("等待接受消息.....");DeliverCallback deliverCallback =(var1, var2)->{System.out.println("ReceiveLogsTopic02控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));System.out.println("接收队列:"+ queueName +" 接受的键:"+ var2.getEnvelope().getRoutingKey());};
channel.basicConsume(queueName,true,deliverCallback,var1->{});}}
2. 生产者
importcom.rabbitmq.client.Channel;importcom.xiao.utils.RabbitmqUtil;importjava.util.HashMap;importjava.util.Map;publicclassEmitLogTopic{publicstaticfinalString EXCHANGE_NAME ="topic_logs";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();Map<String,String> map =newHashMap<>();
map.put("quick.orange.rabbit","被队列Q1、Q2接收到");
map.put("lazy.orange.elephant","被队列Q1、Q2接收到");
map.put("quick.orange.fox","被队列Q1接收到");
map.put("lazy.brown.fox","被队列Q2接收到");
map.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次。");
map.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃。");
map.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
map.put("lazy.orange.male.rabbit","是四个单词当匹配Q2");for(String key : map.keySet()){String message = map.get(key);
channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes("UTF-8"));System.out.println("发送的消息是:"+ message);}}}
3. 测试结果
1. 生产者
2. 消费者
版权归原作者 崇尚学技术的科班人 所有, 如有侵权,请联系我们删除。