0


【Jeepay】02-Kafka实现延迟消息与广播模式详细设计

在专题的上一章中,重点讲解了项目的改造背景、难点分析
传送门:【Jeepay】01-Kafka实现延迟消息与广播模式概要设计

在进入正篇之前,想简单说一下,之所以会如此的追本溯源的去记录:

第一是因为:一个可以落地的解决方案的敲定,是综合项目各方面的原因得到的。没有完美的架构,只有刚好的架构;没有满足一切的架构,只有满足目标的架构。

第二是因为想要通过这样的记录,让后面的同学能快速的理解:实践中并不需要沿用我的解决方案,只要能把思路打开,一定会找到更加适合你们项目的方式。

好了废话不多说,本章会就Kafka实现延迟消息与广播模式的技术细节展开讨论。

Kafka延迟发送

解决思路:

Kafka延迟发送的解决思路:利用Redis的ZSet集合,实现Redis缓存队列

生产者在调用延迟发送方法时,消息并不会立刻被投递到Topic中,转而发送到延迟队列

将当前时间戳与延迟时间进行相加,将结果作为ZSet的score进行设置。

除此之外,延迟队列有包含线程池、分布式锁。每5s循环一次,对比当前时间戳与ZSet的score。拉取缓存队列中到期的消息,将消息重新组装,投递到Topic并进行消费。
kafka延迟消息

代码实现

Kafka延迟发送的核心代码

packagecom.kearey.boot.mq.vender.kafka;importcom.kearey.boot.cache.delay.ADelayQueueInfo;importcom.kearey.boot.cache.delay.kafka.KafkaDelayQueue;importlombok.SneakyThrows;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.ApplicationArguments;importorg.springframework.boot.ApplicationRunner;importorg.springframework.stereotype.Component;importjava.util.List;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;/**  
 * @author kearey 
 * @description: 延迟缓存队列定时消费,并投递到常规队列  
 * @date 2023年04月26日  
 * @version: 1.0  
 */@Slf4j@ComponentpublicclassKafkaDelaySenderimplementsApplicationRunner{publicstaticExecutorService executorService =Executors.newFixedThreadPool(5);@AutowiredKafkaDelayQueue cacheQueue;@AutowiredKafkaSender kafkaSender;privateboolean stopFlag=false;publicvoidsetStopFlag(boolean stopFlag){this.stopFlag = stopFlag;}/**  
     * @Tips: ApplicationRunner 的实现方法,项目启动时加载  
     * @param args  
     * @throws Exception     
     */@Overridepublicvoidrun(ApplicationArguments args)throwsException{  
        log.debug("===== 容器启动完毕,开始加载 delaySend() 方法 =====");this.delaySend();}@SneakyThrowspublicvoiddelaySend(){  
        executorService.execute(()->{while(!stopFlag){List<ADelayQueueInfo> messageList = cacheQueue.pull();// 循环处理ZSet集合  if( messageList ==null|| messageList.size()==0){try{// 没有拉取到的话,就睡眠5s  Thread.sleep(5*1000);}catch(InterruptedException e){  
                        e.printStackTrace();}}else{for(ADelayQueueInfo info : messageList){// 尝试获取锁,获取不到就执行下一个  if(cacheQueue.tryLock()){// 发送消息  
                            kafkaSender.sendCache(info);// 删除缓存  
                            cacheQueue.remove(info);// 释放锁  
                            cacheQueue.releaseLock();}}}}});}}

Redis延时队列

基于Redis的ZSet实现的Kafka延时队列数据结构:

ZSet数据结构说明:

zadd <key><score1><value1><score2><value2>...

key:DELAY_QUEUE
score:timestamp
value:topic;message

Zset缓存队列

1、key为

DELAY_QUEUE

。为了减少分布式环境下,延迟队列中的资源竞争,在设置

DELAY_QUEUE

时,可以增加

_ip_port

的后缀。

2、score为

当前时间戳 + 延迟时间的求和

。ZSet集合依据score,会从最低分到最高分的方式排序集合中的成员,这样设计就可以实现按照先后时间排序的效果。

3、value为:kafka的

topic + message

的组合字段,用

;

进行分割。 如果觉得利用分隔符组合value的方式不优雅,这里也可以采用json串的数据格式。

将延时消息按照如上结构存储到Redis后,启动定时任务进行扫描。可以使用springboot自带的定时任务,也可以自己用线程池来自己实现。

以上就是基于Redis的延迟队列的实现思路。

代码实现

基于Redis的ZSet实现的Kafka延时队列核心代码:

packagecom.kearey.boot.cache.delay.kafka;importcom.kearey.boot.cache.constant.DelayQueueConstant;importcom.kearey.boot.cache.delay.ADelayQueueInfo;importcom.kearey.boot.cache.delay.IDelayQueue;importcom.kearey.boot.cache.delay.InitDelayQueue;importlombok.SneakyThrows;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.stereotype.Component;importjava.util.ArrayList;importjava.util.List;importjava.util.Set;importjava.util.concurrent.TimeUnit;importjava.util.stream.Collectors;/**  
 * @author kearey 
 * @description: 延迟缓存队列 —— Kafka实现类  
 * @useage: com.kearey.boot.mq.vender.kafka.KafkaDelaySender  
 *          com.kearey.boot.mq.vender.kafka.KafkaSender 
 * @date 2023年04月25日  
 * @version: 1.0  
 */@Slf4j@ComponentpublicclassKafkaDelayQueueimplementsIDelayQueue{@AutowiredRedisTemplate redisTemplate;@AutowiredInitDelayQueue config;@Override@SneakyThrowspublicBooleanpushMessage(ADelayQueueInfo info,int delay){// score为当前的 时间戳+延迟时间  long score=System.currentTimeMillis()+ delay *1000;String mqTopic = info.getMQTopic();String mqMessage = info.getMQMessage();  
        log.debug("====== 将延迟消息投递至Redis:  topic:"+mqTopic+" === Message:"+mqMessage+"+=====");Boolean add = redisTemplate.opsForZSet().add(config.getDELY_QUEUE(), mqTopic+DelayQueueConstant.SPLIT_FLAG +mqMessage, score);return add;}@Override@SneakyThrowspublicList<ADelayQueueInfo>pull(){List<ADelayQueueInfo> msgList  =newArrayList<>();try{//@Tips:rangeByScore 根据score范围获取 从0到当前时间戳可以拉取当前时间及以前的需要被消费的消息  Set<String> scoreSet = redisTemplate.opsForZSet().rangeByScore(config.getDELY_QUEUE(),0,System.currentTimeMillis());  
            log.debug("====== 拉取最新需要被消费的消息 数量:"+scoreSet.size()+" =====");if( scoreSet ==null|| scoreSet.size()==0){returnnull;}//@Tips:stream 流处理  
            msgList = scoreSet.stream().map(msg ->{KafkaDelayQueueInfo kafkaInfo =newKafkaDelayQueueInfo();try{//截取字符串  
                    kafkaInfo.setMQTopic(msg.split(DelayQueueConstant.SPLIT_FLAG)[0]);  
                    kafkaInfo.setMQMessage(msg.split(DelayQueueConstant.SPLIT_FLAG)[1]);}catch(Exception e){  
                    e.printStackTrace();}return kafkaInfo;}).collect(Collectors.toList());}catch(Exception e){  
            log.error(e.toString());}return msgList;}@Override@SneakyThrowspublicBooleanremove(ADelayQueueInfo info){String mqTopic = info.getMQTopic();String mqMessage = info.getMQMessage();Long remove = redisTemplate.opsForZSet().remove(config.getDELY_QUEUE(), mqTopic+DelayQueueConstant.SPLIT_FLAG+mqMessage);return remove >0?true:false;}@Override@SneakyThrowspublicBooleantryLock(){boolean lock =false;//获得锁  
        lock = redisTemplate.opsForValue().setIfAbsent(DelayQueueConstant.LOCK_KEY , config.getDELY_QUEUE(),30,TimeUnit.SECONDS);return lock;}@Override@SneakyThrowspublicvoidreleaseLock(){  
        redisTemplate.delete(DelayQueueConstant.LOCK_KEY);}}

广播模式

这里列举两种实现思路:

解决思路1:
在Kafka中,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。不同消费组中的消费者消费时互不影响。 Kafka就是通过消费者组的方式来实现P2P模式和广播模式。

基于以上前提,有两种实现方式:

1.在消费者 @KafkaListener注解中,利用Spring EL 表达式,动态的赋予全局唯一的GroupID。
2.利用kafka监听工厂,初始化消费者工厂配置,动态的赋予全局唯一的GroupID。

这样就能保证每个项目启动时消费者分组不同,从而达到广播消费的目的。
Kafka消息广播和单播

解决思路2:
由于我们的公共支付服务本身是提供给内部机构进行使用,本身业务范围和规模不大。Jeepay的分账、转账等功能并不在我们的使用范围内。

通过源码得知,涉及到消息广播的就只有:更新系统配置参数需要改造。既然是更新系统配置参数,那索性就把这一部分改成数据库操作。

考虑到时间成本的原因,这样的改造是最快能看到效果的。如果你的项目要求时间紧,任务重,且后续业务迭代速度不快,思路2也不失为一个最优的解决方案。

代码实现

这里我就贴一下在解决思路1中提到的,实现Kafka消费者的广播模式的核心代码,以供参考。

Spring-kafka 动态赋予groupId,实现kafka的广播模式:

packagecom.kearey.boot.mq.conf;importcom.kearey.boot.cache.delay.InitDelayQueue;importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.config.KafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.kafka.core.DefaultKafkaConsumerFactory;importorg.springframework.kafka.listener.ConcurrentMessageListenerContainer;importorg.springframework.stereotype.Component;importjava.util.HashMap;importjava.util.Map;/**  
 * @author kearey 
 * @description: Kafka动态设置消费组GroupID  
 * @date 2023年05月13日  
 * @version: 1.0  
 */@Slf4j@ComponentpublicclassKafkaConsumerConfig{@Value("${spring.kafka.bootstrap-servers}")privateString BROKERS;@Value("${spring.kafka.consumer.enable-auto-commit}")privateBoolean ENABLE_AUTO_COMMIT;@Value("${spring.kafka.consumer.auto-commit-interval-ms}")privateString AUTO_COMMIT_INTERVAL_MS;@Value("${spring.kafka.consumer.auto-offset-reset}")privateString AUTO_OFFSET_RESET;@Value("${spring.kafka.consumer.max-poll-records}")privateString MAX_POLL_RECORDS;privateString CURRENT_INSTANCE_GROUP_ID;@AutowiredInitDelayQueue config;/**构建kafka监听工厂*/@BeanpublicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>>kafkaListenerContainerFactory()throwsException{ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<String,String>();  
        factory.setConsumerFactory(consumerFactory());return factory;}/**初始化消费工厂配置,动态指定消费分组*/privateConsumerFactory<String,String>consumerFactory()throwsException{Map<String,Object> properties =newHashMap<String,Object>();  
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);  
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT);  
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS);  
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);  
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);  
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);/** 依照服务实例维度,根据延迟队列名称来命名GroupID */  
        CURRENT_INSTANCE_GROUP_ID = config.getDELY_QUEUE();  
        log.info("当前实例kafka分组id---{}",CURRENT_INSTANCE_GROUP_ID);  
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CURRENT_INSTANCE_GROUP_ID);  
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);returnnewDefaultKafkaConsumerFactory<String,String>(properties);}}

消息可靠性

由于我们的公共支付服务本身是提供给内部机构进行使用,业务范围和规模不大。

Jeepay的分账、转账等功能并不在我们的使用范围内。

综合考虑,kafka本身的消息可靠性是可以满足我们自身的需要,并不需要额外的进行扩展和增强。

Jeepay集成Kafka

Jeepay的MQ框架的类图如下图所示,可以从红色部分开始,按照箭头到黄色的MQ集成扩展模块。按照顺序进行梳理学习。

红色和黄色部分是基于绿色部分的MQ框架进行开发的(绿色部分不变)。总体的脉络还是比较清晰易懂的。
Jeepay-mq-kafka

MQ 厂商定义类:

publicclassMQVenderCS{publicstaticfinalString YML_VENDER_KEY ="component.mq.vender";publicstaticfinalString ACTIVE_MQ ="activeMQ";publicstaticfinalString RABBIT_MQ ="rabbitMQ";publicstaticfinalString ROCKET_MQ ="rocketMQ";publicstaticfinalString ALIYUN_ROCKET_MQ ="aliYunRocketMQ";// 额外增加Kafka的扩展publicstaticfinalString KAFKA ="kafka";}

配置文件的配置,修改vender为kafka:

#系统业务参数
isys:...
  mq:
    vender: kafka

Kafka的生产者和消费者改造起来也很简单,复制并修改@ConditionalOnProperty的havingValue即可:

@Slf4j@Component@ConditionalOnProperty(name =MQVenderCS.YML_VENDER_KEY, havingValue =MQVenderCS.KAFKA)publicclassKafkaSenderimplementsIMQSender{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;@AutowiredKafkaDelayQueue cacheQueue;@Overridepublicvoidsend(AbstractMQ mqModel){
        kafkaTemplate.send(mqModel.getMQName(),mqModel.toMessage());}@Overridepublicvoidsend(AbstractMQ mqModel,int delay){KafkaDelayQueueInfo info =newKafkaDelayQueueInfo();
        info.setMQTopic(mqModel.getMQName());
        info.setMQMessage(mqModel.toMessage());// 发送延迟消息
        cacheQueue.pushMessage(info, delay);}/**
     * 延迟消息到期后进行的第二次发送
     * @param info
     */publicvoidsendCache(ADelayQueueInfo info){
        log.info("====== 时间到期,准备第二次发送延迟消息 =====");
        kafkaTemplate.send(info.getMQTopic(),info.getMQMessage());}}

消费者也同样的修改@ConditionalOnProperty的havingValue即可:

@Slf4j@Component@ConditionalOnProperty(name =MQVenderCS.YML_VENDER_KEY, havingValue =MQVenderCS.KAFKA)@ConditionalOnBean(BusnQueueModel.IMQReceiver.class)publicclassBusnKafkaQueueReceiverimplementsIMQMsgReceiver{@AutowiredprivateBusnQueueModel.IMQReceiver mqReceiver;/** 接收 【 queue 】 类型的消息 **/@Override@KafkaListener(topics ={BusnQueueModel.MQ_NAME})publicvoidreceiveMsg(String msg){
        log.info("=========== BusnQueueModel 接收到待消费的消息 === "+msg+" ================");
        mqReceiver.receive(BusnQueueModel.parse(msg));}}

源码仓库

详细完整的代码已经上传仓库,想要进一步交流的同学可以去到:
Jeepay集成kafka项目源码

标签: kafka java 分布式

本文转载自: https://blog.csdn.net/zk616123/article/details/130693874
版权归原作者 Kearey. 所有, 如有侵权,请联系我们删除。

“【Jeepay】02-Kafka实现延迟消息与广播模式详细设计”的评论:

还没有评论