一、概念
RabbitMQ消息确认机制指的是在消息传递过程中,发送方发送消息后,接收方需要对消息进行确认,以确保消息被正确地接收和处理。
1、生产者确认机制:生产者发送消息后,需要等待RabbitMQ服务器的确认消息,以确保消息已经被成功地发送到RabbitMQ服务器。如果RabbitMQ服务器没有收到消息或者消息发送失败,生产者会收到一个确认消息,从而可以进行重发或者其他处理。
2、消费者确认机制:消费者接收到消息后,需要向RabbitMQ服务器发送确认消息,以告诉服务器已经成功地接收并处理了该消息。如果消费者没有发送确认消息,RabbitMQ服务器会认为该消息没有被正确地处理,从而会将该消息重新发送给其他消费者进行处理。
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),
一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。
二、发布确认策略(可以解决消息丢失问题)
需要:
1、设置队列必须持久化
2、设置要求队列中的消息必须持久计划
3、发布确认
开启发布确认的方法:
默认是没有开启的,若要开启需要调用方法confirmSelect,每当你要想使用发布确认,都需要在channel(信道开启发布确认)上调用该方法。
1、单个确认发布
是一种同步确认发布的方式,即发布一个消息之后只有它被确认发布。后续的消息才能继续发布,waitForConfirms这个方法只有在消息被确认的时候才返回,若在指定的时间内这个消息没有被确认,将会抛出异常。
该方式特点:发布速度特别慢,因为若没有确认发布的消息就会阻塞后续消息的发布,该方式最对提供每秒不超过数百条发布消息的吞吐量。
代码:
publicstaticvoidpublishMessageDanGe()throwsIOException,TimeoutException,InterruptedException{Channel channel =RabbitUtils.getChannel();// 队列声明String queueName =UUID.randomUUID().toString();/*
* 生成一个队列
* 1、队列名称
* 2、队列里面的消息是否持久化(磁盘)默认情况消息存储在内存中
* 3、该队列是否共一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false只能一个消费者消费
* 4、是否自动删除,最后一个消费者断开链接后,该队列是否自动删除,true自动删除,false不自动删除
* 5、其它参数
* */
channel.queueDeclare(queueName,true,false,false,null);// 开启发布确认
channel.confirmSelect();// 开始时间long begin =System.currentTimeMillis();// 批量发1000条消息for(int i =0; i <1000; i++){String message = i +"";
channel.basicPublish("", queueName,null, message.getBytes());// 只有在消息被确认的时候才返回boolean flag = channel.waitForConfirms();// 单个消息马上进行发布确认if(flag){System.out.println("消息发送成功");}}// 结束时间long end =System.currentTimeMillis();System.out.println("发布"+1000+"个单独确认消息耗时"+(end - begin)+"毫秒");}
2、批量确认发布
单个确认发布方式非常慢,与单个等待确认消息相比,先发布一批消息,然后一起确认可以极大地提高吞吐量,此方式缺点:当发生故障导致发布出现问题时,不知道是哪个消息出现了问题,必须将整个批处理保存在内存中,以记录重要的信息
代码:
publicstaticvoidpublishMessageBath()throwsIOException,TimeoutException,InterruptedException{Channel channel =RabbitUtils.getChannel();// 队列声明String queueName =UUID.randomUUID().toString();/*
* 生成一个队列
* 1、队列名称
* 2、队列里面的消息是否持久化(磁盘)默认情况消息存储在内存中
* 3、该队列是否共一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false只能一个消费者消费
* 4、是否自动删除,最后一个消费者断开链接后,该队列是否自动删除,true自动删除,false不自动删除
* 5、其它参数
* */
channel.queueDeclare(queueName,true,false,false,null);// 开启发布确认
channel.confirmSelect();// 开始时间long begin =System.currentTimeMillis();// 批量确认大小int batchSize =100;// 批量发送消息,批量发布确认for(int i =0; i <1000; i++){String message = i +"";
channel.basicPublish("", queueName,null, message.getBytes());// 判断达到100条消息时,批量确认一次if(i % batchSize ==0){// 发布确认
channel.waitForConfirms();}}// 结束时间long end =System.currentTimeMillis();System.out.println("发布"+1000+"批量确认消息耗时"+(end - begin)+"毫秒");}
3、异步确认发布
异步确认发布逻辑上复杂,但是性价比最高,具有高可靠性。它是利用回调函数来达到消息可靠性传递的。
需要准备消息的监听器,监听哪些消息成功了,哪些消息失败了(在信道上监听,异步通知)。
代码:
// 异步确认发布publicstaticvoidpublishMessageAsync()throwsIOException,TimeoutException{Channel channel =RabbitUtils.getChannel();// 队列声明String queueName =UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);// 开启发布确认
channel.confirmSelect();// 开始时间long begin =System.currentTimeMillis();// 消息确认成功回调函数ConfirmCallback ackCallback =(var1, var3)->{System.out.println("确认的消息:"+ var1);};// 消息确认失败回调函数ConfirmCallback nackCallback =(var1, var3)->{System.out.println("未确认的消息:"+ var1);};// 消息监听器,监听哪些消息成功,哪些消息失败。两个参数都存在说明已确认的和未确认的消息都监听/**
* 1、ackCallback 监听哪些消息成功
* 2、nackCallback 监听哪些消息失败
* */
channel.addConfirmListener(ackCallback, nackCallback);// 异步监听// 批量发送消息,批量发布确认for(int i =0; i <1000; i++){String message = i +"";
channel.basicPublish("", queueName,null, message.getBytes());}// 结束时间long end =System.currentTimeMillis();System.out.println("发布"+1000+"批量确认消息耗时"+(end - begin)+"毫秒");}
3.1 如何处理异步未确认的消息
把未确认的消息放到一个基于内存的能被发布线程访问的队列,例如ConcurrentLinkedQueue(并发链路队列)这个队列在confirm callbacks与发布线程之间进行详细的传递。
该队列会记录发布的所有的消息数据,将来哪些确认了,将其删除。剩余的就是未确认的。
代码:
// 异步确认发布,处理异步未确认的消息publicstaticvoidpublishMessageAsyncTask()throwsIOException,TimeoutException{Channel channel =RabbitUtils.getChannel();// 队列声明String queueName =UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);// 开启发布确认
channel.confirmSelect();// 消息确认成功回调函数ConfirmCallback ackCallback =(var1, var3)->{System.out.println("确认的消息:"+ var1);};// 消息确认失败回调函数ConfirmCallback nackCallback =(var1, var3)->{System.out.println("未确认的消息:"+ var1);};// 消息监听器,监听哪些消息成功,哪些消息失败。两个参数都存在说明已确认的和未确认的消息都监听/**
* 1、ackCallback 监听哪些消息成功
* 2、nackCallback 监听哪些消息失败
* */
channel.addConfirmListener(ackCallback, nackCallback);// 异步监听// 开始时间long begin =System.currentTimeMillis();// 批量发送消息,批量发布确认for(int i =0; i <1000; i++){String message = i +"";
channel.basicPublish("", queueName,null, message.getBytes());}// 结束时间long end =System.currentTimeMillis();System.out.println("发布"+1000+"批量确认消息耗时"+(end - begin)+"毫秒");}
三、总结
单独发布消息
同步等待确认,简单,但是吞吐量非常有限。
批量发布消息
批量同步等待确认,简单,合理的吞吐量,一旦出现问题后很难推断出是哪条消息出现了问题
异步处理
最佳的性能和资源使用,再出现错误的情况下可以很好的控制,但是实现较为复杂。
应答和发布的区别
应答功能属于消费者,消费完消息告诉 RabbitMQ 已经消费成功。
发布功能属于生产者,生产消息到 RabbitMQ,RabbitMQ 需要告诉生产者已经收到消息。
版权归原作者 imaginationtrouble 所有, 如有侵权,请联系我们删除。