0


高级篇-rabbitmq的高级特性

1.消息可靠性

  1. 三种丢失的情形:

1.1 生产者确认机制

启动MQ

创建Queues:

两种Callback:

1.ReturnCallback:全局callback

2.ComfirmCallback: 发送信息时候设置

  1. @Test
  2. public void testSendMessage2SimpleQueue() throws InterruptedException {
  3. // 1.准备消息
  4. String message = "hello, spring amqp!";
  5. // 2.准备CorrelationData
  6. // 2.1.消息ID
  7. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  8. // 2.2.准备ConfirmCallback
  9. correlationData.getFuture().addCallback(result -> {
  10. // 判断结果
  11. if (result.isAck()) {
  12. // ACK
  13. log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
  14. } else {
  15. // NACK
  16. log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
  17. // 重发消息
  18. }
  19. }, ex -> {
  20. // 记录日志
  21. log.error("消息发送失败!", ex);
  22. // 重发消息
  23. });
  24. // 3.发送消息
  25. rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
  26. }

执行成功:

监控页面:

模拟失败:

1.投递到交互机失败

2.投递到交换机了,但是没有进入队列

1.2 消息持久化

  1. 注意: 生产者确认只能保证数据放到队列当中,但是无法保证数据不丢失(比如所在的机器宕机了),
  2. 所以还需要保证数据的持久化

  1. @Configuration
  2. public class CommonConfig {
  3. @Bean
  4. public DirectExchange simpleDirect(){
  5. return new DirectExchange("simple.direct");
  6. }
  7. @Bean
  8. public Queue simpleQueue(){
  9. return QueueBuilder.durable("simple.queue").build();
  10. }
  11. }
  1. @Test
  2. public void testDurableMessage() {
  3. // 1.准备消息 消息持久化
  4. Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
  5. .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
  6. .build();
  7. // 2.发送消息
  8. rabbitTemplate.convertAndSend("simple.queue", message);
  9. }

注意:

  1. //交换机不传值默认就是持久化
  2. //交换机、队列、消息默认都是持久化
  3. @Bean
  4. public DirectExchange simpleDirect(){
  5. return new DirectExchange("simple.direct");
  6. }
  7. public AbstractExchange(String name) {
  8. this(name, true, false);
  9. }

演示数据是否默认持久化:

  1. 重启mq

1. 交互机、队列、消息都做持久化

2.消费者端关闭防止被消费

3.重启mq后看队列中数据是否还在(是否持久化)

1.3 消费者消息确认

  1. 生产者确认:能确定消息投递到队列
  2. 消息持久化:能避免MQ宕机造成的消息丢失
  3. 生产者确认和消息持久化能保证消息能投递到消费者,但是无法保证消息被消费者消费(比如投递消费者的
  4. 同时,消费者所在机器宕机了)

  1. 1.manual:不推荐 代码侵入
  2. try{
  3. //业务逻辑
  4. ack
  5. } catch(ex){
  6. nack
  7. }
  8. 2.auto:推荐 spring全权完成,不需要手动写代码
  9. 3.none:不推荐 投递完成立马删除消息,是否成功都不管
  1. @Slf4j
  2. @Component
  3. public class SpringRabbitListener {
  4. @RabbitListener(queues = "simple.queue")
  5. public void listenSimpleQueue(String msg) {
  6. log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
  7. //模拟出现异常情况
  8. System.out.println(1 / 0);
  9. log.info("消费者处理消息成功!");
  10. }
  11. }

默认为none:抛出异常后消息立即被删除:

修改为auto模式:

队列返回nack会再去发送信息:

1.4 失败重试机制

演示失败重试机制:

  1. listener:
  2. simple:
  3. prefetch: 1
  4. acknowledge-mode: auto
  5. retry:
  6. enabled: true
  7. initial-interval: 1000
  8. multiplier: 3
  9. max-attempts: 4

默认重试到达最大次数后消息就丢弃:

  1. 但是对于一些比较重要不能丢弃的消息需要使用以下策略: ![](https://img-blog.csdnimg.cn/17584e4a42f34937b7219627399e6711.png)
  1. 推荐使用第三种方案:将失败的消息发送到失败的交换机和失败的队列中,后面可以告知管理员然后重新
  2. 人工去处理

  1. @Configuration
  2. public class ErrorMessageConfig {
  3. @Bean
  4. public DirectExchange errorMessageExchange(){
  5. return new DirectExchange("error.direct");
  6. }
  7. @Bean
  8. public Queue errorQueue(){
  9. return new Queue("error.queue");
  10. }
  11. @Bean
  12. public Binding errorMessageBinding(){
  13. return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
  14. }
  15. @Bean
  16. public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
  17. return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
  18. }
  19. }

演示:

发送消息:

面试题:最后一分钟的总结

2. 死信交换机

2.1 初识死信交换机

  1. 1.发送信息到消费者默认的retry重试机制,达到最大次数就会被reject
  2. 2.队列中绑定一个死信交换机,接收被reject的信息,然后发送到dl.queue
  3. 3.这样就不担心死信会丢失

对比消息失败信息处理策略:

2.2 TTL

注意: 存活时间取消息所在队列中存货时间 、消息本身存活时间的以短的时间为准

  1. @Slf4j
  2. @Component
  3. public class SpringRabbitListener {
  4. @RabbitListener(bindings = @QueueBinding(
  5. value = @Queue(name = "dl.queue", durable = "true"),
  6. exchange = @Exchange(name = "dl.direct"),
  7. key = "dl"
  8. ))
  9. public void listenDlQueue(String msg) {
  10. log.info("消费者接收到了dl.queue的延迟消息");
  11. }
  12. }

  1. @Configuration
  2. public class TTLMessageConfig {
  3. @Bean
  4. public DirectExchange ttlDirectExchange(){
  5. return new DirectExchange("ttl.direct");
  6. }
  7. @Bean
  8. public Queue ttlQueue(){
  9. return QueueBuilder
  10. .durable("ttl.queue")
  11. .ttl(10000)
  12. .deadLetterExchange("dl.direct")
  13. .deadLetterRoutingKey("dl")
  14. .build();
  15. }
  16. @Bean
  17. public Binding ttlBinding(){
  18. return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
  19. }
  20. }

  1. @Test
  2. public void testTTLMessage() {
  3. // 1.准备消息
  4. Message message = MessageBuilder
  5. .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
  6. .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
  7. .setExpiration("5000")
  8. .build();
  9. // 2.发送消息
  10. rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
  11. // 3.记录日志
  12. log.info("消息已经成功发送!");
  13. }

演示延时队列:

1.启动消费者

2.发送消息:testTTLMessage()

2.3 延迟队列

p159 27:18

  1. @Slf4j
  2. @Component
  3. public class SpringRabbitListener {
  4. @RabbitListener(bindings = @QueueBinding(
  5. value = @Queue(name = "delay.queue", durable = "true"),
  6. exchange = @Exchange(name = "delay.direct", delayed = "true"),
  7. key = "delay"
  8. ))
  9. public void listenDelayExchange(String msg) {
  10. log.info("消费者接收到了delay.queue的延迟消息");
  11. }
  12. }

  1. @Test
  2. public void testSendDelayMessage() throws InterruptedException {
  3. // 1.准备消息
  4. Message message = MessageBuilder
  5. .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
  6. .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
  7. .setHeader("x-delay", 5000)
  8. .build();
  9. // 2.准备CorrelationData
  10. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  11. // 3.发送消息
  12. rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
  13. log.info("发送消息成功");
  14. }

演示延时队列:

1.启动消费者

2.运行testSendDelayMessage

报错原因:消息没有做路由

如何不报错:添加延迟的判断:

  1. @Slf4j
  2. @Configuration
  3. public class CommonConfig implements ApplicationContextAware {
  4. @Override
  5. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  6. // 获取RabbitTemplate对象
  7. RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
  8. // 配置ReturnCallback
  9. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  10. // 判断是否是延迟消息
  11. Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
  12. if (receivedDelay != null && receivedDelay > 0) {
  13. // 是一个延迟消息,忽略这个错误提示
  14. return;
  15. }
  16. // 记录日志
  17. log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
  18. replyCode, replyText, exchange, routingKey, message.toString());
  19. // 如果有需要的话,重发消息
  20. });
  21. }
  22. }

标签: mq rabbitmq

本文转载自: https://blog.csdn.net/wanglvip/article/details/128273475
版权归原作者 黑冰vip 所有, 如有侵权,请联系我们删除。

“高级篇-rabbitmq的高级特性”的评论:

还没有评论