0


RockerMQ发送消息流程

引入依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.1.1</version>
  5. </dependency>

1. 定义RocketMQTemplateService工具类

  1. @Service
  2. public class RocketMQTemplateService {
  3. private static Logger logger = LoggerFactory.getLogger(RocketMQTemplateService.class);
  4. @Autowired
  5. private RocketMQTemplate rocketMQTemplate;
  6. /**
  7. * @Description: 功能描述
  8. * 当发送的消息不重要时,采用OneWay方式,以提升吞吐量,一般用户日志存储
  9. *
  10. * @param: 参数描述
  11. * @param destination
  12. * @param payload
  13. * @throws: 异常描述
  14. */
  15. public void sendOneWay(String destination, Object payload) {
  16. rocketMQTemplate.sendOneWay(destination, payload);
  17. }
  18. /**
  19. * @Description: 功能描述
  20. * 默认使用同步发送syncSend, 但拿不到回执;convertAndSend和send等价
  21. *
  22. * @param: 参数描述
  23. * @param destination
  24. * @param payload
  25. * @throws: 异常描述
  26. *
  27. */
  28. public void convertAndSend(String destination, Object payload) {
  29. rocketMQTemplate.convertAndSend(destination, payload);
  30. }
  31. /**
  32. * @Description: 功能描述
  33. * 同步发送,需设置延迟等级
  34. *
  35. * @param: 参数描述
  36. * @param destination
  37. * @param message
  38. * @param timeout
  39. * @param delayLevel
  40. * @return
  41. * @throws: 异常描述
  42. *
  43. */
  44. public SendResult syncSend(String destination, Object message, long timeout, int delayLevel) {
  45. return rocketMQTemplate.syncSend(destination, MessageBuilder.withPayload(message).build(), timeout, delayLevel);
  46. }
  47. /**
  48. * @Description: 功能描述
  49. * 异步发送,需设置延迟等级
  50. *
  51. * @param: 参数描述
  52. * @param destination
  53. * @param message
  54. * @param timeout
  55. * @param delayLevel
  56. * @throws: 异常描述
  57. *
  58. */
  59. public void asyncSend(String destination, Object message, long timeout, int delayLevel) {
  60. rocketMQTemplate.asyncSend(destination, MessageBuilder.withPayload(message).build(), new SendCallback() {
  61. @Override
  62. public void onSuccess(SendResult sendResult) {
  63. }
  64. @Override
  65. public void onException(Throwable e) {
  66. logger.error("rocketmq发送异步消息异常:destination = " + destination + ";timeout = " + timeout
  67. + ";delayLevel = " + delayLevel + ";message = " + message + ";error = " + e.getMessage());
  68. }
  69. }, timeout, delayLevel);
  70. }
  71. /**
  72. * @Description: 功能描述
  73. * 异步发送,需设置延迟等级
  74. *
  75. * @param: 参数描述
  76. * @param destination
  77. * @param message
  78. * @param sendCallback
  79. * @param timeout
  80. * @param delayLevel
  81. * @throws: 异常描述
  82. *
  83. */
  84. public void asyncSend(String destination, Object message, SendCallback sendCallback, long timeout, int delayLevel) {
  85. rocketMQTemplate.asyncSend(destination, MessageBuilder.withPayload(message).build(), sendCallback, timeout, delayLevel);
  86. }

部分适用场景

当发送的消息不重要时,采用one-way方式,以提升吞吐量
当发送的消息很重要时,且对响应时间不敏感的时候采用sync方式
当发送的消息很重要时,且对响应时间很是敏感的时候采用async方式

2. 使用

  1. @Autowired
  2. private RocketMQTemplateService rocketMQTemplateService;
  3. public void 方法名(参数 参数) {
  4. // 以异步消息为例
  5. rocketMQTemplateService.asyncSend("TopicDetailId", 参数, 300000, RocketMqDelayLevel.getDelayLevel(30));
  6. }

监听类

  1. @Component
  2. @RocketMQMessageListener(topic = BizRocketMq.TopicDetailId,
  3. consumeMode = ConsumeMode.CONCURRENTLY,//分为:有序模式和无序模式,设置为无序模式(并发模式)
  4. messageModel = MessageModel.CLUSTERING,//分为:集群模式和广播模式;设置为集群模式;广播模式只能与无序模式匹配设置,并且广播模式只执行一次,切记切记!
  5. consumerGroup = BizRocketMq.GroupDetailId)
  6. public class JiFenExChangeServiceListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
  7. @Override
  8. public void onMessage(MessageExt messageExt) {
  9. 你的业务逻辑
  10. }
  11. }

消息中两id必须一致

标签: java 服务器 rocketmq

本文转载自: https://blog.csdn.net/FanZaiYo/article/details/135624318
版权归原作者 帆仔哟 所有, 如有侵权,请联系我们删除。

“RockerMQ发送消息流程”的评论:

还没有评论