本地测试,一定要启动rocketmq ,否则报错。启动方式请百度。
1.创建Springboot项目,添加rockermq 依赖
<!--rocketMq依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
2.配置rocketmq
端口
server:
port: 8083配置 rocketmq
rocketmq:
name-server: 127.0.0.1:9876
#生产者
producer:
#生产者组名,规定在一个应用里面必须唯一
group: group1
#消息发送的超时时间 默认3000ms
send-message-timeout: 3000
#消息达到4096字节的时候,消息就会被压缩。默认 4096
compress-message-body-threshold: 4096
#最大的消息限制,默认为128K
max-message-size: 4194304
#同步消息发送失败重试次数
retry-times-when-send-failed: 3
#在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
retry-next-server: true
#异步消息发送失败重试的次数
retry-times-when-send-async-failed: 3
3.创建生产者 发送批量消息—小于4MB
发送批量消息,最主要的区别是在发送消息的send方法入参一个List。
package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;/**
- 批量消息
- @author qzz
*/
@RestController
public class RocketMQBatchController {@Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送批量消息 * 发送批量消息,最主要的区别是在发送消息的send方法入参一个List。 */ @RequestMapping("/testBatchSend") public void testSyncSend(){ List<Message<String>> messageList = new ArrayList<>(); for(int i=0;i<10;i++){ messageList.add(MessageBuilder.withPayload("批量消息"+(i+1)).build()); } //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 SendResult sendResult = rocketMQTemplate.syncSend("test-topic-batch",messageList); System.out.println(sendResult); }
}
SpringBoot给我们提供了RocketMQTemplate模板类,我们利用这个类可以以多种形式发送消息。
发送方法指定Topic主题test-topic-batch。
官网也提示:如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB,实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。
4.新建消息消费者监听RocketMQConsumerListener,监听消息,消费消息
注意,RocketMQConsumerListener类如果写在当前工程下,生产的消息会在当前类消费掉。
我们创建其他项目消费。
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/**
- 消费批量消息
- 配置RocketMQ监听
- ConsumeMode.ORDERLY:顺序消费
- @author qzz
*/
@Service
@RocketMQMessageListener(consumerGroup = "test-batch",topic = "test-topic-batch",consumeMode = ConsumeMode.ORDERLY)
public class RocketMQBatchConsumerListener implements RocketMQListener<String> {@Override public void onMessage(String s) { System.out.println("consumer 批量消息,收到消息:"+s); }
}
消费者类要实现
RocketMQListener
接口,以及动态指定消息类型String。
类上要加上@RocketMQMessageListener注解,指定topic主题test-topic-batch,以及消费者组test-batch
5.启动服务,测试效果
从控制台打印可以看出,发送一次批量消息,创建了多了msgId(使用逗号拼接而成),默认消费时也是 一次消费一次。
版权归原作者 yuanshiren133 所有, 如有侵权,请联系我们删除。