0


Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动

1为什么要使用MQ?

在 **Spring Boot Event这篇文章中 **已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?

首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。

通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

源码地址:

https://gitee.com/sparkle3021/springboot3-study

2整合RocketMQ

依赖版本
  • JDK 17
  • Spring Boot 3.2.0
  • RocketMQ-Client 5.0.4
  • RocketMQ-Starter 2.2.0

Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。

引入RocketMQ依赖
  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client-java</artifactId>
  4. <version>5.0.4</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.rocketmq</groupId>
  8. <artifactId>rocketmq-spring-boot-starter</artifactId>
  9. <version>2.2.0</version>
  10. </dependency>
解决Spring Boot3+不兼容 spring.factories

rocketmq-spring-boot-starter:2.2.2版本中:

参考配置文件
  1. # RocketMQ 配置
  2. rocketmq:
  3. name-server: 127.0.0.1:9876
  4. consumer:
  5. group: event-mq-group
  6. # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
  7. pull-batch-size: 1
  8. producer:
  9. # 发送同一类消息的设置为同一个group,保证唯一
  10. group: event-mq-group
  11. # 发送消息超时时间,默认3000
  12. sendMessageTimeout: 10000
  13. # 发送消息失败重试次数,默认2
  14. retryTimesWhenSendFailed: 2
  15. # 异步消息重试此处,默认2
  16. retryTimesWhenSendAsyncFailed: 2
  17. # 消息最大长度,默认1024 * 1024 * 4(默认4M)
  18. maxMessageSize: 4096
  19. # 压缩消息阈值,默认4k(1024 * 4)
  20. compressMessageBodyThreshold: 4096
  21. # 是否在内部发送失败时重试另一个broker,默认false
  22. retryNextServer: false
  • 方法一 :通过 @Import(RocketMQAutoConfiguration.class) 在配置类中引入
  • 方法二:在resources资源目录下创建文件夹及文件 META-INF/spring,org.springframework.boot.autoconfigure.AutoConfiguration.imports。

文件内容为RocketMQ自动配置类路径: org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

3RocketMQ 使用

解决Spring Boot3+不支持spring.factories的问题
  1. import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.context.annotation.Import;
  5. /**
  6. * 启动类
  7. */
  8. @Import(RocketMQAutoConfiguration.class)
  9. @SpringBootApplication
  10. public class MQEventApplication {
  11. public static void main(String[] args) {
  12. SpringApplication.run(MQEventApplication.class, args);
  13. }
  14. }
RocketMQ操作工具

RocketMQ Message实体

  1. import cn.hutool.core.util.IdUtil;
  2. import jakarta.validation.constraints.NotBlank;
  3. import lombok.AllArgsConstructor;
  4. import lombok.Builder;
  5. import lombok.Data;
  6. import lombok.NoArgsConstructor;
  7. import org.apache.commons.collections.CollectionUtils;
  8. import org.apache.commons.lang3.ObjectUtils;
  9. import org.springframework.messaging.Message;
  10. import org.springframework.messaging.support.MessageBuilder;
  11. import java.io.Serializable;
  12. import java.util.List;
  13. /**
  14. * RocketMQ 消息
  15. */
  16. @Data
  17. @Builder
  18. @AllArgsConstructor
  19. @NoArgsConstructor
  20. public class RocketMQMessage<T> implements Serializable {
  21. /**
  22. * 消息队列主题
  23. */
  24. @NotBlank(message = "MQ Topic 不能为空")
  25. private String topic;
  26. /**
  27. * 延迟级别
  28. */
  29. @Builder.Default
  30. private DelayLevel delayLevel = DelayLevel.OFF;
  31. /**
  32. * 消息体
  33. */
  34. private T message;
  35. /**
  36. * 消息体
  37. */
  38. private List<T> messages;
  39. /**
  40. * 使用有序消息发送时,指定发送到队列
  41. */
  42. private String hashKey;
  43. /**
  44. * 任务Id,用于日志打印相关信息
  45. */
  46. @Builder.Default
  47. private String taskId = IdUtil.fastSimpleUUID();
  48. }

RocketMQTemplate 二次封装

  1. import com.yiyan.study.domain.RocketMQMessage;
  2. import jakarta.annotation.Resource;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.apache.rocketmq.client.producer.SendCallback;
  6. import org.apache.rocketmq.client.producer.SendResult;
  7. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.stereotype.Component;
  10. /**
  11. * RocketMQ 消息工具类
  12. */
  13. @Slf4j
  14. @Component
  15. public class RocketMQService {
  16. @Resource
  17. private RocketMQTemplate rocketMQTemplate;
  18. @Value("${rocketmq.producer.sendMessageTimeout}")
  19. private int sendMessageTimeout;
  20. /**
  21. * 异步发送消息回调
  22. *
  23. * @param taskId 任务Id
  24. * @param topic 消息主题
  25. * @return the send callback
  26. */
  27. private static SendCallback asyncSendCallback(String taskId, String topic) {
  28. return new SendCallback() {
  29. @Override
  30. public void onSuccess(SendResult sendResult) {
  31. log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());
  32. }
  33. @Override
  34. public void onException(Throwable throwable) {
  35. log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());
  36. }
  37. };
  38. }
  39. /**
  40. * 发送同步消息,使用有序发送请设置HashKey
  41. *
  42. * @param message 消息参数
  43. */
  44. public <T> void syncSend(RocketMQMessage<T> message) {
  45. log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
  46. SendResult sendResult;
  47. if (StringUtils.isNotBlank(message.getHashKey())) {
  48. sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
  49. } else {
  50. sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());
  51. }
  52. log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
  53. message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
  54. }
  55. /**
  56. * 批量发送同步消息
  57. *
  58. * @param message 消息参数
  59. */
  60. public <T> void syncSendBatch(RocketMQMessage<T> message) {
  61. log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
  62. message.getTaskId(), message.getTopic(), message.getMessages().size());
  63. SendResult sendResult;
  64. if (StringUtils.isNotBlank(message.getHashKey())) {
  65. sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());
  66. } else {
  67. sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());
  68. }
  69. log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",
  70. message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());
  71. }
  72. /**
  73. * 异步发送消息,异步返回消息结果
  74. *
  75. * @param message 消息参数
  76. */
  77. public <T> void asyncSend(RocketMQMessage<T> message) {
  78. log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());
  79. if (StringUtils.isNotBlank(message.getHashKey())) {
  80. rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),
  81. asyncSendCallback(message.getTaskId(), message.getTopic()));
  82. } else {
  83. rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),
  84. asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());
  85. }
  86. }
  87. /**
  88. * 批量异步发送消息
  89. *
  90. * @param message 消息参数
  91. */
  92. public <T> void asyncSendBatch(RocketMQMessage<T> message) {
  93. log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",
  94. message.getTaskId(), message.getTopic(), message.getMessages().size());
  95. if (StringUtils.isNotBlank(message.getHashKey())) {
  96. rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),
  97. asyncSendCallback(message.getTaskId(), message.getTopic()));
  98. } else {
  99. rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),
  100. asyncSendCallback(message.getTaskId(), message.getTopic()));
  101. }
  102. }
  103. /**
  104. * 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;
  105. *
  106. * @param message 消息参数
  107. */
  108. public <T> void sendOneWay(RocketMQMessage<T> message) {
  109. sendOneWay(message, false);
  110. }
  111. /**
  112. * 单向消息 - 批量发送
  113. *
  114. * @param message 消息体
  115. * @param batch 是否为批量操作
  116. */
  117. public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {
  118. log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]"
  119. : "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),
  120. message.getTaskId(), message.getTopic(), message.getMessages().size());
  121. if (StringUtils.isNotBlank(message.getHashKey())) {
  122. if (batch) {
  123. message.getMessages().
  124. forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));
  125. } else {
  126. rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());
  127. }
  128. } else {
  129. if (batch) {
  130. message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));
  131. } else {
  132. rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());
  133. }
  134. }
  135. }
  136. }
定义RocketMQ消费者
  1. import com.yiyan.study.constants.MQConfig;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  4. import org.apache.rocketmq.spring.core.RocketMQListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * MQ消息监听
  8. */
  9. @Component
  10. @Slf4j
  11. @RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,
  12. consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
  13. public class MQListener implements RocketMQListener<String> {
  14. @Override
  15. public void onMessage(String message) {
  16. log.info("MQListener 接收消息 : {}", message);
  17. }
  18. }
定义测试类发送消息
  1. import cn.hutool.core.thread.ThreadUtil;
  2. import com.yiyan.study.constants.MQConfig;
  3. import com.yiyan.study.domain.RocketMQMessage;
  4. import com.yiyan.study.utils.RocketMQService;
  5. import jakarta.annotation.Resource;
  6. import org.junit.jupiter.api.Test;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. /**
  9. * MQ测试
  10. */
  11. @SpringBootTest
  12. public class MQTest {
  13. @Resource
  14. private RocketMQService rocketMQService;
  15. @Test
  16. public void sendMessage() {
  17. int count = 1;
  18. while (count <= 50) {
  19. rocketMQService.syncSend(RocketMQMessage.builder()
  20. .topic(MQConfig.EVENT_TOPIC)
  21. .message(count++)
  22. .build());
  23. }
  24. // 休眠等待消费消息
  25. ThreadUtil.sleep(2000L);
  26. }
  27. }

4测试

总结

感谢您的阅读~


本文转载自: https://blog.csdn.net/m0_67847535/article/details/135984453
版权归原作者 漫走云雾 所有, 如有侵权,请联系我们删除。

“Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动”的评论:

还没有评论