0


springboot 用rocketmq实现批量消息,亲测可用

本地测试,一定要启动rocketmq ,否则报错。启动方式请百度。

1.创建Springboot项目,添加rockermq 依赖

<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

2.配置rocketmq

端口

server:
port: 8083

配置 rocketmq

rocketmq:
name-server: 127.0.0.1:9876
#生产者
producer:
#生产者组名,规定在一个应用里面必须唯一
group: group1
#消息发送的超时时间 默认3000ms
send-message-timeout: 3000
#消息达到4096字节的时候,消息就会被压缩。默认 4096
compress-message-body-threshold: 4096
#最大的消息限制,默认为128K
max-message-size: 4194304
#同步消息发送失败重试次数
retry-times-when-send-failed: 3
#在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
retry-next-server: true
#异步消息发送失败重试的次数
retry-times-when-send-async-failed: 3

3.创建生产者 发送批量消息—小于4MB
发送批量消息,最主要的区别是在发送消息的send方法入参一个List。

package com.example.springbootrocketdemo.controller;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;

/**

  • 批量消息
  • @author qzz

*/
@RestController
public class RocketMQBatchController {

@Autowired
 private RocketMQTemplate rocketMQTemplate;


 /**
  * 发送批量消息
  * 发送批量消息,最主要的区别是在发送消息的send方法入参一个List。
  */
 @RequestMapping("/testBatchSend")
 public void testSyncSend(){

    List<Message<String>> messageList = new ArrayList<>();
     for(int i=0;i<10;i++){
         messageList.add(MessageBuilder.withPayload("批量消息"+(i+1)).build());
     }
     //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
     //参数二:消息内容
     SendResult sendResult = rocketMQTemplate.syncSend("test-topic-batch",messageList);
     System.out.println(sendResult);
 }

}

SpringBoot给我们提供了RocketMQTemplate模板类,我们利用这个类可以以多种形式发送消息。
发送方法指定Topic主题test-topic-batch。

官网也提示:如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB,实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。
4.新建消息消费者监听RocketMQConsumerListener,监听消息,消费消息

注意,RocketMQConsumerListener类如果写在当前工程下,生产的消息会在当前类消费掉。

我们创建其他项目消费。

package com.example.springbootrocketdemo.config;

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**

  • 消费批量消息
  • 配置RocketMQ监听
  • ConsumeMode.ORDERLY:顺序消费
  • @author qzz

*/
@Service
@RocketMQMessageListener(consumerGroup = "test-batch",topic = "test-topic-batch",consumeMode = ConsumeMode.ORDERLY)
public class RocketMQBatchConsumerListener implements RocketMQListener<String> {

@Override
 public void onMessage(String s) {
     System.out.println("consumer 批量消息,收到消息:"+s);
 }

}

消费者类要实现

RocketMQListener

接口,以及动态指定消息类型String。

类上要加上@RocketMQMessageListener注解,指定topic主题test-topic-batch,以及消费者组test-batch

5.启动服务,测试效果

从控制台打印可以看出,发送一次批量消息,创建了多了msgId(使用逗号拼接而成),默认消费时也是 一次消费一次。

标签: 大数据

本文转载自: https://blog.csdn.net/yuanshiren133/article/details/125722661
版权归原作者 yuanshiren133 所有, 如有侵权,请联系我们删除。

“springboot 用rocketmq实现批量消息,亲测可用”的评论:

还没有评论