0


集成框架-RabbitMQ重试和确认

集成框架-RabbitMQ重试和确认

前言:

关于消息重试

Spring Boot

中,你可以使用

yml

格式的配置文件来配置

RabbitMQ 

的重试机制。

spring:rabbitmq:host: localhost
    port:5672username: guest
    password: guest
    listener:simple:retry:enabled:trueinitial-interval:5000# 初始重试间隔时间(毫秒)max-attempts:3# 最大重试次数max-interval:10000# 最大重试间隔时间(毫秒)multiplier:2.0# 重试间隔时间倍数

在这个配置中,启用了

 RabbitMQ 的重试机制

,并指定了重试的初始

间隔时间

最大重试次数

、最大重试间隔时间和重试间隔时间的

倍数


关于消息确认

当消费者处理消息时,如果处理成功,可以使用确认机制告知

RabbitMQ

已经成功消费了该消息。如果处理失败,则消息会被重新放回队列,等待重试。下面是一个演示确认机制的示例:

import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;

@Component
public class MessageConsumer {

    @RabbitListener(queues ="retry_queue")
    public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            System.out.println("Received message: " + message);
            // 在这里进行消息处理
            // 如果处理成功,手动确认消息
            channel.basicAck(tag, false);} catch (Exception e){
            // 如果处理失败,可以选择手动拒绝消息并重新放回队列,或者进行其他处理
            System.out.println("Failed to process message: " + e.getMessage());
            // 手动拒绝消息并重新放回队列
            channel.basicReject(tag, true);}}}

正文

消息重试

当消费者报错时,

RabbitMQ

将会使用

yml 配置

文件中设置的重试策略对消息进行重试。

添加

spring-boot-starter-amqp

依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
amqp 

是什么在

rabbitmq

中有什么作用呢,amqp是异步消息传输的协议 它定义了消息的传输格式、消息交换模式、队列管理方式等


下面是

AMQP

RabbitMQ

中的一些网络文章罗列的作用:

  • 消息格式定义: 定义了消息的格式,包括消息的头部、属性、内容等。消息的格式化使得消息在不同的应用程序之间可以被正确地解析和处理。
  • 消息交换模式: 定义了消息的交换模式,包括直接交换、扇出交换、主题交换等。这些交换模式使得消息可以被路由到不同的队列,以满足不同的业务需求。
  • 队列管理: 定义了队列的管理方式,包括队列的创建、删除、绑定、解绑等操作。这些操作使得队列可以被动态地管理和调整,以适应不同的应用场景。
  • 消息确认机制: 提供了消息确认机制,包括自动确认和手动确认两种模式。消息确认机制可以确保消息被正确地接收和处理,从而提高了消息传输的可靠性和稳定性。
  • 事务支持: 支持事务,可以保证一组消息的原子性操作。事务机制可以确保消息的原子性,从而保证消息的一致性和可靠性。

然后,可以在配置文件中配置

RabbitMQ

连接信息和重试策略:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 5000# 初始重试间隔时间(毫秒)
          max-attempts: 3# 最大重试次数
          max-interval: 10000# 最大重试间隔时间(毫秒)
          multiplier: 2.0# 重试间隔时间倍数

接下来,编写一个生产者,用于发送消息到

RabbitMQ

import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void produceMessage(String message){
        amqpTemplate.convertAndSend("retry_exchange", "retry_key", message);}}

然后,编写一个消费者,用于消费消息并故意引发异常:

import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

    @RabbitListener(queues ="retry_queue")
    public void handleMessage(String message) throws Exception {
        System.out.println("Received message: " + message);
        // 模拟处理消息时发生异常
        throw new RuntimeException("Simulated exception occurred");}}

最后,在你的应用程序入口,创建一个简单的控制器,用于触发消息的发送:

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class Application {

    @Autowired
    private MessageProducer messageProducer;

    public static void main(String[] args){
        SpringApplication.run(Application.class, args);}

    @GetMapping("/send")
    public String sendMessage(){
        messageProducer.produceMessage("Hello RabbitMQ!");return"Message sent to RabbitMQ";}}

现在,当启动应用程序并访问

/send 

路径时,生产者将发送一条消息到

RabbitMQ

,然后消费者将会接收到这条消息,并故意引发异常。

Rabbitmq

将根据配置文件中的重试策略对消息进行重试,直到达到最大重试次数为止。


消息重试总结

以上就是消息重试的概念,关于如果不是一个服务,比如生产者配置3次指的是生产者发送失败重试的次数,消费者配置3次指的是消费者重试的次数,只要配置了,如果消费报错就会重试,生产报错也会重试,如果生产消费在一起,就是消费报错重试。


消息确认

当消费者处理消息时,如果处理成功,可以使用确认机制告知

RabbitMQ

已经成功消费了该消息。如果处理失败,则消息会被重新放回队列,等待重试。下面是一个确认机制的示例:


首先,确保你的消费者方法使用了

@RabbitListener

注解,并且使用

Channel

对象进行手动确认。这样,你就可以在消费者成功处理消息时手动确认消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;

@Component
public class MessageConsumer {

    @RabbitListener(queues ="retry_queue")
    public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            System.out.println("Received message: " + message);
            // 在这里进行消息处理
            // 如果处理成功,手动确认消息
            channel.basicAck(tag, false);} catch (Exception e){
            // 如果处理失败,可以选择手动拒绝消息并重新放回队列,或者进行其他处理
            System.out.println("Failed to process message: " + e.getMessage());
            // 手动拒绝消息并重新放回队列
            channel.basicReject(tag, true);}}}

这里使用

channel.basicAck(tag, false) 

方法手动确认消息。如果消费者成功处理消息,则调用这个方法告知

RabbitMQ 

已经成功消费了该消息。如果处理失败,则可以选择手动拒绝消息并重新放回队列,或者进行其他处理。如果你不用

channel.basicAck(tag, false) 

和没有引入

@Header(AmqpHeaders.DELIVERY_TAG) long tag

默认的话是自动确认。

所以确保在消费者方法的参数列表中包含

Channel 

对象和消息的

DELIVERY_TAG

。这样,

Spring AMQP 

就会将

Channel 

对象注入到消费者方法中,以便你可以使用它来手动确认消息。

这样,当消费者处理消息时,你就可以使用确认机制告知

RabbitMQ

消息的处理结果。如果处理成功,则消息被消费,否则消息会被重新放回队列等待重试。

RabbitMQ 

中,生产者并不会直接接收到消费者的确认请求。确认请求是由消费者向

RabbitMQ 

服务器发送的,用于告知

RabbitMQ

消息的处理结果。生产者可以选择监听确认事件,以便在消息被确认后执行相应的操作。

Spring Boot

中,可以使用

RabbitTemplate

ConfirmCallback

接口来监听确认事件。以下是一个生产者简单的示例:

import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void produceMessage(String message){
        CorrelationData correlationData = new CorrelationData("unique-id"); // 设置消息的唯一标识符
        rabbitTemplate.convertAndSend("retry_exchange", "retry_key", message, correlationData);
        
        // 设置 ConfirmCallback,用于监听消息的确认结果
        rabbitTemplate.setConfirmCallback((correlation, ack, cause) ->{if(ack){
                // 如果消息被确认
                System.out.println("Message with correlation id " + correlation.getId() + " confirmed");}else{
                // 如果消息被拒绝或超时
                System.out.println("Message with correlation id " + correlation.getId() + " rejected: " + cause);}});}}

在这个示例中,通过

CorrelationData

设置了消息的唯一标识符,并将其作为参数传递给

convertAndSend 

方法。然后,我们设置了

ConfirmCallback

,用于监听消息的确认结果。当消息被确认时,

ConfirmCallback

ack

参数将会为

 true

,表示消息被成功确认;当消息被拒绝或超时时,

ack 

参数将会为

false

,同时

cause

参数将会包含拒绝的原因。

通过监听

ConfirmCallback

,生产者可以在消息被确认时执行相应的操作,例如记录日志、更新状态等。

确认机制和重试机制的概念

RabbitMQ

中,消息的确认机制和重试机制是两个不同的概念,它们可以结合使用,但并不互相

排斥

。确认机制是用来告知

 RabbitMQ

消息的处理结果,而重试机制则是在消息处理失败时将消息重新放回队列,等待后续的重试。

如果消息被消费者确认了(即消费者成功处理了消息),

RabbitMQ

将会将该消息从队列中删除,不会再进行重试。这意味着即使消息在处理过程中出现了错误,但只要消费者成功确认了该消息,它就不会再次被放回队列进行重试。

然而,如果消费者拒绝了消息(即调用了

channel.basicReject(tag, true) 

方法),或者处理消息时发生了超时等问题,

RabbitMQ

将会将消息重新放回队列,等待后续的重试。这时,重试机制会起作用,根据配置的重试策略对消息进行重试,直到达到最大重试次数为止。

因此,确认机制和重试机制是可以结合使用的。确认机制用于告知

RabbitMQ

消息的处理结果,而重试机制则用于处理处理失败的消息,确保消息能够被成功处理。
如果按上文配置重试

3次

,那么生产者方法,会在第一次报错他被拒绝了,但是任然还会在队列里面重试

3次

才会结束,这就涉及到消息积压。这要注意

标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/weixin_44550490/article/details/138975889
版权归原作者 徐寿春 所有, 如有侵权,请联系我们删除。

“集成框架-RabbitMQ重试和确认”的评论:

还没有评论