0


高效处理消息:使用Spring Boot实现消息重试机制

当涉及到消息发送和接收的可靠性,Spring Boot提供了一些机制来确保消息的可靠传递。其中包括消息确认机制和重试机制。下面是一个示例代码,演示如何在Spring Boot中实现可靠的消息发送和接收。

首先,我们需要配置RabbitMQ的连接信息和相关属性。在

application.properties

文件中添加以下配置:

# RabbitMQ连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 消息重试配置
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.retry.multiplier=2

接下来,我们创建队列、一个消息发送者(Producer)和一个消息接收者(Consumer)的示例代码。

定义队列示列代码:

@Configuration
public class RabbitMQConfig {

 @Bean
    public Queue fanout_queue_email() {
        return new Queue("fanout_queue_email");
    }
}

Producer(消息发送者)示例代码:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public MessageProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("fanout_queue_sms", message);
        System.out.println("消息发送成功:" + message);
    }
}

在上面的代码中,我们注入了

RabbitTemplate

对象,它是Spring Boot提供的用于发送消息的工具类。通过调用

rabbitTemplate.convertAndSend()

方法发送消息到名为

fanout_queue_sms

的队列

Consumer(消息接收者)示例代码:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

    @RabbitListener(queues = "fanout_queue_sms")
    public void receiveMessage(String message) {
        System.out.println("接收到消息:" + message);
        // 模拟处理消息的业务逻辑
        processMessage(message);
    }

    private void processMessage(String message) {
        // 模拟业务处理耗时
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 手动抛出异常,触发消息重试
        if (message.contains("error")) {
            throw new RuntimeException("处理消息失败");
        }
        System.out.println("消息处理成功:" + message);
    }
}

在上面的代码中,我们使用

@RabbitListener

注解标记

receiveMessage()

方法,表示它是一个消息监听器,监听名为

fanout_queue_sms

的队列。当接收到消息时,会触发

receiveMessage()

方法进行处理。

processMessage()

方法中,我们模拟了业务处理的耗时,并通过手动抛出异常的方式触发消息重试机制。如果消息处理失败,将会抛出运行时异常,触发Spring Boot的重试机制。

调用示例代码:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class MessageApplication {

    public static void main(String[] args) {
        // 启动Spring Boot应用
        ConfigurableApplicationContext context = SpringApplication.run(MessageApplication.class, args);

        // 获取MessageProducer Bean
        MessageProducer producer = context.getBean(MessageProducer.class);

        // 发送消息
        producer.sendMessage("Hello, World!error");
    }
}

运行效果如下:
alt
本文由 mdnice 多平台发布

标签: 后端

本文转载自: https://blog.csdn.net/fgliu/article/details/131386890
版权归原作者 代码校园 所有, 如有侵权,请联系我们删除。

“高效处理消息:使用Spring Boot实现消息重试机制”的评论:

还没有评论