0


5、Spring Boot 3.x 集成 RabbitMQ

一、前言

本篇主要是围绕着 Spring Boot 3.x 与 RabbitMQ 的集成,这边文章比较简单,RabbitMQ 的集成没有太大的变化,这篇文章主要是为了后续的 RabbitMQ 的动态配置做铺垫。
1、Docker 安装 RabbitMQ
2、Spring Boot 3.x 集成 RabbitMQ

二、Docker 安装 RabbitMQ

1、创建docker-network

# 创建docker网络,方便后续连通多个容器
docker network create local-net

2、拉取镜像并启动容器

# 搜索 rabbitmq 相关镜像
docker search rabbitmq
# 指定版本拉取 rabbitmq 镜像
docker pull rabbitmq:3-management
# 查看本地镜像
docker images

# 启动容器命令
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v /Users/kenny/docker/rabbitmq/data:/var/lib/rabbitmq/mnesia --network=local-net rabbitmq:3-management

# 查看启动容器
docker ps# 查看启动日志
docker logs -f rabbitmq

# 浏览器进入rabbitmq控制台# http://localhost:15672# 默认账号: guest# 默认密码: guest

三、Spring Boot 3.x 集成 RabbitMQ

1、pom.xml

<!-- RabbitMQ --><!-- Spring Boot RabbitMQ Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、application.yml

spring:# RabbitMQ配置rabbitmq:host: localhost
    port:5672username: guest
    password: guest
    listener:simple:# 手动确认模式acknowledge-mode: manual
        retry:# 开启重试enabled:true# 最大重试次数max-attempts:5# 首次重试时间间隔initial-interval:1000# 重试时间间隔递增max-interval:10000

3、初始化Exchange、Queue

RabbitFountConfig.java
importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * @program: chain
 * @description: RabbitMQ Fount 配置类
 * @author: Kenny.Qiu
 * @create: 2024/10/12 08:59
 */@ConfigurationpublicclassRabbitFountConfig{// exchangepublicfinalstaticString DEFAULT_EXCHANGE ="exchange.fount";// queuepublicfinalstaticString DEFAULT_QUEUE ="queue.fount";// routing keypublicfinalstaticString DEFAULT_ROUTING_KEY ="routing.key.fount";/**
     * 声明注册 fanout 模式的交换机
     *
     * @return 交换机
     */@BeanpublicFanoutExchangedefalutFanoutExchange(){// durable:是否持久化,默认是false// autoDelete:是否自动删除returnnewFanoutExchange(DEFAULT_EXCHANGE,true,false);}/**
     * 声明队列
     *
     * @return Queue
     */@BeanpublicQueuedefaultQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除returnnewQueue(DEFAULT_QUEUE,true);}/**
     * 声明绑定交换机与队列
     *
     * @return Binding
     */@BeanpublicBindingdefaultBinding(){returnBindingBuilder.bind(defaultQueue()).to(defalutFanoutExchange());}}

4、接收者

DefaultDirectReceiveQueueService
importcom.chain.air.rpp.exchange.config.rabbit.RabbitDirectConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;/**
 * @program: chain
 * @description: 默认 direct 队列接收组件
 * @author: Kenny.Qiu
 * @create: 2024/10/12 10:04
 */@Slf4j@Component@RabbitListener(queues =RabbitDirectConfig.DEFAULT_QUEUE)publicclassDefaultDirectReceiveQueueService{/**
     * 默认 direct 队列接收消息
     *
     * @param message 消息内容
     */@RabbitHandlerpublicvoidmessageReceive(String message){
        log.info("默认 direct 队列接收消息:{}", message);}}

5、发送者

RabbitProducerService
importjakarta.annotation.Resource;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Service;/**
 * @className: RabbitProducerService
 * @program: chain
 * @description: RabbitMQ 生产者 Service 组件
 * @author: kenny
 * @create: 2024-10-04 01:11
 * @version: 1.0.0
 */@Slf4j@ServicepublicclassRabbitProducerService{@ResourceprivateRabbitTemplate rabbitTemplate;/**
     * 向动态创建的队列发送消息
     *
     * @param queueName 队列名称
     * @param message   消息内容
     */publicvoidsendMessageToQueue(String queueName,String message){
        log.info("向队列:{},发送消息:{}", queueName, message);
        rabbitTemplate.convertAndSend(queueName, message);}/**
     * 向动态创建的交换机发送消息
     *
     * @param exchangeName 交换机名称
     * @param routingKey   路由键
     * @param message      消息内容
     */publicvoidsendMessageToExchange(String exchangeName,String routingKey,String message){
        log.info("向交换机:{},路由键:{},发送消息:{}", exchangeName, routingKey, message);
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);}}

6、Controller

RabbitController.java
importcom.chain.air.rpp.exchange.rabbit.RabbitProducerService;importjakarta.annotation.Resource;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;/**
 * @className: RabbitController
 * @program: chain
 * @description: RabbitMQ Controller组件
 * @author: kenny
 * @create: 2024-10-03 22:02
 * @version: 1.0.0
 */@RestController@RequestMapping("/rabbit")publicclassRabbitController{/**
     * RabbitMQ 生产者 Service 组件
     */@ResourceprivateRabbitProducerService rabbitProducerService;/**
     * 发送消息到指定的队列
     *
     * @param queueName 队列名称
     * @param message   消息内容
     * @return 处理结果
     */@RequestMapping("/send")publicStringsendMessage(@RequestParamString queueName,@RequestParamString message){
        rabbitProducerService.sendMessageToQueue(queueName, message);return"向队列:"+ queueName +",发送消息:"+ message;}/**
     * 发送消息到指定的交换机
     *
     * @param exchangeName 交换机名称
     * @param routingKey   路由键
     * @param message      消息内容
     * @return 处理结果
     */@RequestMapping("/send/exchange")publicStringsendMessageToExchange(@RequestParamString exchangeName,@RequestParamString routingKey,@RequestParamString message){
        rabbitProducerService.sendMessageToExchange(exchangeName, routingKey, message);return"向交换机:"+ exchangeName +",发送消息:"+ message;}}

四、测试

1、启动服务

2、查看RabbitMQ控制台,Exchange、Queue是否完成创建

3、通过接口发送消息

4、通过RabbitMQ控制台,往Exchange、Queue分别发送消息,服务是否接收到

下一篇:6、Spring Boot 3.x集成RabbitMQ动态交换机、队列


本文转载自: https://blog.csdn.net/weixin_44224292/article/details/142843817
版权归原作者 Kenny.志 所有, 如有侵权,请联系我们删除。

“5、Spring Boot 3.x 集成 RabbitMQ”的评论:

还没有评论