0


RocketMQ(三):集成SpringBoot

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门

RocketMQ(三):集成SpringBoot


目录

一、搭建环境

  • 需要创建两个服务,消息生产服务和消息消费者服务
  • 生产消息存在多个服务,消费则统一由一个服务处理
  • 这样可以做到解耦

pom.xml

  • 生产者和消费者都需要
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>

生产者配置文件

  • 设置统一的生产者组,这样发送消息时就不用指定了
rocketmq:name-server: 127.0.0.1:9876# rocketMq的nameServer地址producer:group: boot-producer-group        # 生产者组别send-message-timeout:3000# 消息发送的超时时间retry-times-when-send-async-failed:2# 异步消息发送失败重试次数max-message-size:4194304# 消息的最大长度

生产者配置文件

  • 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
rocketmq:name-server: localhost:9876

二、不同类型消息

直接引入即可

@AutowiredprivateRocketMQTemplate rocketMQTemplate;

1、同步消息

生产消息

  • 消息由消费者发送到broker后,会得到一个确认,是具有可靠性的
  • 比如:重要的消息通知,短信通知等
rocketMQTemplate.syncSend("bootTestTopic","我是boot的一个消息");

消费消息

  • RocketMQListener的泛型类型即消息类型- MessageExt类型是消息的所有内容- 其他类型则就只是消息体内容,没有消息头内容(keys、msgId、延迟时间、重试次数、主题名称...)
  • onMessage方法内没有报错就是签收了,报错就是拒收会重试
@Component@RocketMQMessageListener(topic ="bootTestTopic", consumerGroup ="boot-test-consumer-group")publicclassABootSimpleMsgListenerimplementsRocketMQListener<MessageExt>{@OverridepublicvoidonMessage(MessageExt message){System.out.println(newString(message.getBody()));}}

2、异步消息

  • 发送异步消息,发送完以后会有一个异步通知
  • 不影响程序往下执行
rocketMQTemplate.asyncSend("bootAsyncTestTopic","我是boot的一个异步消息",newSendCallback(){@OverridepublicvoidonSuccess(SendResult sendResult){System.out.println("成功");}@OverridepublicvoidonException(Throwable throwable){System.out.println("失败"+ throwable.getMessage());}});

3、单向消息

  • 不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险
  • 例如日志信息的发送
rocketMQTemplate.sendOneWay("bootOnewayTopic","单向消息");

4、延迟消息

  • RocketMQ不支持任意时间的延时
  • 只支持以下18个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
  • private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
  • 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
Message<String> msg =MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg,3000,4);

5、顺序消息

生产消息

  • 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
// 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费List<MsgModel> msgModels =Arrays.asList(newMsgModel("qwer",1,"下单"),newMsgModel("qwer",1,"短信"),newMsgModel("qwer",1,"物流"),newMsgModel("zxcv",2,"下单"),newMsgModel("zxcv",2,"短信"),newMsgModel("zxcv",2,"物流"));
msgModels.forEach(msgModel ->{// 发送  一般都是以json的方式进行处理// 根据第三个参数计算hash值决定消息放入哪个队列
    rocketMQTemplate.syncSendOrderly("bootOrderlyTopic",JSON.toJSONString(msgModel), msgModel.getOrderSn());});

消费消息

  • 默认是并发消费模式,可以设置为单线程顺序模式
  • 设置消费重试次数
@Component@RocketMQMessageListener(topic ="bootOrderlyTopic",
        consumerGroup ="boot-orderly-consumer-group",
        consumeMode =ConsumeMode.ORDERLY,// 顺序消费模式 单线程
        maxReconsumeTimes =5// 消费重试的次数)publicclassBOrderlyMsgListenerimplementsRocketMQListener<MessageExt>{@OverridepublicvoidonMessage(MessageExt message){MsgModel msgModel =JSON.parseObject(newString(message.getBody()),MsgModel.class);System.out.println(msgModel);}}

6、带tag消息

  • tag带在主题后面用:来携带
rocketMQTemplate.syncSend("bootTagTopic:tagA","我是一个带tag的消息");

7、带key消息

Message<String> message =MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS,"10086").build();
rocketMQTemplate.syncSend("bootKeyTopic", message);

获取带key和tag的消费者

  • 过滤模式有两种:正则表达式和sql92方式
  • keys从MessageExt对象中获取
@Component@RocketMQMessageListener(topic ="bootTagTopic",
        consumerGroup ="boot-tag-consumer-group",
        selectorType =SelectorType.TAG,// tag过滤模式
        selectorExpression ="tagA || tagB"//        selectorType = SelectorType.SQL92,// sql92过滤模式//        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true)publicclassCTagMsgListenerimplementsRocketMQListener<MessageExt>{@OverridepublicvoidonMessage(MessageExt message){System.out.println("获取keys: "+ message.getKeys());System.out.println("消息内容: "+newString(message.getBody()));}}

查看源码

  • destination目标 = 主题 : 标签
  • keys从消息头里面获取

在这里插入图片描述

三、消息消费两种模式

  • Rocketmq消息消费的模式分为两种:负载均衡模式广播模式
  • 负载均衡模式表示多个消费者交替消费同一个主题里面的消息
  • 广播模式表示每个消费者都消费一遍订阅的主题的消息

1、负载均衡模式

  • 队列会被消费者分摊
  • 队列数量应该>=消费者数量,否则多出来的消费者永远接收不到消息
  • mq服务器会记录消息的消费点位(即消息是否被消费)

创建多个消费者监听同一个主题

@Component@RocketMQMessageListener(topic ="modeTopic",
        consumerGroup ="mode-consumer-group-a",
        messageModel =MessageModel.CLUSTERING,// 集群模式(负载均衡))publicclassDC1implementsRocketMQListener<String>{@OverridepublicvoidonMessage(String message){System.out.println("我是mode-consumer-group-a组的第一个消费者:"+ message);}}@Component@RocketMQMessageListener(topic ="modeTopic",
        consumerGroup ="mode-consumer-group-a",
        messageModel =MessageModel.CLUSTERING// 集群模式(负载均衡))publicclassDC2implementsRocketMQListener<String>{@OverridepublicvoidonMessage(String message){System.out.println("我是mode-consumer-group-a组的第二个消费者:"+ message);}}@Component@RocketMQMessageListener(topic ="modeTopic",
        consumerGroup ="mode-consumer-group-a",
        messageModel =MessageModel.CLUSTERING// 集群模式(负载均衡))publicclassDC3implementsRocketMQListener<String>{@OverridepublicvoidonMessage(String message){System.out.println("我是mode-consumer-group-a组的第三个消费者:"+ message);}}

生产者发送多条消息

@TestpublicvoidmodeTest()throwsException{for(int i =1; i <=10; i++){
        rocketMQTemplate.syncSend("modeTopic","我是第"+ i +"个消息");}}

执行结果:

我是mode-consumer-group-a组的第一个消费者:我是第4个消息
我是mode-consumer-group-a组的第一个消费者:我是第8个消息
我是mode-consumer-group-a组的第三个消费者:我是第3个消息
我是mode-consumer-group-a组的第三个消费者:我是第7个消息
我是mode-consumer-group-a组的第二个消费者:我是第2个消息
我是mode-consumer-group-a组的第二个消费者:我是第6个消息
我是mode-consumer-group-a组的第二个消费者:我是第10个消息
我是mode-consumer-group-a组的第一个消费者:我是第1个消息
我是mode-consumer-group-a组的第一个消费者:我是第5个消息
我是mode-consumer-group-a组的第一个消费者:我是第9个消息

第一个消费者消费了5个消息,第二个消费了3个,第三个消费了2个,为什么没有平均开?

  • 10个消息,平均发送到4个队列

在这里插入图片描述

  • 第一个消费者接收队列0和队列1的消息
  • 第二个消费者接收队列2的消息
  • 第三个消费者接收队列3的消息
  • 如果是四个消费者,则每个消费者只能接收其中之一队列的消息
  • 如果是五个消费者,那么第五个消费者哪个队列的消息都不能接收到
  • 所谓的负载均衡则是队列被平分,而不是消息

在这里插入图片描述

  • 代理者点位:可以认为此队列接收的消息数量
  • 消费者点位:可以认为是已经消费的消息数量
  • 差值:代理者点位-消费者点位,待处理的消息数量

2、广播模式

  • 消费者组订阅的主题接收到的消息,每个消费者都会消费
  • mq服务器不会记录消息的消费点位(即消息是否被消费永远未知)

创建多个消费者监听同一个主题

  • 一个springboot服务配置了多个相同组和主题的consumer - 需要指定唯一instanceName- 实现RocketMQPushConsumerLifecycleListener接口
  • 否则报错The consumer group has been created before, specify another name please
@Component@RocketMQMessageListener(topic ="modeTopic",
        consumerGroup ="mode-consumer-group-b",
        messageModel =MessageModel.BROADCASTING// 广播模式)publicclassDC4implementsRocketMQListener<String>,RocketMQPushConsumerLifecycleListener{@OverridepublicvoidonMessage(String message){System.out.println("我是mode-consumer-group-b组的第一个消费者:"+ message);}@OverridepublicvoidprepareStart(DefaultMQPushConsumer defaultMQPushConsumer){
        defaultMQPushConsumer.setInstanceName("第一个消费者");}}@Component@RocketMQMessageListener(topic ="modeTopic",
        consumerGroup ="mode-consumer-group-b",
        messageModel =MessageModel.BROADCASTING// 广播模式)publicclassDC5implementsRocketMQListener<String>,RocketMQPushConsumerLifecycleListener{@OverridepublicvoidonMessage(String message){System.out.println("我是mode-consumer-group-b组的第二个消费者:"+ message);}@OverridepublicvoidprepareStart(DefaultMQPushConsumer defaultMQPushConsumer){
        defaultMQPushConsumer.setInstanceName("第二个消费者");}}@Component@RocketMQMessageListener(topic ="modeTopic",
        consumerGroup ="mode-consumer-group-b",
        messageModel =MessageModel.BROADCASTING// 广播模式)publicclassDC6implementsRocketMQListener<String>,RocketMQPushConsumerLifecycleListener{@OverridepublicvoidonMessage(String message){System.out.println("我是mode-consumer-group-b组的第三个消费者:"+ message);}@OverridepublicvoidprepareStart(DefaultMQPushConsumer defaultMQPushConsumer){
        defaultMQPushConsumer.setInstanceName("第三个消费者");}}

生产者发送多条消息

@TestpublicvoidmodeTest()throwsException{for(int i =1; i <=3; i++){
        rocketMQTemplate.syncSend("modeTopic","我是第"+ i +"个消息");}}

执行结果:

在这里插入图片描述

  • 生产者发送3条消息
  • 三个消费者,每个都会消费这3条消息
  • 所以,消费者点位也没法移动,索性就不动了

在这里插入图片描述


本文转载自: https://blog.csdn.net/qq_35512802/article/details/134408321
版权归原作者 冬天vs不冷 所有, 如有侵权,请联系我们删除。

“RocketMQ(三):集成SpringBoot”的评论:

还没有评论