0


RabbitMQ:发布确认模式

✨ RabbitMQ:发布确认模式

📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

1.基本介绍

在这里插入图片描述

生产者把信道设置成为confirm(确认)模式,一旦信道进入confirm模式,所有在这个信道上面发布的消息都会被指定唯一的一个ID(ID从1开始).一旦消息被投递到所有匹配的队列以后,broker就会发送一个确认给生产者(包含ID),这样使得生产者知道消息已经正确到底目的队列了。如果消息和队列是可持久化的,那么确认消息就会在消息被写入磁盘以后发出,broker回传给生产者的确认消息中delivery-tag包含了确认消息的序列号。

  • 在这里插入图片描述

2.实现消息可靠传递的三个条件

2.1队列持久化

生产者发送消息到队列的时候,把durable参数设置为true(表示队列持久化)

// 参数1 queue :队列名// 参数2 durable :是否持久化// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列// 参数5 arguments
channel.queueDeclare(QUEUE_NAME,true,false,false,null);

2.2消息持久化

我们需要将消息标记为持久性 - 通过将消息属性(实现基本属性)设置为PERSISTENT_TEXT_PLAIN的值。

//交换机名称,队列名称,消息持久化,消息
channel.basicPublish("","task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

2.3发布确认

  • 队列接收到生产者发送的数据以后,队列把消息保存在磁盘(为了实现持久化),队列会把最终的可靠性传递结果告诉给生产者,这就是发布确认。
  • 三种常用的发布确认策略:单个确认发布、批量确认发布、异步确认发布

3.发布确认模式

RabbitMQ的发布确认模式默认是没有开启的,我们可以通过调用channel.confirmSelect()方法来手动开启发布确认模式。

3.1单个确认发布模式

  • 单个确认发布模式是一种简单的同步确认发布的方式。也就是说发布一个消息以后,只要确认它被确认发布,才可以继续发布后续的消息。
  • waitForConfirms(long)这一个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认,就会抛出异常。
  • 缺点:速度慢,因为如果没有确认消息的话,后面的消息都会被阻塞
publicclassConfirmMessage{//消息数量publicstaticfinalint MSG_CNT=200;publicstaticvoidmain(String[] args){//调用单个确认发布方法confirmSingleMessage();}publicstaticvoidconfirmSingleMessage(){try{//获取信道对象Channel channel =ConnectUtil.getChannel();//开启确认发布
            channel.confirmSelect();//声明队列String queue = UUID.randomUUID().toString();//队列持久化
            channel.queueDeclare(queue,true,false,false,null);//发送消息long start=System.currentTimeMillis();for(int i =0; i < MSG_CNT; i++){String msg="消息:"+i;//发送消息,消息需要持久化
                channel.basicPublish("", queue,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());//服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息boolean flag=channel.waitForConfirms();if(flag){System.out.println("————————第"+(i+1)+"条消息发送成功————————");}else{System.out.println("========第"+(i+1)+"条消息发送失败=========");}}//记录结束时间long end=System.currentTimeMillis();System.out.println("发布:"+MSG_CNT+"个单独确认消息,耗时:"+(end-start)+"毫秒");}catch(IOException e){
            e.printStackTrace();}catch(TimeoutException e){
            e.printStackTrace();}catch(InterruptedException e){
            e.printStackTrace();}}}

在这里插入图片描述

3.2批量确认发布模式

  • 先发布一批信息然后一起确认可以大大提高吞吐量
  • 缺点:当故障发生的时候,我们不知道是哪一个消息出现了问题,我们需要把整个批处理保存在内存中,记录重要的信息后重新发布消息
  • 这种方案仍然是同步的方式,会阻塞消息的发布
publicclassConfirmMessage{//消息数量publicstaticfinalint MSG_CNT =200;publicstaticvoidmain(String[] args){//调用单个确认发布方法//confirmSingleMessage();//发布:200个单独确认消息,耗时:192毫秒confirmBatchMessage();}publicstaticvoidconfirmSingleMessage(){try{//获取信道对象Channel channel =ConnectUtil.getChannel();//开启确认发布
            channel.confirmSelect();//声明队列String queue = UUID.randomUUID().toString();//队列持久化
            channel.queueDeclare(queue,true,false,false,null);//发送消息long start =System.currentTimeMillis();for(int i =0; i < MSG_CNT; i++){String msg ="消息:"+ i;//发送消息,消息需要持久化
                channel.basicPublish("", queue,MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());//服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息boolean flag = channel.waitForConfirms();if(flag){System.out.println("————————第"+(i +1)+"条消息发送成功————————");}else{System.out.println("========第"+(i +1)+"条消息发送失败=========");}}//记录结束时间long end =System.currentTimeMillis();System.out.println("发布:"+ MSG_CNT +"个单独确认消息,耗时:"+(end - start)+"毫秒");}catch(IOException e){
            e.printStackTrace();}catch(TimeoutException e){
            e.printStackTrace();}catch(InterruptedException e){
            e.printStackTrace();}}publicstaticvoidconfirmBatchMessage(){try{//获取信道对象Channel channel =ConnectUtil.getChannel();//开启确认发布
            channel.confirmSelect();//批量确认消息数量int batchSize=20;//未确认消息数量int nackMessageCount=0;//声明队列String queue = UUID.randomUUID().toString();//队列持久化
            channel.queueDeclare(queue,true,false,false,null);//发送消息long start =System.currentTimeMillis();for(int i =0; i < MSG_CNT; i++){String msg ="消息:"+ i;//发送消息,消息需要持久化
                channel.basicPublish("", queue,MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());//累加未确认的发布数量
                nackMessageCount++;//判断的未确认消息数量和批量确认消息的数量是否一致if(nackMessageCount==batchSize){//服务端返回false或者在超时时间内没有返回数据,生产者可以重新发送消息boolean flag = channel.waitForConfirms();if(flag){System.out.println("————————第"+(i +1)+"条消息发送成功————————");}else{System.out.println("========第"+(i +1)+"条消息发送失败=========");}//清空未确认发布消息个数
                    nackMessageCount=0;}}//为了确认剩下的是没有确认的消息,所以要再次进行确认if(nackMessageCount>0){//再次重新确认
                channel.waitForConfirms();}//记录结束时间long end =System.currentTimeMillis();System.out.println("发布:"+ MSG_CNT +"个单独确认消息,耗时:"+(end - start)+"毫秒");}catch(IOException e){
            e.printStackTrace();}catch(TimeoutException e){
            e.printStackTrace();}catch(InterruptedException e){
            e.printStackTrace();}}}

在这里插入图片描述

3.3异步确认发布模式

在这里插入图片描述

//异步消息发布确认publicstaticvoidpublishMessageAsync()throwsException{Channel channel =ConnectUtil.getChannel();//声明队列,此处使用UUID作为队列的名字String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);//开启发布确认模式
        channel.confirmSelect();//创建ConcurrentSkipListMap集合(跳表集合)ConcurrentSkipListMap<Long,String> concurrentSkipListMap =newConcurrentSkipListMap<>();//确认收到消息回调函数ConfirmCallback ackCallBack =newConfirmCallback(){@Overridepublicvoidhandle(long deliveryTag,boolean multiple)throwsIOException{//判断是否批量异步确认if(multiple){//把集合中没有被确认的消息添加到该集合中ConcurrentNavigableMap<Long,String> confirmed = concurrentSkipListMap.headMap(deliveryTag,true);//清除该部分没有被确认的消息
                    confirmed.clear();}else{//只清除当前序列胡的消息
                    concurrentSkipListMap.remove(deliveryTag);}System.out.println("确认的消息序列序号:"+ deliveryTag);}};//未被确认消息的回调函数ConfirmCallback nackCallBack =newConfirmCallback(){@Overridepublicvoidhandle(long deliveryTag,boolean multiple)throwsIOException{//获取没有被确认的消息String msg = concurrentSkipListMap.get(deliveryTag);System.out.println("发布的消息:"+ msg +"未被确认,该消息序列号:"+ deliveryTag);}};//添加异步确认监听器
        channel.addConfirmListener(ackCallBack, nackCallBack);//记录开始时间long start =System.currentTimeMillis();//循环发送消息for(int i =0; i < MSG_CNT; i++){//消息内容String message ="消息:"+ i;//把未确认的消息放到集合中,通过序列号和消息进行关联//            channel.getNextPublishSeqNo(); 获取下一个消息的序列号
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);//发送消息
            channel.basicPublish("", queueName,null, message.getBytes());}//记录结束时间long end =System.currentTimeMillis();System.out.println("发布"+MSG_CNT+"个批量确认消息,一共耗时:"+(end-start)+"毫秒");}

在这里插入图片描述


本文转载自: https://blog.csdn.net/qq_52797170/article/details/127226847
版权归原作者 不断前进的皮卡丘 所有, 如有侵权,请联系我们删除。

“RabbitMQ:发布确认模式”的评论:

还没有评论