文章目录
添加 RocketMQ 依赖
- 在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖:
- 在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖:
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>
消费者 Consumer
YAML 配置
在 SpringBoot 项目的 yml 配置文件中添加以下配置:
rocketmq:name-server: 192.168.68.121:9876# rocketMq的nameServer地址
创建监听器
创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解:
@Component
、
@RocketMQMessageListener
,该类需要实现 RocketMQListener 接口,并使用泛型指定接收的消息类型:
@Component@RocketMQMessageListener(topic ="delayTopic",consumerGroup="boot-mq-group-consumer")publicclassMQMsgListenerimplementsRocketMQListener<MessageExt>{@OverridepublicvoidonMessage(MessageExt message){String msgId = message.getMsgId();String msg =newString(message.getBody());System.out.println("消息id:"+msgId+"消息内容:"+msg);}}
@RocketMQMessageListener
注解参数如下:
参数描述topic消费者订阅的主题consumerGroup消费者组consumeMode消费模式:并发接收消息 | 有序接收消息【
ConsumeMode.CONCURRENTLY
or
ConsumeMode.ORDERLY
】messageModel消息模式:集群模式 | 广播模式【
MessageModel.CLUSTERING
or
MessageModel.BROADCASTING
】selectorType过滤消息的方式:Tag | SQL92【
SelectorType.TAG
or
SelectorType.SQL92
】selectorExpression过滤消息的表达式:Tag | SQL92【`tag1maxReconsumeTimes消息消费失败后,可被重复投递的最大次数。消息重试只针对集群消费模式生效。delayLevelWhenNextConsume并发模式的消息重试策略。-1,无需重试,直接放入死信队列(%DLQ%+消费组)
消息过滤
Tag 过滤
消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。
编写并启动消费者项目订阅 tagTopic 主题:
@Component@RocketMQMessageListener(topic ="tagTopic",
consumerGroup ="boot-mq-group-consumer",
selectorType =SelectorType.TAG,
selectorExpression ="java")publicclassMQMsgListenerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(String message){System.out.println(message);}}
编写生产者 Controller,使用 RocketMQTemplate 的
syncSend()
方法发送一个带 Tag 的同步消息:
@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/tag")publicStringsendSyncMessage(){SendResult result = rocketMQTemplate.syncSend("tagTopic:java","这是一个带有 java tag 的消息");return"发送状态:"+ result.getSendStatus()+"<br>消息id:"+ result.getMsgId();}}
运行项目,访问接口:http://localhost:8080/send/tag
查看 RocketMQ 控制台,可以看到消息带有 java tag:
查看消费者项目的 IDEA 控制台:
生产者 Producer
YAML 配置
在 SpringBoot 项目的 yml 配置文件中添加以下配置:
rocketmq:name-server: 192.168.68.121:9876# rocketMq的nameServer地址producer:group: boot-mq-group-producer # 生产者组名
注:生产者需要标注生产者组名,否则会报异常:
'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.
发送同步消息
编写 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息,并将消息发送的结果进行打印:
@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/sync/{msg}")publicStringsendSyncMessage(@PathVariableString msg){SendResult result = rocketMQTemplate.syncSend("syncTopic", msg);return"发送状态:"+result.getSendStatus()+"<br>消息id:"+result.getMsgId();}}
运行项目,访问接口:http://localhost:8080/send/sync/同步消息
访问控制台,查看【syncTopic】主题,可以看到队列中存在一条消息:
发送异步消息
不同于同步消息,异步消息在发出后,并不会等待服务端返回响应,直接继续向下执行,发送方通过回调接口接收服务端响应,并处理响应结果。
编写 Controller,使用 RocketMQTemplate 的
asyncSend()
方法发送异步消息,并使用回调接口打印发送的结果:
@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/async/{msg}")publicStringsendAsyncMessage(@PathVariableString msg){
rocketMQTemplate.asyncSend("asyncTopic", msg,newSendCallback(){@OverridepublicvoidonSuccess(SendResult sendResult){System.out.println("异步消息发送成功");}@OverridepublicvoidonException(Throwable throwable){System.out.println("异步消息发送失败");}});System.out.println("异步消息已发送完成");return"发送异步消息";}}
运行项目,访问接口:http://localhost:8080/send/async/异步消息,查看 IDEA 控制台:
访问控制台,查看【asyncTopic】主题,可以看到队列中存在一条消息:
发送单向消息
编写 Controller,使用 RocketMQTemplate 的
sendOneWay()
方法发送单向消息:
@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/oneWay/{msg}")publicStringsendOneWayMessage(@PathVariableString msg){
rocketMQTemplate.sendOneWay("oneWayTopic",msg);return"单向消息发送成功";}}
运行项目,访问接口:http://localhost:8080/send/oneWay/单向消息
访问控制台,查看【oneWayTopic】主题,可以看到队列中存在一条消息:
发送延迟消息
编写并启动消费者项目订阅 delayTopic 主题:
@Component@RocketMQMessageListener(topic ="delayTopic",consumerGroup="boot-mq-group-consumer")publicclassMQMsgListenerimplementsRocketMQListener<MessageExt>{@OverridepublicvoidonMessage(MessageExt message){String msgId = message.getMsgId();String msg =newString(message.getBody());System.out.println("消息id:"+msgId+"\n消息内容:"+msg+"\n消息收到时间:"+newDate());}}
编写生产者 Controller,使用 RocketMQTemplate 的
syncSend()
方法发送同步消息:
@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/delay/{msg}")publicStringsendDelayMessage(@PathVariableString msg){Message<String> message =MessageBuilder.withPayload(msg).build();// 延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"SendResult result = rocketMQTemplate.syncSend("delayTopic", message,2000,3);return"发送状态:"+ result.getSendStatus()+"<br>消息id:"+ result.getMsgId()+"<br>消息发送时间:"+newDate();}}
运行项目,访问接口:http://localhost:8080/send/delay/延迟消息
查看消费者项目的 IDEA 控制台,可以看到过去了10s,对应我们设置的延迟级别。
发送顺序消息
编写订单类,用于模拟【下订单->发短信->物流->签收】的顺序流程:
publicclassOrder{//订单号privateString orderId;//订单名称privateString orderName;//订单的流程顺序privateString seq;}
编写并启动两个消费者项目订阅 orderlyTopic 主题,并将消费模式设置为顺序消费模式:
@Component@RocketMQMessageListener(topic ="orderlyTopic",
consumerGroup="boot-mq-group-consumer",
consumeMode =ConsumeMode.ORDERLY)publicclassMQMsgListenerimplementsRocketMQListener<Order>{@OverridepublicvoidonMessage(Order message){System.out.println("消费者:"+message);}}
编写生产者 Controller,使用 RocketMQTemplate 的
syncSendOrderly()
方法发送同步顺序消息:
@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/orderly")publicStringsendOrderlyMessage(){List<Order> orders =Arrays.asList(newOrder(UUID.randomUUID().toString(),"下订单","1"),newOrder(UUID.randomUUID().toString(),"发短信","1"),newOrder(UUID.randomUUID().toString(),"物流","1"),newOrder(UUID.randomUUID().toString(),"签收","1"),newOrder(UUID.randomUUID().toString(),"下订单","2"),newOrder(UUID.randomUUID().toString(),"发短信","2"),newOrder(UUID.randomUUID().toString(),"物流","2"),newOrder(UUID.randomUUID().toString(),"签收","2"));//控制流程:下订单->发短信->物流->签收//将 seq 作为 hashKey,这样 seq 相同的会放在同一个队列里面,顺序消费
orders.forEach(order ->{
rocketMQTemplate.syncSendOrderly("orderlyTopic",order,order.getSeq());});return"发送成功";}}
运行项目,访问接口:http:localhost:8080/send/orderly
查看 RocketMQ 控制台,可以看到我们的消息分别存储在两个队列中:
查看消费者项目的 IDEA 控制台,按照消息的顺序进行消费:
发送批量消息
编写并启动消费者项目订阅 batchOrderly 主题:
@Component@RocketMQMessageListener(topic ="batchOrderly",
consumerGroup="boot-mq-group-consumer")publicclassMQMsgListenerimplementsRocketMQListener<Order>{@OverridepublicvoidonMessage(Order message){System.out.println(Thread.currentThread().getName()+":"+message);}}
编写生产者 Controller,将消息打包成
Collection<Message> msgs
传入
syncSend()
方法中发送:
@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/batch")publicStringsendOrderlyMessage(){List<Message> messages =Arrays.asList(MessageBuilder.withPayload(newOrder(UUID.randomUUID().toString(),"下订单","1")).build(),MessageBuilder.withPayload(newOrder(UUID.randomUUID().toString(),"下订单","1")).build(),MessageBuilder.withPayload(newOrder(UUID.randomUUID().toString(),"下订单","1")).build(),MessageBuilder.withPayload(newOrder(UUID.randomUUID().toString(),"下订单","1")).build());return rocketMQTemplate.syncSend("batchOrderly", messages).getSendStatus().toString();}}
运行项目,访问接口:http:localhost:8080/send/batch
查看 RocketMQ 控制台,可以看到队列中一次传入4条消息:
查看消费者项目的 IDEA 控制台,多个线程并发进行消费:
发送集合消息
编写并启动消费者项目订阅 listTopic 主题:
@Component@RocketMQMessageListener(topic ="listTopic",
consumerGroup="boot-mq-group-consumer")publicclassMQMsgListenerimplementsRocketMQListener<List<Order>>{@OverridepublicvoidonMessage(List<Order> orders){
orders.forEach(o ->{System.out.println(Thread.currentThread().getName()+":"+o);});}}
编写生产者 Controller,将集合传入
syncSend()
方法中发送:
@RestControllerpublicclassProducerController{@AutowiredprivateRocketMQTemplate rocketMQTemplate;@GetMapping("/send/list")publicStringsendOrderlyMessage(){List<Order> orders =Arrays.asList(newOrder(UUID.randomUUID().toString(),"下订单","1"),newOrder(UUID.randomUUID().toString(),"下订单","1"),newOrder(UUID.randomUUID().toString(),"下订单","1"),newOrder(UUID.randomUUID().toString(),"下订单","1"));
rocketMQTemplate.syncSend("listTopic",orders);return"发送成功";}}
运行项目,访问接口:http:localhost:8080/send/list
查看 RocketMQ 控制台,可以看到队列中一条消息:
查看消费者项目的 IDEA 控制台,进行消费:
版权归原作者 I'm Jie 所有, 如有侵权,请联系我们删除。