0


RabbitMQ 消息可靠性问题

目录

一. 两种消费思路

  1. RabbitMQ 的消息消费中,整体上来说有两种不同的思路:(1) 推(push):MQ 主动将消息推送给消费者,这种方式需要消费设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式 (2) 拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方法
  2. 先来看第一种:推(push),这种其实我们上面一直都在使用,就是通过注解 @RabbitListener 注解去监听消费者@RabbitListener(queues =DirectConfig.MY_DIRECT_QUEUE_NAME_01)publicvoidhandMsg(String msg){System.out.println("handMsg = "+ msg);// int i = 1 / 0;}
  3. 大家可以自己在类中手动设置一条异常,然后启动,当监听到队列中有消息时,就会触发这个方法了
  4. 再开看第二种:拉(pull),这种是通过 rabbitTemplate 中的 receiveAndConvert 方法来拉取一条消息下来。如果该方法返回值为 null,表示该队列上没有消息了。@AutowiredRabbitTemplate rabbitTemplate;@Testpublicvoidtest08(){//主动从队列中消息一条消息,即队列中拉一下消息下来String pull =(String) rabbitTemplate.receiveAndConvert(DirectConfig.MY_DIRECT_QUEUE_NAME_02);System.out.println("pull = "+ pull);}注意:这里与我们上面用的 convertAndSend 是不一样的,这个是发送消息,而 receiveAndConvert 则是拉取消息。
  5. 如果需要从消息队列中持续获取消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。
  6. 切记将拉模式放到一个死循环中,变相订阅消息,这会严重影响 RabbitMQ 的性能

二. 确保消费成功的两种思路

  1. 为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。 (1) 当 autoAck 为 false 时,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息删除。 (2) 当 autoAck 为 true 时,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除,即使这些消息并没有到达消费者。
  2. 换句话说,就是当 autoAck 为 false 时,消费者就变得非常从容了,它将会有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack ,此时 RabbitMQ 才会认为这条消息消费成功。
  3. 如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
  4. 确保消息被成功消费,无非就是两种:手动 Ack 或者自动 Ack ,当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息的时候,解决幂等性的问题。
  5. 解决幂等性的问题:可以参考一下我的这篇文章:

三. 消息确认

I. 自动确认

  1. 在 Spring Boot 中,默认情况下,消息消费就是自动确认的。
  2. 下面就是消息消费的方法,其实与之前的也是一样的。因为推模式中,就是自动确认的。@ComponentpublicclassDirectConsumer{/** * 监听队列 * * 默认情况下,这个方法是自动确认消息是否消费成功的 * 如果这个方法抛出异常,表示消息消费失败,消息会重新回到队列中(RabbitMQ 主页的 ready 中) * 然后客户端会立马重试,然后又把消息拉到客户端来处理,又抛出异常...(陷入死循环) */@RabbitListener(queues =DirectConfig.MY_DIRECT_QUEUE_NAME_01)publicvoidhandMsg(String msg){System.out.println("handMsg = "+ msg);// int i = 1 / 0;}}通过 @Componet 注解将当前类注入到 Spring 容器中,通过 @RabbitListenter 注解来标记一个消息消费,默认情况下,消息消费方法自带事务。即如果该方法执行的过程中,抛出异常,那么消息会重新回到队列中等待下一次被调用,如果该方法正常执行完成没有抛出异常,那么这条消息就算是被消费了

II. 手动确认

手动确认又分为两种:

推模式手动确认

拉模式手动确认

A. 推模式手动确认

  1. 要开启手动确认,首先需要在 application.properties 中开启手动确认# 设置消息的确认模式改为手动,默认是自动的spring.rabbitmq.listener.simple.acknowledge-mode=manual
  2. 改为手动确认之后,消费者中的代码就与之前不一样了@ComponentpublicclassDirectConsumer{/** * 这里是手动确认 */@RabbitListener(queues =DirectConfig.MY_DIRECT_QUEUE_NAME_02)publicvoidhandleMsg02(Message message,Channel channel)throwsIOException{//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//获取消息的内容byte[] body = message.getBody();//把消息转换成字符串System.out.println("msg = "+newString(body,0, body.length));/** * 能够执行到这里,说明没有问题,手动确认消息已经收到 * * 参数一:消息的唯一标记; * 参数二:false 表示仅确认当前消息消费成功; true 表示之前还没有确认的消息都消费成功 *///int i = 1 / 0; channel.basicAck(deliveryTag,false);}catch(IOException e){ e.printStackTrace();/** * 拒绝当前消息的消费 * * 参数一:消息的唯一标记 * 参数二:false 表示仅拒绝当前消息消费; true 表示拒绝之前所有没有被消费者消费的消息 * 参数三:被拒绝的消息是否重新入队 */ channel.basicNack(deliveryTag,false,true);}}}(1) 将消费者需要做的事情都写入到 try-catch 里面(2) 如果消息正常消费,则执行 basicAck 确认消费成功(3) 反之,则执行 basicNack 高数 RabbitMQ 消息消费失败
  3. 在上面的代码中,对 basicAckbasicNack 这两个方法中的参数说明也已经写在注释中了,这里就不继续说了 。
  4. 那么怎么测试呢?测试之前先补充一个内容:- Unacked:表示已经发送给消费者但还收到消费 ack 的消息数量- Ready:表示待消费的消费数量(1) 首先在 RabbitMQ 的 web 管理端确认一下测试的队列中是否有待消费的消息确认是否有待消费的消息(2) 然后在 try-catch 中手动设置一个异常,启动项目,这时候因为生产者还没有发送消息,异常是不会报错的(3) 接着,在生产者中给消费者发送一条消息,我这里还是用前面使用多的单元测试中的 test04``````@AutowiredRabbitTemplate rabbitTemplate;@Testpublicvoidtest04(){ rabbitTemplate.convertAndSend(DirectConfig.MY_DIRECT_EXCHANGE_NAME,DirectConfig.MY_DIRECT_QUEUE_NAME_02,"Hello Peng");}(4) 启动之后,观察 RabbitMQ 页面以及启动类日志变化。此时会发现,启动类日志报错了,而 RabbitMQ 页面中 Unacked 中待确认的消息也显示一条,说明消费者那里出现了异常,还没有确认。RabbitMQ页面变化(5) 这时候把消费者关闭,再次观察 RabbitMQ 页面变化,就会发现 Unacked 没了,而待消费数量变成 1 。说明:消费者那里没有确认消息,消息从新回到了 ready 中。RabbitMQ页面变化2

B. 拉模式手动确认

补充扩展:

  1. 其实拉模式也是可以自动确认的,只需要在 @Transactional 注解在事务中 进行即可。代码就是我们上面介绍 拉模式 时候的 test08 ,加个注解即可@AutowiredRabbitTemplate rabbitTemplate;@Test@Transactionalpublicvoidtest08(){//主动从队列中消息一条消息,即队列中拉一下消息下来String pull =(String) rabbitTemplate.receiveAndConvert(DirectConfig.MY_DIRECT_QUEUE_NAME_02);//int i = 1 / 0;System.out.println("pull = "+ pull);}
  2. 注意,此时的 @Transactional 注解只是一个标记,还需要配置一下 TransactionManager ,在配置类中增加一行配置。我们前面发送可靠性中:开启事务机制 的时候也用过@BeanPlatformTransactionManagerplatformTransactionManager(ConnectionFactory connectionFactory){returnnewRabbitTransactionManager(connectionFactory);}
  3. 当有异常时,拉下来的消息就不会确认;正常的话就自动确认。

手动确认

  1. 拉模式的手动确认就比较麻烦了,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以这里得使用原生的方法去操作@AutowiredRabbitTemplate rabbitTemplate;@Testpublicvoidtest02(){//创建一个不带事务的消息通道。有了通道就好办了,就可以进行确认与取消了Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);long deliveryTag =0;try{//这里是获取队列的一条消息//参数二:表示此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息删除GetResponseGetResponse= channel.basicGet(DirectConfig.MY_DIRECT_QUEUE_NAME_02,false);//获取标记 deliveryTag =GetResponse.getEnvelope().getDeliveryTag();//int i = 1 / 0;//确认消息 channel.basicAck(deliveryTag,false);}catch(IOException e){ e.printStackTrace();try{//取消消息 channel.basicNack(deliveryTag,false,true);}catch(IOException ex){ ex.printStackTrace();}}}
  2. channel.basicGet() 这个方法中,第一个参数就是队列的名字,第二个参数则是上面提到的 autoAck 了,这个在上面 确保消费成功的两种思路 中提到过。
  3. 这里的异常测试与前面提到的都大同小异,通过观察 RabbitMQ 的 web 端管理页面自行测试即可。

III. 消息拒绝

  1. 当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。
  2. 拒绝的方式如下:@RabbitListener(queues =DirectConfig.MY_DIRECT_QUEUE_NAME_02)publicvoidhandleMsg03(Message message,Channel channel){//获取消息编号long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//拒绝消息 channel.basicReject(deliveryTag,true);}catch(IOException e){ e.printStackTrace();}}
  3. 消息拒绝只是 basicReject 该方法的调用与前面不同,其他都是一样的,这里就不做演示了。
标签: rabbitmq java 分布式

本文转载自: https://blog.csdn.net/weixin_50983264/article/details/125253480
版权归原作者 天怎么不会塌 所有, 如有侵权,请联系我们删除。

“RabbitMQ 消息可靠性问题”的评论:

还没有评论