0


RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ

文章目录

添加 RocketMQ 依赖

  1. 在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖:image-20230527214713414
  2. 在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖:<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>

消费者 Consumer

YAML 配置

在 SpringBoot 项目的 yml 配置文件中添加以下配置:

rocketmq:name-server: 192.168.68.121:9876# rocketMq的nameServer地址

创建监听器

创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解:

@Component

@RocketMQMessageListener

,该类需要实现 RocketMQListener 接口,并使用泛型指定接收的消息类型:

@Component@RocketMQMessageListener(topic ="delayTopic",consumerGroup="boot-mq-group-consumer")publicclassMQMsgListenerimplementsRocketMQListener<MessageExt>{@OverridepublicvoidonMessage(MessageExt message){String msgId = message.getMsgId();String msg =newString(message.getBody());System.out.println("消息id:"+msgId+"消息内容:"+msg);}}
@RocketMQMessageListener

注解参数如下:
参数描述topic消费者订阅的主题consumerGroup消费者组consumeMode消费模式:并发接收消息 | 有序接收消息【

ConsumeMode.CONCURRENTLY

or

ConsumeMode.ORDERLY

】messageModel消息模式:集群模式 | 广播模式【

MessageModel.CLUSTERING

or

MessageModel.BROADCASTING

】selectorType过滤消息的方式:Tag | SQL92【

SelectorType.TAG

or

SelectorType.SQL92

】selectorExpression过滤消息的表达式:Tag | SQL92【`tag1maxReconsumeTimes消息消费失败后,可被重复投递的最大次数。消息重试只针对集群消费模式生效。delayLevelWhenNextConsume并发模式的消息重试策略。-1,无需重试,直接放入死信队列(%DLQ%+消费组)

消息过滤

Tag 过滤

消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。

编写并启动消费者项目订阅 tagTopic 主题:

@Component@RocketMQMessageListener(topic ="tagTopic",
        consumerGroup ="boot-mq-group-consumer",
        selectorType =SelectorType.TAG,
        selectorExpression ="java")publicclassMQMsgListenerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(String message){System.out.println(message);}}

编写生产者 Controller,使用 RocketMQTemplate 的

syncSend()

方法发送一个带 Tag 的同步消息:

@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/tag")publicStringsendSyncMessage(){SendResult result = rocketMQTemplate.syncSend("tagTopic:java","这是一个带有 java tag 的消息");return"发送状态:"+ result.getSendStatus()+"<br>消息id:"+ result.getMsgId();}}

运行项目,访问接口:http://localhost:8080/send/tag

image-20230528191958989

查看 RocketMQ 控制台,可以看到消息带有 java tag:

image-20230528191938535

查看消费者项目的 IDEA 控制台:

image-20230528191142421

生产者 Producer

YAML 配置

在 SpringBoot 项目的 yml 配置文件中添加以下配置:

rocketmq:name-server: 192.168.68.121:9876# rocketMq的nameServer地址producer:group: boot-mq-group-producer # 生产者组名

注:生产者需要标注生产者组名,否则会报异常:

'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.

发送同步消息

编写 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息,并将消息发送的结果进行打印:

@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/sync/{msg}")publicStringsendSyncMessage(@PathVariableString msg){SendResult result = rocketMQTemplate.syncSend("syncTopic", msg);return"发送状态:"+result.getSendStatus()+"<br>消息id:"+result.getMsgId();}}

运行项目,访问接口:http://localhost:8080/send/sync/同步消息

image-20230527231022909

访问控制台,查看【syncTopic】主题,可以看到队列中存在一条消息:

image-20230527231142472

发送异步消息

不同于同步消息,异步消息在发出后,并不会等待服务端返回响应,直接继续向下执行,发送方通过回调接口接收服务端响应,并处理响应结果。

编写 Controller,使用 RocketMQTemplate 的

asyncSend()

方法发送异步消息,并使用回调接口打印发送的结果:

@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/async/{msg}")publicStringsendAsyncMessage(@PathVariableString msg){
        rocketMQTemplate.asyncSend("asyncTopic", msg,newSendCallback(){@OverridepublicvoidonSuccess(SendResult sendResult){System.out.println("异步消息发送成功");}@OverridepublicvoidonException(Throwable throwable){System.out.println("异步消息发送失败");}});System.out.println("异步消息已发送完成");return"发送异步消息";}}

运行项目,访问接口:http://localhost:8080/send/async/异步消息,查看 IDEA 控制台:

image-20230527232838438

访问控制台,查看【asyncTopic】主题,可以看到队列中存在一条消息:

image-20230527233249499

发送单向消息

编写 Controller,使用 RocketMQTemplate 的

sendOneWay()

方法发送单向消息:

@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/oneWay/{msg}")publicStringsendOneWayMessage(@PathVariableString msg){
        rocketMQTemplate.sendOneWay("oneWayTopic",msg);return"单向消息发送成功";}}

运行项目,访问接口:http://localhost:8080/send/oneWay/单向消息

image-20230527233640217

访问控制台,查看【oneWayTopic】主题,可以看到队列中存在一条消息:

image-20230527233751658

发送延迟消息

编写并启动消费者项目订阅 delayTopic 主题:

@Component@RocketMQMessageListener(topic ="delayTopic",consumerGroup="boot-mq-group-consumer")publicclassMQMsgListenerimplementsRocketMQListener<MessageExt>{@OverridepublicvoidonMessage(MessageExt message){String msgId = message.getMsgId();String msg =newString(message.getBody());System.out.println("消息id:"+msgId+"\n消息内容:"+msg+"\n消息收到时间:"+newDate());}}

编写生产者 Controller,使用 RocketMQTemplate 的

syncSend()

方法发送同步消息:

@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/delay/{msg}")publicStringsendDelayMessage(@PathVariableString msg){Message<String> message =MessageBuilder.withPayload(msg).build();// 延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"SendResult result = rocketMQTemplate.syncSend("delayTopic", message,2000,3);return"发送状态:"+ result.getSendStatus()+"<br>消息id:"+ result.getMsgId()+"<br>消息发送时间:"+newDate();}}

运行项目,访问接口:http://localhost:8080/send/delay/延迟消息

image-20230528141811562

查看消费者项目的 IDEA 控制台,可以看到过去了10s,对应我们设置的延迟级别。

image-20230528141834080

发送顺序消息

编写订单类,用于模拟【下订单->发短信->物流->签收】的顺序流程:

publicclassOrder{//订单号privateString orderId;//订单名称privateString orderName;//订单的流程顺序privateString seq;}

编写并启动两个消费者项目订阅 orderlyTopic 主题,并将消费模式设置为顺序消费模式:

@Component@RocketMQMessageListener(topic ="orderlyTopic",
        consumerGroup="boot-mq-group-consumer",
        consumeMode =ConsumeMode.ORDERLY)publicclassMQMsgListenerimplementsRocketMQListener<Order>{@OverridepublicvoidonMessage(Order message){System.out.println("消费者:"+message);}}

编写生产者 Controller,使用 RocketMQTemplate 的

syncSendOrderly()

方法发送同步顺序消息:

@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/orderly")publicStringsendOrderlyMessage(){List<Order> orders =Arrays.asList(newOrder(UUID.randomUUID().toString(),"下订单","1"),newOrder(UUID.randomUUID().toString(),"发短信","1"),newOrder(UUID.randomUUID().toString(),"物流","1"),newOrder(UUID.randomUUID().toString(),"签收","1"),newOrder(UUID.randomUUID().toString(),"下订单","2"),newOrder(UUID.randomUUID().toString(),"发短信","2"),newOrder(UUID.randomUUID().toString(),"物流","2"),newOrder(UUID.randomUUID().toString(),"签收","2"));//控制流程:下订单->发短信->物流->签收//将 seq 作为 hashKey,这样 seq 相同的会放在同一个队列里面,顺序消费
        orders.forEach(order ->{
            rocketMQTemplate.syncSendOrderly("orderlyTopic",order,order.getSeq());});return"发送成功";}}

运行项目,访问接口:http:localhost:8080/send/orderly

image-20230528152807514

查看 RocketMQ 控制台,可以看到我们的消息分别存储在两个队列中:

image-20230528152925141

查看消费者项目的 IDEA 控制台,按照消息的顺序进行消费:

image-20230528152848032

发送批量消息

编写并启动消费者项目订阅 batchOrderly 主题:

@Component@RocketMQMessageListener(topic ="batchOrderly",
        consumerGroup="boot-mq-group-consumer")publicclassMQMsgListenerimplementsRocketMQListener<Order>{@OverridepublicvoidonMessage(Order message){System.out.println(Thread.currentThread().getName()+":"+message);}}

编写生产者 Controller,将消息打包成

Collection<Message> msgs

传入

syncSend()

方法中发送:

@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/batch")publicStringsendOrderlyMessage(){List<Message> messages =Arrays.asList(MessageBuilder.withPayload(newOrder(UUID.randomUUID().toString(),"下订单","1")).build(),MessageBuilder.withPayload(newOrder(UUID.randomUUID().toString(),"下订单","1")).build(),MessageBuilder.withPayload(newOrder(UUID.randomUUID().toString(),"下订单","1")).build(),MessageBuilder.withPayload(newOrder(UUID.randomUUID().toString(),"下订单","1")).build());return rocketMQTemplate.syncSend("batchOrderly", messages).getSendStatus().toString();}}

运行项目,访问接口:http:localhost:8080/send/batch

image-20230528161620859

查看 RocketMQ 控制台,可以看到队列中一次传入4条消息:

image-20230528161706194

查看消费者项目的 IDEA 控制台,多个线程并发进行消费:

image-20230528161804943

发送集合消息

编写并启动消费者项目订阅 listTopic 主题:

@Component@RocketMQMessageListener(topic ="listTopic",
        consumerGroup="boot-mq-group-consumer")publicclassMQMsgListenerimplementsRocketMQListener<List<Order>>{@OverridepublicvoidonMessage(List<Order> orders){
        orders.forEach(o ->{System.out.println(Thread.currentThread().getName()+":"+o);});}}

编写生产者 Controller,将集合传入

syncSend()

方法中发送:

@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/list")publicStringsendOrderlyMessage(){List<Order> orders =Arrays.asList(newOrder(UUID.randomUUID().toString(),"下订单","1"),newOrder(UUID.randomUUID().toString(),"下订单","1"),newOrder(UUID.randomUUID().toString(),"下订单","1"),newOrder(UUID.randomUUID().toString(),"下订单","1"));
    rocketMQTemplate.syncSend("listTopic",orders);return"发送成功";}}

运行项目,访问接口:http:localhost:8080/send/list

image-20230528161620859

查看 RocketMQ 控制台,可以看到队列中一条消息:

image-20230528163701846

查看消费者项目的 IDEA 控制台,进行消费:

image-20230528163745691


本文转载自: https://blog.csdn.net/qq_20185737/article/details/130915401
版权归原作者 I'm Jie 所有, 如有侵权,请联系我们删除。

“RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ”的评论:

还没有评论