0


Spring Boot整合RabbitMQ之发布与订阅模式(fanout)

RabbitMQ的模式中,常用的模式有:简单模式,发布与订阅模式,工作模式,路由模式,主题模式。简单模式由于不太会运用到工作中,所以不准备记录了,今天记录下发布订阅模式和springboot的整合过程。废话不多说,直接开始。

1. 创建RabbitMQ的生产者

创建一个springboot项目,项目创建强烈推荐使用阿里云的springboot脚手架URL。地址:https://start.aliyun.com/。项目结构如下:

然后进行rabbitMq的整合过程

1.1 引入rabbitmq的jar包

在项目的pom.xml中引入rabbitmq的jar包,详情如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.12.RELEASE</version>
</dependency>

1.2 配置文件中添加配置

在项目的配置文件中添加rabbitmq的相关配置,配置详情如下:

server:
  port: 10001

# rabbitMq 相关配置
spring:
  application:
    name: springboot-rabbitmq-s1
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin

1.3 创建配置类

配置类用于将队列和交换机进行绑定,该操作也可以使用rabbitmq的管理界面操作,并不是一定需要的步骤。配置类详情如下:

package com.study.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author alen
 * @DATE 2022/6/7 23:50
 */
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "fanout-order-exchange";
    public static final String SMS_QUEUE = "sms-fanout-queue";
    public static final String EMAIL_QUEUE = "email-fanout-queue";
    public static final String WECHAT_QUEUE = "wechat-fanout-queue";

    /**
     * 1.
     * 声明交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        /**
         * FanoutExchange的参数说明:
         * 1. 交换机名称
         * 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
         * 3. 是否自动删除 false:不自动删除 true:自动删除
         */
        return new FanoutExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 2.
     * 声明队列
     * @return
     */
    @Bean
    public Queue smsQueue() {
        /**
         * Queue构造函数参数说明
         * 1. 队列名
         * 2. 是否持久化 true:持久化 false:不持久化
         */
        return new Queue(SMS_QUEUE, true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(EMAIL_QUEUE, true);
    }

    @Bean
    public Queue wechatQueue() {
        return new Queue(WECHAT_QUEUE, true);
    }

    /**
     * 3.
     * 队列与交换机绑定
     */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding wechatBinding() {
        return BindingBuilder.bind(wechatQueue()).to(fanoutExchange());
    }
}

1.4 模拟发送消息

创建一个service类,在类中进行rabbitMq消息的发送,源码如下:

package com.study.rabbitmq.service;

import cn.hutool.json.JSONUtil;
import com.study.rabbitmq.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Author alen
 * @DATE 2022/6/7 23:31
 */
@Service
@Slf4j
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void createOrder(Order order) {
        String body = JSONUtil.toJsonStr(order);
        log.info("订单信息:{}", body);
        //交换机名称
        String exchangeName = "fanout-order-exchange";
        //路由key 由于我们实现的是fanout模式(广播模式),不需要路由key,所有的消费者都可以进行监听和消费
        String routeKey = "";
        //发送mq消息
        rabbitTemplate.convertAndSend(exchangeName, routeKey, body);
        log.info("rabbitmq发送广播模式消息成功。。。");
    }
}

使用单元测试模拟消息发送,单元测试详情如下:

package com.study.rabbitmq;

import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {
        for (long i = 1; i < 50; i++) {
            Order order = new Order();
            order.setRequestId(i);
            order.setUserId(i);
            order.setOrderNo(UUID.randomUUID().toString());
            order.setAmount(10L);
            order.setGoodsNum(1);
            order.setTotalAmount(10L);
            orderService.createOrder(order);
        }
    }
}

发送完后,我们可以在rabbitMq的管理后台看到已经发送成功的消息,效果如下:

可见消息已经全部发送完毕,因为前面的三个队列都是绑定在同一个交换机上,所以三个队列都会收到消息。

2. 创建RabbitMQ的消费者

创建消费者服务S2,项目结构参考生产者项目结构,然后进行消息消费的相关代码的实现,实现过程如下

2.1 引入RabbitMQ的jar包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.12.RELEASE</version>
</dependency>

2.2 在项目配置文件中添加配置

配置详情如下

server:
  port: 10002

# rabbitmq 相关配置
spring:
  application:
    name: springboot-rabbitmq-s2
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin

2.3 创建MQ消息消费者

消费者类详情如下

package com.study.rabbitmq.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @Author alen
 * @DATE 2022/6/8 8:15
 */
@Slf4j
@Service
@RabbitListener(queues = {"email-fanout-queue"}) //监听队列
public class FanoutEmailConsumer {

    @RabbitHandler
    public void emailMessage(String message) {
        log.info("Email fanout --接收到消息:{}", message);
    }
}

启动消费者项目,消费效果如下:

登录rabbitMq后台查看队列的消息情况如下

到此,似乎感觉整合得很顺利,没啥毛病。但是实际的运用中,以上演示过程中忽略了两个很重要的问题,一是我如何知道消息被顺利的发送到了队列,因为实际的工作中,不大可能每个消息都去rabbitmq管理后台查看。二是如果消息在消费的过程中出现了异常导致消息丢失,不重要的数据还好,如果是支付类的消息呢?就会产生严重的线上问题。那么这两个问题需要怎么处理呢?其实rabbitmq提供了消息发送结果回调和消息消费手动确认来处理这两个问题。

3. 消息发送回调

消息发送的过程分为两步,第一步是生产者将消息发送给交换机,第二步是交换机将消息发送到队列中,这两个步骤都可能发生发送失败的情况,rabbitmq提供了两种回调机制。分别是ConfirmCallback回调和ReturnCallback回调。实现方式如下:

3.1 添加配置信息

配置信息中添加开启confirm确认机制和return确认机制,配置详情如下:

server:
  port: 10001

spring:
  application:
    name: springboot-rabbitmq-s1
  rabbitmq:
    host: 124.223.41.158
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    # 发送者开启 return 确认机制
    publisher-returns: true
    # 发送者开启 confirm 确认机制
    publisher-confirm-type: correlated

3.2 配置类中注入RabbitTemplate

在创建配置类RabbitMQConfig中将自定义的RabbitTemplate对象注入bean容器中,详情如下:

package com.study.rabbitmq.config;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author alen
 * @DATE 2022/6/7 23:50
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "fanout-order-exchange";
    public static final String SMS_QUEUE = "sms-fanout-queue";
    public static final String EMAIL_QUEUE = "email-fanout-queue";
    public static final String WECHAT_QUEUE = "wechat-fanout-queue";

    /**
     * 1.
     * 声明交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        /**
         * FanoutExchange的参数说明:
         * 1. 交换机名称
         * 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
         * 3. 是否自动删除 false:不自动删除 true:自动删除
         */
        return new FanoutExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 2.
     * 声明队列
     * @return
     */
    @Bean
    public Queue smsQueue() {
        /**
         * Queue构造函数参数说明
         * 1. 队列名
         * 2. 是否持久化 true:持久化 false:不持久化
         */
        return new Queue(SMS_QUEUE, true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(EMAIL_QUEUE, true);
    }

    @Bean
    public Queue wechatQueue() {
        return new Queue(WECHAT_QUEUE, true);
    }

    /**
     * 3.
     * 队列与交换机绑定
     */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding wechatBinding() {
        return BindingBuilder.bind(wechatQueue()).to(fanoutExchange());
    }

    /**
     * 将自定义的RabbitTemplate对象注入bean容器
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启回调
        rabbitTemplate.setMandatory(true);
        //设置ConfirmCallback回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("回调数据:{}", correlationData);
                log.info("确认结果:{}", ack);
                log.info("返回原因:{}", cause);
            }
        });
        //设置ReturnCallback回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("发送消息:{}", JSONUtil.toJsonStr(message));
                log.info("结果状态码:{}", replyCode);
                log.info("结果状态信息:{}", replyText);
                log.info("交换机:{}", exchange);
                log.info("路由key:{}", routingKey);
            }
        });
        return rabbitTemplate;
    }
}

其他和自动确认的保持不变,消息发送方的改动就都在这里了,我们发一个消息看看效果,效果图示:

** 4. 消费者手动确认**

4.1 添加配置信息

在配置文件中添加手动确认的配置,详情如下:

server:
  port: 10002

spring:
  application:
    name: springboot-rabbitmq-s2
  rabbitmq:
    host: 124.223.41.158
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        # 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 auto
        acknowledge-mode: manual

4.2 消费手动确认

在消息消费的处理过程中,手动确认消息,代码详情如下:

package com.study.rabbitmq.service;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/8 8:15
 */
@Slf4j
@Service
@RabbitListener(queues = {"email-fanout-queue"}) //监听队列
public class FanoutEmailConsumer {

    @RabbitHandler
    public void emailMessage(String msg, Channel channel, Message message) throws IOException {
        try {
            log.info("Email fanout --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

消费效果如下:

以上是springboot整合rabbitmq的全部过程,整个过程中,有自动确认的演示和将自动确认改为手动确认的过程演示。欢迎讨论!


本文转载自: https://blog.csdn.net/cssweb_sh/article/details/125194004
版权归原作者 ~小爷. 所有, 如有侵权,请联系我们删除。

“Spring Boot整合RabbitMQ之发布与订阅模式(fanout)”的评论:

还没有评论