0


重学SpringBoot3-集成RocketMQ(一)

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍

重学SpringBoot3-集成RocketMQ(一)

Spring Boot 3 与 RocketMQ 整合,可以通过 Spring Messaging 结合 RocketMQ 的

rocketmq-spring-boot-starter

实现。在这个整合过程中,RocketMQ 作为消息队列系统,Spring Boot 负责提供应用框架,整合可以让开发者更加便捷地使用 RocketMQ 的生产和消费功能。今天就先介绍下SpringBoot3整合RocketMQ5.x,并给出常见消息类型代码示例。

环境准备

  • Spring Boot 3.x 项目
  • RocketMQ 服务器:版本V5.3,包括 NameServerBroker,可以本地搭建或者使用云服务,搭建部分后面单独出教程。
  • RocketMQ 依赖:Spring Boot 与 RocketMQ 的整合依赖 rocketmq-spring-boot-starter

1. 配置项目依赖

在 Spring Boot 项目的

pom.xml

中添加 RocketMQ 相关依赖。

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version><!-- 或选择最新稳定版本 --></dependency>

2. 配置 RocketMQ 信息

application.yml

文件中配置 RocketMQ 的相关连接信息,包括

name-server

和其他基础配置。

2.1配置文件

rocketmq-spring-boot-starter 2.2.0(不含)以下版本

spring:rocketmq:name-server: localhost:9876# NameServer 地址,集群使用';'隔开producer:group: springboot-producer-group  # 生产者组名称send-message-timeout:3000retry-times-when-send-failed:2retry-next-server:trueaccess-key: RocketMQ    # 若启用了 ACL 功能secret-key:12345678# 若启用了 ACL 功能consumer:group: springboot-consumer-group  # 消费者组名称topic: test-topic  # 订阅的主题access-key: RocketMQ    # 若启用了 ACL 功能secret-key:12345678# 若启用了 ACL 功能

rocketmq-spring-boot-starter 2.2.0及其以上版本

rocketmq:name-server: localhost:9876# NameServer 地址,集群使用';'隔开producer:group: springboot-producer-group  # 生产者组名称send-message-timeout:3000retry-times-when-send-failed:2retry-next-server:trueaccess-key: RocketMQ    # 若启用了 ACL 功能secret-key:12345678# 若启用了 ACL 功能consumer:group: springboot-consumer-group  # 消费者组名称topic: test-topic  # 订阅的主题access-key: RocketMQ    # 若启用了 ACL 功能secret-key:12345678# 若启用了 ACL 功能

2.2导入自动配置类

按照之前介绍的自动配置,想让 RocketMQ 配生效,需要在启动类上添加如下代码或单独写个配置类:

@Import(RocketMQAutoConfiguration.class)

否在会报错:A component required a bean of type ‘org.apache.rocketmq.spring.core.RocketMQTemplate’ that could not be found.

@SpringBootApplication@Import(RocketMQAutoConfiguration.class)publicclassSpringBoot308RocketmqApplication{publicstaticvoidmain(String[] args){SpringApplication.run(SpringBoot308RocketmqApplication.class, args);}}

导入自动配置类

2.3创建Topic

示例代码仅一本地一个服务,即一个生产者和消费者,只需选一个broker,否在有些消息将无法消费。

创建Topic

3. 生产者代码示例

在 Spring Boot 项目中创建一个生产者服务,可以作为工具类,使用 RocketMQ 发送消息。

importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Service;@ServicepublicclassRocketMQProducer{@ResourceprivateRocketMQTemplate rocketMQTemplate;// 发送简单消息publicvoidsendMessage(String topic,String message){
        rocketMQTemplate.convertAndSend(topic, message);System.out.println("Message sent: "+ message);}}

3.1同步消息

同步发送消息是指,Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。RocketMQ 同步消息的方法形如 syncXx()。

/**
     * 同步类型消息
     *
     * @param topic
     * @param message
     */publicvoidsendMessage(String topic,String message){
        rocketMQTemplate.syncSend(topic, message);System.out.println("Message sent: "+ message);}

同步消息

3.2 异步消息

异步发送消息是指,Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。RocketMQ 同步消息的方法形如 asyncXx()。

/**
 * 异步类型消息
 *
 * @param topic
 * @param message
 */publicvoidasyncSendMessage(String topic,String message){
    rocketMQTemplate.asyncSend(topic, message,newSendCallback(){@OverridepublicvoidonSuccess(SendResult sendResult){System.out.println("Async message sent: "+ message);}@OverridepublicvoidonException(Throwable e){System.out.println("Async message error: "+ e);}});System.out.println("Message sent: "+ message);}

异步消息

3.3 单向消息

单向发送消息是指,Producer 仅负责发送消息,不等待、不处理 MQ 的 ACK。该发送方式时 MQ 也不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。

/**
     * 发送单向消息 
     *
     * @param topic
     * @param message
     */publicvoidsendOneWayMessage(String topic,String message){
        rocketMQTemplate.sendOneWay(topic, message);System.out.println("One way message sent: "+ message);}

单向消息

3.4顺序消息

顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。

/**
     * 发送顺序消息
     */publicvoidsendOrderlyMessage(String topic,String message,String shardingKey){for(int i =0; i <10; i++){String orderlyMessage = message + i;
            rocketMQTemplate.syncSendOrderly(topic, orderlyMessage, shardingKey);System.out.println("Orderly message sent: "+ orderlyMessage +" with shardingKey: "+ shardingKey);}}

顺序消息

3.5延时消息

当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级默认有18个,可以在broker.conf中增加配置,然后重启broker:

# 延时等级
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

代码很简单:

/**
     * 发送延迟消息
     *
     * @param topic
     * @param message
     * @param delayLevel
     */publicvoidsendDelayedMessage(String topic,String message,int delayLevel){
        rocketMQTemplate.syncSend(topic,MessageBuilder.withPayload(message).build(),3000, delayLevel);System.out.println("Delayed message sent: "+ message +" with delayLevel: "+ delayLevel);}

延时消息

除此之外,RocketMQ 还支持事务消息、批量消息、消息过滤等,后面再详细介绍。

4. 消费者代码示例

使用

@RocketMQMessageListener

注解来订阅主题并监听消息的到达,处理消息的消费逻辑。

packagecom.example.boot308rocketmq;importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.apache.rocketmq.spring.core.RocketMQListener;importorg.springframework.stereotype.Service;/**
 * @author CoderJia
 * @create 2024/09/09 15:12
 * @Description
 **/@Service@RocketMQMessageListener(topic ="test-topic", consumerGroup ="springboot-consumer-group")publicclassRocketMQConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(String message){// 处理接收到的消息System.out.println("Received message: "+ message);}}

5. 调用生产者发送消息

为了便于测试,创建一个简单的 Spring Boot Controller层代码,用于调用生产者发送消息。

/*
 * Copyright 2013-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */packagecom.example.boot308rocketmq.controller;importcom.example.boot308rocketmq.RocketMQProducer;importjakarta.annotation.Resource;importorg.springframework.http.ResponseEntity;importorg.springframework.stereotype.Controller;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;/**
 * @author CoderJia
 * @create 2024/9/9 下午 15:08
 * @Description
 **/@ControllerpublicclassMessageController{@ResourceprivateRocketMQProducer rocketMQProducer;@GetMapping("/sendMessage")publicResponseEntity<String>sendMessage(@RequestParamString message){
        rocketMQProducer.sendOneWayMessage("test-topic", message);returnResponseEntity.ok("Message sent: "+ message);}@GetMapping("/sendOrderlyMessage")publicResponseEntity<String>sendOrderlyMessage(@RequestParamString message){
        rocketMQProducer.sendOrderlyMessage("test-topic", message,"orderKey");returnResponseEntity.ok("Message sent: "+ message);}@GetMapping("/sendDelayedMessage")publicResponseEntity<String>sendDelayedMessage(@RequestParamString message,@RequestParamint delayLevel){
        rocketMQProducer.sendDelayedMessage("test-topic", message, delayLevel);returnResponseEntity.ok("Delayed message sent: "+ message +" with delayLevel: "+ delayLevel);}}

6. 启动项目并验证

  1. 启动 RocketMQ 的 NameServerBroker
  2. 启动 Spring Boot 项目。
  3. 打开浏览器或者使用 Postman 访问发送消息的接口:

普通消息

http://localhost:8080/sendMessage?message=HelloRocketMQ

顺序消息

http://localhost:8080/sendOrderlyMessage?message=HelloRocketMQ

延迟消息

http://localhost:8080/sendDelayedMessage?message=HelloDelayedRocketMQ&delayLevel=3

7. 整合总结

  • 生产者:通过 RocketMQTemplate 提供了发送消息的方法,包括同步消息、异步消息、顺序消息、延迟消息等。
  • 消费者:使用 @RocketMQMessageListener 注解,能够便捷地监听指定主题并消费消息。
  • 事务消息:RocketMQ 还支持事务消息,适合实现两阶段提交的事务模型,后面会着重介绍。

这种整合方式在 Spring Boot 3 中非常自然,并且

rocketmq-spring-boot-starter

进一步简化了配置和集成,使得开发者可以专注于业务逻辑的实现。


本文转载自: https://blog.csdn.net/u014390502/article/details/142095806
版权归原作者 CoderJia_ 所有, 如有侵权,请联系我们删除。

“重学SpringBoot3-集成RocketMQ(一)”的评论:

还没有评论