更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍
重学SpringBoot3-集成RocketMQ(一)
Spring Boot 3 与 RocketMQ 整合,可以通过 Spring Messaging 结合 RocketMQ 的
rocketmq-spring-boot-starter
实现。在这个整合过程中,RocketMQ 作为消息队列系统,Spring Boot 负责提供应用框架,整合可以让开发者更加便捷地使用 RocketMQ 的生产和消费功能。今天就先介绍下SpringBoot3整合RocketMQ5.x,并给出常见消息类型代码示例。
环境准备
- Spring Boot 3.x 项目
- RocketMQ 服务器:版本V5.3,包括
NameServer
和Broker
,可以本地搭建或者使用云服务,搭建部分后面单独出教程。 - RocketMQ 依赖:Spring Boot 与 RocketMQ 的整合依赖
rocketmq-spring-boot-starter
。
1. 配置项目依赖
在 Spring Boot 项目的
pom.xml
中添加 RocketMQ 相关依赖。
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version><!-- 或选择最新稳定版本 --></dependency>
2. 配置 RocketMQ 信息
在
application.yml
文件中配置 RocketMQ 的相关连接信息,包括
name-server
和其他基础配置。
2.1配置文件
rocketmq-spring-boot-starter 2.2.0(不含)以下版本
spring:rocketmq:name-server: localhost:9876# NameServer 地址,集群使用';'隔开producer:group: springboot-producer-group # 生产者组名称send-message-timeout:3000retry-times-when-send-failed:2retry-next-server:trueaccess-key: RocketMQ # 若启用了 ACL 功能secret-key:12345678# 若启用了 ACL 功能consumer:group: springboot-consumer-group # 消费者组名称topic: test-topic # 订阅的主题access-key: RocketMQ # 若启用了 ACL 功能secret-key:12345678# 若启用了 ACL 功能
rocketmq-spring-boot-starter 2.2.0及其以上版本:
rocketmq:name-server: localhost:9876# NameServer 地址,集群使用';'隔开producer:group: springboot-producer-group # 生产者组名称send-message-timeout:3000retry-times-when-send-failed:2retry-next-server:trueaccess-key: RocketMQ # 若启用了 ACL 功能secret-key:12345678# 若启用了 ACL 功能consumer:group: springboot-consumer-group # 消费者组名称topic: test-topic # 订阅的主题access-key: RocketMQ # 若启用了 ACL 功能secret-key:12345678# 若启用了 ACL 功能
2.2导入自动配置类
按照之前介绍的自动配置,想让 RocketMQ 配生效,需要在启动类上添加如下代码或单独写个配置类:
@Import(RocketMQAutoConfiguration.class)
否在会报错:A component required a bean of type ‘org.apache.rocketmq.spring.core.RocketMQTemplate’ that could not be found.
@SpringBootApplication@Import(RocketMQAutoConfiguration.class)publicclassSpringBoot308RocketmqApplication{publicstaticvoidmain(String[] args){SpringApplication.run(SpringBoot308RocketmqApplication.class, args);}}
2.3创建Topic
示例代码仅一本地一个服务,即一个生产者和消费者,只需选一个broker,否在有些消息将无法消费。
3. 生产者代码示例
在 Spring Boot 项目中创建一个生产者服务,可以作为工具类,使用 RocketMQ 发送消息。
importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Service;@ServicepublicclassRocketMQProducer{@ResourceprivateRocketMQTemplate rocketMQTemplate;// 发送简单消息publicvoidsendMessage(String topic,String message){
rocketMQTemplate.convertAndSend(topic, message);System.out.println("Message sent: "+ message);}}
3.1同步消息
同步发送消息是指,Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。RocketMQ 同步消息的方法形如 syncXx()。
/**
* 同步类型消息
*
* @param topic
* @param message
*/publicvoidsendMessage(String topic,String message){
rocketMQTemplate.syncSend(topic, message);System.out.println("Message sent: "+ message);}
3.2 异步消息
异步发送消息是指,Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。RocketMQ 同步消息的方法形如 asyncXx()。
/**
* 异步类型消息
*
* @param topic
* @param message
*/publicvoidasyncSendMessage(String topic,String message){
rocketMQTemplate.asyncSend(topic, message,newSendCallback(){@OverridepublicvoidonSuccess(SendResult sendResult){System.out.println("Async message sent: "+ message);}@OverridepublicvoidonException(Throwable e){System.out.println("Async message error: "+ e);}});System.out.println("Message sent: "+ message);}
3.3 单向消息
单向发送消息是指,Producer 仅负责发送消息,不等待、不处理 MQ 的 ACK。该发送方式时 MQ 也不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。
/**
* 发送单向消息
*
* @param topic
* @param message
*/publicvoidsendOneWayMessage(String topic,String message){
rocketMQTemplate.sendOneWay(topic, message);System.out.println("One way message sent: "+ message);}
3.4顺序消息
顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。
/**
* 发送顺序消息
*/publicvoidsendOrderlyMessage(String topic,String message,String shardingKey){for(int i =0; i <10; i++){String orderlyMessage = message + i;
rocketMQTemplate.syncSendOrderly(topic, orderlyMessage, shardingKey);System.out.println("Orderly message sent: "+ orderlyMessage +" with shardingKey: "+ shardingKey);}}
3.5延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级默认有18个,可以在broker.conf中增加配置,然后重启broker:
# 延时等级
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
代码很简单:
/**
* 发送延迟消息
*
* @param topic
* @param message
* @param delayLevel
*/publicvoidsendDelayedMessage(String topic,String message,int delayLevel){
rocketMQTemplate.syncSend(topic,MessageBuilder.withPayload(message).build(),3000, delayLevel);System.out.println("Delayed message sent: "+ message +" with delayLevel: "+ delayLevel);}
除此之外,RocketMQ 还支持事务消息、批量消息、消息过滤等,后面再详细介绍。
4. 消费者代码示例
使用
@RocketMQMessageListener
注解来订阅主题并监听消息的到达,处理消息的消费逻辑。
packagecom.example.boot308rocketmq;importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.apache.rocketmq.spring.core.RocketMQListener;importorg.springframework.stereotype.Service;/**
* @author CoderJia
* @create 2024/09/09 15:12
* @Description
**/@Service@RocketMQMessageListener(topic ="test-topic", consumerGroup ="springboot-consumer-group")publicclassRocketMQConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(String message){// 处理接收到的消息System.out.println("Received message: "+ message);}}
5. 调用生产者发送消息
为了便于测试,创建一个简单的 Spring Boot Controller层代码,用于调用生产者发送消息。
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/packagecom.example.boot308rocketmq.controller;importcom.example.boot308rocketmq.RocketMQProducer;importjakarta.annotation.Resource;importorg.springframework.http.ResponseEntity;importorg.springframework.stereotype.Controller;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;/**
* @author CoderJia
* @create 2024/9/9 下午 15:08
* @Description
**/@ControllerpublicclassMessageController{@ResourceprivateRocketMQProducer rocketMQProducer;@GetMapping("/sendMessage")publicResponseEntity<String>sendMessage(@RequestParamString message){
rocketMQProducer.sendOneWayMessage("test-topic", message);returnResponseEntity.ok("Message sent: "+ message);}@GetMapping("/sendOrderlyMessage")publicResponseEntity<String>sendOrderlyMessage(@RequestParamString message){
rocketMQProducer.sendOrderlyMessage("test-topic", message,"orderKey");returnResponseEntity.ok("Message sent: "+ message);}@GetMapping("/sendDelayedMessage")publicResponseEntity<String>sendDelayedMessage(@RequestParamString message,@RequestParamint delayLevel){
rocketMQProducer.sendDelayedMessage("test-topic", message, delayLevel);returnResponseEntity.ok("Delayed message sent: "+ message +" with delayLevel: "+ delayLevel);}}
6. 启动项目并验证
- 启动 RocketMQ 的
NameServer
和Broker
。 - 启动 Spring Boot 项目。
- 打开浏览器或者使用 Postman 访问发送消息的接口:
普通消息:
http://localhost:8080/sendMessage?message=HelloRocketMQ
顺序消息:
http://localhost:8080/sendOrderlyMessage?message=HelloRocketMQ
延迟消息:
http://localhost:8080/sendDelayedMessage?message=HelloDelayedRocketMQ&delayLevel=3
7. 整合总结
- 生产者:通过
RocketMQTemplate
提供了发送消息的方法,包括同步消息、异步消息、顺序消息、延迟消息等。 - 消费者:使用
@RocketMQMessageListener
注解,能够便捷地监听指定主题并消费消息。 - 事务消息:RocketMQ 还支持事务消息,适合实现两阶段提交的事务模型,后面会着重介绍。
这种整合方式在 Spring Boot 3 中非常自然,并且
rocketmq-spring-boot-starter
进一步简化了配置和集成,使得开发者可以专注于业务逻辑的实现。
版权归原作者 CoderJia_ 所有, 如有侵权,请联系我们删除。