一、前言
相较于 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaQ 等重量级的消息队列中间件,Redis 在消息量不大的情况下,也可以作为消息中间件来使用。Redis 作为消息队列使用时,常见的实现方式有 List、发布/订阅(Pub/Sub)模型,以及 Redis 5.0 以后出现的 Stream。Stream 相较于前两种,最大的优点是消息可以持久化,并且支持消费组和消息确认(ACK)。
二、下载Redis及引入Redis依赖
下载并安装 5.0 及以上版本的 Redis(Stream 是 Redis 5.0 才引入的数据类型),win版见下载地址。
pom中引入redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
三、配置消费者及消费组
application.yml中配置stream key,消费组和消费者可配置多个。
redis:
  mq:
    streams:
      # key名称
      - name: RARSP:REPORT:READ:VS
        groups:
          # 消费组名称
          - name: VS_GROUP
            # 消费者名称
            consumers: VS-CONSUMER-A,VS-CONSUMER-B
      # key2
      - name: RARSP:REPORT:READ:BLC
        groups:
          - name: BLC_GROUP
            consumers: BLC-CONSUMER-A,BLC-CONSUMER-B
      # key3
      - name: RARSP:REPORT:READ:HD
        groups:
          - name: HD_GROUP
            consumers: HD-CONSUMER-A,HD-CONSUMER-B
自定义三个实体类RedisMqGroup、RedisMqStream、RedisMq,对应application.yml中的配置
/**
 * One consumer group of a stream, mirroring an entry of the {@code groups}
 * list in the {@code redis.mq.streams} section of application.yml.
 */
public class RedisMqGroup {

    /** Name of the consumer group. */
    private String name;

    /** Names of the consumers that belong to this group. */
    private String[] consumers;

    /**
     * @param name the consumer group name
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the consumer group name, or {@code null} if it was never set
     */
    public String getName() {
        return this.name;
    }

    /**
     * Stores the given array as-is (no copy is made).
     *
     * @param consumers the consumer names of this group
     */
    public void setConsumers(String[] consumers) {
        this.consumers = consumers;
    }

    /**
     * @return the array previously passed to {@link #setConsumers(String[])},
     *         or {@code null} if it was never set
     */
    public String[] getConsumers() {
        return this.consumers;
    }
}
/**
 * One stream definition, mirroring an entry of the {@code redis.mq.streams}
 * list in application.yml: the stream key plus its consumer groups.
 */
public class RedisMqStream {

    /** Redis key of the stream. */
    public String name;

    /** Consumer groups configured for this stream. */
    public List<RedisMqGroup> groups;

    /**
     * @param name the Redis key of the stream
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the Redis key of the stream, or {@code null} if it was never set
     */
    public String getName() {
        return this.name;
    }

    /**
     * Stores the given list as-is (no copy is made).
     *
     * @param groups the consumer groups of this stream
     */
    public void setGroups(List<RedisMqGroup> groups) {
        this.groups = groups;
    }

    /**
     * @return the configured consumer groups, or {@code null} if never set
     */
    public List<RedisMqGroup> getGroups() {
        return this.groups;
    }
}
/**
 * Root of the message-queue settings; bound to the properties under the
 * {@code redis.mq} prefix.
 */
@EnableConfigurationProperties
@Configuration
@ConfigurationProperties(prefix = "redis.mq")
public class RedisMq {

    /** All configured streams ({@code redis.mq.streams}). */
    public List<RedisMqStream> streams;

    /**
     * Stores the given list as-is (no copy is made).
     *
     * @param streams the stream definitions
     */
    public void setStreams(List<RedisMqStream> streams) {
        this.streams = streams;
    }

    /**
     * @return the stream definitions, or {@code null} if none were set
     */
    public List<RedisMqStream> getStreams() {
        return this.streams;
    }
}
四、配置Redis及初始化stream、消费组、消费者
/**
 * Redis configuration: sets the serializers of the {@link RedisTemplate} and,
 * for every stream listed in {@link RedisMq}, makes sure the stream and its
 * consumer group exist and starts a listener container that consumes it.
 *
 * <p>Only the first group of each stream and the first consumer of that group
 * are subscribed; further entries in the configuration are not used here.
 */
@Slf4j
@Configuration
public class RedisConfiguration {
    @Resource
    private RedisTemplate redisTemplate;
    @Resource
    private RedisStreamUtil redisStreamUtil;
    @Resource
    private RedisMq redisMq;
    /**
     * The Spring-managed listener. It must not be created with {@code new}:
     * an instance built outside the container never gets its own injected
     * collaborators, so acknowledging a message would fail.
     */
    @Resource
    private ReportReadMqListener reportReadMqListener;

    /**
     * Configures the serializers of the injected template so that keys and
     * values are not stored in an unreadable (garbled) form.
     *
     * @return the same template instance, with serializers applied
     */
    @Bean
    public RedisTemplate redisTemplateInit() {
        // key serialization
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        // value serialization
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        // hash value serialization
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        // hash key serialization
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        return redisTemplate;
    }

    /**
     * Creates one listener container per configured stream and subscribes the
     * shared {@link ReportReadMqListener} to it with manual acknowledgement.
     *
     * @param factory connection factory used by the listener containers
     * @return the created subscriptions, one per configured stream; empty when
     *         no stream is configured
     */
    @Bean
    public List<Subscription> subscription(RedisConnectionFactory factory){
        List<Subscription> resultList = new ArrayList<>();
        List<RedisMqStream> streams = redisMq.getStreams();
        if (streams == null || streams.isEmpty()) {
            // Nothing configured under redis.mq.streams: nothing to subscribe to.
            log.warn("no redis stream configured, no subscription created");
            return resultList;
        }
        AtomicInteger index = new AtomicInteger(1);
        // Every subscription runs its polling loop on one executor thread for
        // as long as it is active, so the pool needs at least one thread per
        // stream; otherwise the surplus poll tasks would wait in the queue
        // forever and those streams would never be consumed.
        int poolSize = Math.max(Runtime.getRuntime().availableProcessors(), streams.size());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // maximum number of messages fetched per poll
                        .batchSize(5)
                        .executor(executor)
                        .pollTimeout(Duration.ofSeconds(1))
                        // .errorHandler()
                        .build();
        for (RedisMqStream redisMqStream : streams) {
            String streamName = redisMqStream.getName();
            RedisMqGroup redisMqGroup = redisMqStream.getGroups().get(0);
            initStream(streamName, redisMqGroup.getName());
            var listenerContainer = StreamMessageListenerContainer.create(factory, options);
            // Manual ack: the listener acknowledges each message itself.
            // (listenerContainer.receiveAutoAck(...) with the same arguments
            // would acknowledge automatically instead.)
            Subscription subscription = listenerContainer.receive(
                    Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()),
                    reportReadMqListener);
            resultList.add(subscription);
            listenerContainer.start();
        }
        return resultList;
    }

    /**
     * Creates the stream and its consumer group when the key does not exist.
     *
     * <p>A placeholder entry is added to create the key, the group is created,
     * and the placeholder is deleted again. When the key already exists
     * nothing is done, including no group creation.
     *
     * @param key   stream key
     * @param group consumer group name
     */
    private void initStream(String key, String group){
        boolean hasKey = redisStreamUtil.hasKey(key);
        if(!hasKey){
            Map<String,Object> map = new HashMap<>(1);
            map.put("field","value");
            // create the stream by adding a placeholder entry
            String result = redisStreamUtil.addMap(key, map);
            // create the consumer group
            redisStreamUtil.createGroup(key,group);
            // remove the placeholder entry again
            redisStreamUtil.del(key,result);
            log.info("stream:{}-group:{} initialize success",key,group);
        }
    }
}
Redis工具类
/**
 * Thin convenience wrapper around the stream operations of
 * {@link StringRedisTemplate}.
 */
@Component
public class RedisStreamUtil {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Creates a consumer group on a stream.
     *
     * @param key   stream key
     * @param group consumer group name
     * @return the result returned by the underlying {@code createGroup} call
     */
    public String createGroup(String key, String group){
        var streamOps = stringRedisTemplate.opsForStream();
        return streamOps.createGroup(key, group);
    }

    /**
     * Queries the consumers of a consumer group.
     *
     * @param key   stream key
     * @param group consumer group name
     * @return consumer information of the group
     */
    public StreamInfo.XInfoConsumers queryConsumers(String key, String group){
        var streamOps = stringRedisTemplate.opsForStream();
        return streamOps.consumers(key, group);
    }

    /**
     * Appends a map-shaped message to a stream.
     *
     * @param key   stream key
     * @param value message fields
     * @return the id of the added record, as a string
     */
    public String addMap(String key, Map<String, Object> value){
        var streamOps = stringRedisTemplate.opsForStream();
        return streamOps.add(key, value).getValue();
    }

    /**
     * Reads the messages of a stream, starting from the beginning of the stream.
     *
     * @param key stream key
     * @return the records that were read
     */
    public List<MapRecord<String, Object, Object>> read(String key){
        var streamOps = stringRedisTemplate.opsForStream();
        return streamOps.read(StreamOffset.fromStart(key));
    }

    /**
     * Acknowledges messages as consumed for a consumer group.
     *
     * @param key       stream key
     * @param group     consumer group name
     * @param recordIds ids of the records to acknowledge
     * @return the result of the underlying {@code acknowledge} call
     */
    public Long ack(String key, String group, String... recordIds){
        var streamOps = stringRedisTemplate.opsForStream();
        return streamOps.acknowledge(key, group, recordIds);
    }

    /**
     * Deletes messages from a stream. When all messages of a node have been
     * deleted, that node is destroyed automatically.
     *
     * @param key       stream key
     * @param recordIds ids of the records to delete
     * @return the result of the underlying {@code delete} call
     */
    public Long del(String key, String... recordIds){
        var streamOps = stringRedisTemplate.opsForStream();
        return streamOps.delete(key, recordIds);
    }

    /**
     * Checks whether a key exists.
     *
     * @param key the key to look up
     * @return {@code true} only when the template reports {@code TRUE};
     *         a {@code null} answer counts as {@code false}
     */
    public boolean hasKey(String key){
        return Boolean.TRUE.equals(stringRedisTemplate.hasKey(key));
    }
}
五、生产消息、消费消息
生产消息代码
// Key of the target stream; it corresponds to a stream name configured in application.yml.
String streamKey = "";
// Payload of the message: a body and a send-time field.
Map<String,Object> message = new HashMap<>(2);
message.put("body", "消息主题");
message.put("sendTime", "消息发送时间");
// Append the message to the stream.
redisStreamUtil.addMap(streamKey, message);
消费消息
/**
 * Stream listener that handles each received record and then acknowledges and
 * deletes it.
 *
 * <p>The consumer group used for the acknowledgement is looked up in
 * {@link RedisMq}: it is the first group configured for the stream the record
 * came from.
 */
@Slf4j
@Component
public class ReportReadMqListener implements StreamListener<String, MapRecord<String, String, String>> {

    /** Used to acknowledge and delete processed records. */
    @Resource
    private RedisStreamUtil redisStreamUtil;

    /** Stream configuration, used to find the consumer group of a stream. */
    @Resource
    private RedisMq redisMq;

    /**
     * Handles one record, then acks it for the stream's consumer group and
     * removes it from the stream.
     *
     * @param message the record delivered by the listener container
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        // key of the stream the record came from
        String streamKey = message.getStream();
        // id of the record
        RecordId recordId = message.getId();
        // content of the record
        Map<String, String> msg = message.getValue();
        //TODO business logic

        String group = resolveGroup(streamKey);
        if (group == null) {
            // Without a group the record cannot be acknowledged; leave it
            // untouched rather than deleting an unacknowledged record.
            log.warn("no consumer group configured for stream:{}, record:{} not acknowledged",
                    streamKey, recordId.getValue());
            return;
        }
        // Processing finished: acknowledge the record, then delete it.
        redisStreamUtil.ack(streamKey, group, recordId.getValue());
        redisStreamUtil.del(streamKey, recordId.getValue());
    }

    /**
     * Finds the name of the first consumer group configured for a stream.
     *
     * @param streamKey key of the stream
     * @return the group name, or {@code null} when the stream or its groups
     *         are not configured
     */
    private String resolveGroup(String streamKey) {
        if (streamKey == null || redisMq.getStreams() == null) {
            return null;
        }
        for (RedisMqStream stream : redisMq.getStreams()) {
            if (streamKey.equals(stream.getName())
                    && stream.getGroups() != null
                    && !stream.getGroups().isEmpty()) {
                return stream.getGroups().get(0).getName();
            }
        }
        return null;
    }
}
版权归原作者 窦朋飞 所有, 如有侵权,请联系我们删除。