目录
一. 两种消费思路
- RabbitMQ 的消息消费中,整体上来说有两种不同的思路:(1) 推(push):MQ 主动将消息推送给消费者,这种方式需要消费设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式 (2) 拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方法
- 先来看第一种:推(push),这种其实我们上面一直都在使用,就是通过注解
@RabbitListener
注解去监听消费者@RabbitListener(queues =DirectConfig.MY_DIRECT_QUEUE_NAME_01)publicvoidhandMsg(String msg){System.out.println("handMsg = "+ msg);// int i = 1 / 0;}
- 大家可以自己在类中手动设置一条异常,然后启动,当监听到队列中有消息时,就会触发这个方法了
- 再开看第二种:拉(pull),这种是通过 rabbitTemplate 中的
receiveAndConvert
方法来拉取一条消息下来。如果该方法返回值为 null,表示该队列上没有消息了。@AutowiredRabbitTemplate rabbitTemplate;@Testpublicvoidtest08(){//主动从队列中消息一条消息,即队列中拉一下消息下来String pull =(String) rabbitTemplate.receiveAndConvert(DirectConfig.MY_DIRECT_QUEUE_NAME_02);System.out.println("pull = "+ pull);}
注意:这里与我们上面用的convertAndSend
是不一样的,这个是发送消息,而receiveAndConvert
则是拉取消息。 - 如果需要从消息队列中持续获取消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。
- 切记将拉模式放到一个死循环中,变相订阅消息,这会严重影响 RabbitMQ 的性能
二. 确保消费成功的两种思路
- 为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。 (1) 当
autoAck
为 false 时,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息删除。 (2) 当autoAck
为 true 时,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除,即使这些消息并没有到达消费者。 - 换句话说,就是当 autoAck 为 false 时,消费者就变得非常从容了,它将会有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack ,此时 RabbitMQ 才会认为这条消息消费成功。
- 如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
- 确保消息被成功消费,无非就是两种:手动 Ack 或者自动 Ack ,当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息的时候,解决幂等性的问题。
- 解决幂等性的问题:可以参考一下我的这篇文章:
三. 消息确认
I. 自动确认
- 在 Spring Boot 中,默认情况下,消息消费就是自动确认的。
- 下面就是消息消费的方法,其实与之前的也是一样的。因为推模式中,就是自动确认的。
@ComponentpublicclassDirectConsumer{/** * 监听队列 * * 默认情况下,这个方法是自动确认消息是否消费成功的 * 如果这个方法抛出异常,表示消息消费失败,消息会重新回到队列中(RabbitMQ 主页的 ready 中) * 然后客户端会立马重试,然后又把消息拉到客户端来处理,又抛出异常...(陷入死循环) */@RabbitListener(queues =DirectConfig.MY_DIRECT_QUEUE_NAME_01)publicvoidhandMsg(String msg){System.out.println("handMsg = "+ msg);// int i = 1 / 0;}}
通过@Componet
注解将当前类注入到 Spring 容器中,通过@RabbitListenter
注解来标记一个消息消费,默认情况下,消息消费方法自带事务。即如果该方法执行的过程中,抛出异常,那么消息会重新回到队列中等待下一次被调用,如果该方法正常执行完成没有抛出异常,那么这条消息就算是被消费了
II. 手动确认
手动确认又分为两种:
推模式手动确认
与
拉模式手动确认
A. 推模式手动确认
- 要开启手动确认,首先需要在
application.properties
中开启手动确认# 设置消息的确认模式改为手动,默认是自动的spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 改为手动确认之后,消费者中的代码就与之前不一样了
@ComponentpublicclassDirectConsumer{/** * 这里是手动确认 */@RabbitListener(queues =DirectConfig.MY_DIRECT_QUEUE_NAME_02)publicvoidhandleMsg02(Message message,Channel channel)throwsIOException{//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//获取消息的内容byte[] body = message.getBody();//把消息转换成字符串System.out.println("msg = "+newString(body,0, body.length));/** * 能够执行到这里,说明没有问题,手动确认消息已经收到 * * 参数一:消息的唯一标记; * 参数二:false 表示仅确认当前消息消费成功; true 表示之前还没有确认的消息都消费成功 *///int i = 1 / 0; channel.basicAck(deliveryTag,false);}catch(IOException e){ e.printStackTrace();/** * 拒绝当前消息的消费 * * 参数一:消息的唯一标记 * 参数二:false 表示仅拒绝当前消息消费; true 表示拒绝之前所有没有被消费者消费的消息 * 参数三:被拒绝的消息是否重新入队 */ channel.basicNack(deliveryTag,false,true);}}}
(1) 将消费者需要做的事情都写入到try-catch
里面(2) 如果消息正常消费,则执行basicAck
确认消费成功(3) 反之,则执行basicNack
高数 RabbitMQ 消息消费失败 - 在上面的代码中,对
basicAck
和basicNack
这两个方法中的参数说明也已经写在注释中了,这里就不继续说了 。 - 那么怎么测试呢?测试之前先补充一个内容:- Unacked:表示已经发送给消费者但还收到消费 ack 的消息数量- Ready:表示待消费的消费数量(1) 首先在 RabbitMQ 的 web 管理端确认一下测试的队列中是否有待消费的消息(2) 然后在
try-catch
中手动设置一个异常,启动项目,这时候因为生产者还没有发送消息,异常是不会报错的(3) 接着,在生产者中给消费者发送一条消息,我这里还是用前面使用多的单元测试中的test04``````@AutowiredRabbitTemplate rabbitTemplate;@Testpublicvoidtest04(){ rabbitTemplate.convertAndSend(DirectConfig.MY_DIRECT_EXCHANGE_NAME,DirectConfig.MY_DIRECT_QUEUE_NAME_02,"Hello Peng");}
(4) 启动之后,观察 RabbitMQ 页面以及启动类日志变化。此时会发现,启动类日志报错了,而 RabbitMQ 页面中 Unacked 中待确认的消息也显示一条,说明消费者那里出现了异常,还没有确认。(5) 这时候把消费者关闭,再次观察 RabbitMQ 页面变化,就会发现 Unacked 没了,而待消费数量变成 1 。说明:消费者那里没有确认消息,消息从新回到了 ready 中。
B. 拉模式手动确认
补充扩展:
- 其实拉模式也是可以自动确认的,只需要在
@Transactional
注解在事务中 进行即可。代码就是我们上面介绍 拉模式 时候的test08
,加个注解即可@AutowiredRabbitTemplate rabbitTemplate;@Test@Transactionalpublicvoidtest08(){//主动从队列中消息一条消息,即队列中拉一下消息下来String pull =(String) rabbitTemplate.receiveAndConvert(DirectConfig.MY_DIRECT_QUEUE_NAME_02);//int i = 1 / 0;System.out.println("pull = "+ pull);}
- 注意,此时的
@Transactional
注解只是一个标记,还需要配置一下 TransactionManager ,在配置类中增加一行配置。我们前面发送可靠性中:开启事务机制
的时候也用过@BeanPlatformTransactionManagerplatformTransactionManager(ConnectionFactory connectionFactory){returnnewRabbitTransactionManager(connectionFactory);}
- 当有异常时,拉下来的消息就不会确认;正常的话就自动确认。
手动确认
- 拉模式的手动确认就比较麻烦了,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以这里得使用原生的方法去操作
@AutowiredRabbitTemplate rabbitTemplate;@Testpublicvoidtest02(){//创建一个不带事务的消息通道。有了通道就好办了,就可以进行确认与取消了Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);long deliveryTag =0;try{//这里是获取队列的一条消息//参数二:表示此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息删除GetResponseGetResponse= channel.basicGet(DirectConfig.MY_DIRECT_QUEUE_NAME_02,false);//获取标记 deliveryTag =GetResponse.getEnvelope().getDeliveryTag();//int i = 1 / 0;//确认消息 channel.basicAck(deliveryTag,false);}catch(IOException e){ e.printStackTrace();try{//取消消息 channel.basicNack(deliveryTag,false,true);}catch(IOException ex){ ex.printStackTrace();}}}
- 在
channel.basicGet()
这个方法中,第一个参数就是队列的名字,第二个参数则是上面提到的autoAck
了,这个在上面确保消费成功的两种思路
中提到过。 - 这里的异常测试与前面提到的都大同小异,通过观察 RabbitMQ 的 web 端管理页面自行测试即可。
III. 消息拒绝
- 当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。
- 拒绝的方式如下:
@RabbitListener(queues =DirectConfig.MY_DIRECT_QUEUE_NAME_02)publicvoidhandleMsg03(Message message,Channel channel){//获取消息编号long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//拒绝消息 channel.basicReject(deliveryTag,true);}catch(IOException e){ e.printStackTrace();}}
- 消息拒绝只是
basicReject
该方法的调用与前面不同,其他都是一样的,这里就不做演示了。
本文转载自: https://blog.csdn.net/weixin_50983264/article/details/125253480
版权归原作者 天怎么不会塌 所有, 如有侵权,请联系我们删除。
版权归原作者 天怎么不会塌 所有, 如有侵权,请联系我们删除。