0


springboot kafka 实现延时队列

好文推荐:
2.5万字详解23种设计模式
基于Netty搭建websocket集群实现服务器消息推送
2.5万字讲解DDD领域驱动设计

文章目录

在这里插入图片描述

一、延时队列定义

延时队列:是一种消息队列,可以用于在指定时间或经过一定时间后执行某种操作。

小编已经做好了Kafka延时队列的封装,以后只需要一行代码就可以实现kafka延时队列了,代码中有详细注释,完整代码已经给大家整理好了,领取方式放在了文章末。

二、应用场景

1,订单超时自动取消:用户下单后,如果在指定时间内未完成支付,系统会自动取消订单,释放库存。
2,定时推送:比如消息通知,用户预约某个服务,系统会在服务开始前一定时间发送提醒短信。
3,定时任务:将需要定时执行的任务放入延时队列中,等到指定的时间到达时再进行执行,例如生成报表、统计数据等操作。
4,限时抢购:将限时抢购的结束时间放入延时队列中,当时间到达时自动下架商品。

三、技术实现方案:

1. Redis

1.1 优点:
①Redis的延迟队列是基于Redis的sorted set实现的,性能较高。
②Redis的延迟队列可以通过TTL设置过期时间,灵活性较高。
③简单易用,适用于小型系统。
④性能较高,支持高并发。

1.2 缺点:
①可靠性相对较低,可能会丢失消息,就算redis最高级别的持久化也是有可能丢一条的,每次请求都做aof,但是aof是异步的,所以不保证这一条操作能被持久化。
②而且Redis持久化的特性也导致其在数据量较大时,存储和查询效率逐渐降低,此时会需要对其进行分片和负载均衡。
③Redis的延迟队列需要手动实现消息重试机制,更严谨的消息队列需要数据库兜底。

1.3 应用场景:
①适用于较小规模的系统,实时性要求较高的场景。
②适用于轻量级的任务调度和消息通知场景,适合短期延迟任务,不适合长期任务,例如订单超时未支付等。

2. Kafka

2.1 优点:
①Kafka的优点在于其高并发、高吞吐量和可扩展性强,同时支持分片。
②可靠性高,支持分布式和消息持久化。
③消费者可以随时回溯消费。
④支持多个消费者并行消费、消费者组等机制。

2.2 缺点:
①没有原生的延迟队列功能,需要使用topic和消费者组来实现,实现延迟队列需要额外的开发工作。
②消费者需要主动拉取数据,可能会导致延迟,精度不是特别高。
在此案例中代码已经实现了,直接拿来使用就可以了。

2.3 应用场景:
适用于大规模的数据处理,实时性要求较高的,高吞吐量的消息处理场景。

3. RabbitMQ

3.1 优点:
①RabbitMQ的延迟队列是通过RabbitMQ的插件实现的,易于部署和使用。
②RabbitMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。
③支持消息持久化和分布式。
④支持优先级队列和死信队列。
⑤提供了丰富的插件和工具。

3.2 缺点:
①RabbitMQ的延迟队列性能较低,不适用于高吞吐量的场景。
②性能较低,不适合高并发场景。
③实现延迟队列需要额外的配置,但是配置就很简单了。

3.3应用场景:
适用于中小型的任务调度和消息通知,对可靠性要求高的场景。

4. RocketMQ

4.1 优点:
①RocketMQ的延迟队列是RocketMQ原生支持的,易于使用和部署。
②RocketMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。
③高性能和高吞吐量,支持分布式和消息持久化。
④RocketMQ使用简单,性能好,并且支持延迟队列功能。

4.2 缺点:
①RocketMQ的延迟队列不支持动态添加或删除队列。
②RocketMQ的延迟队列需要保证消息的顺序,可能会导致消息延迟。
③在节点崩溃后,RocketMQ有可能发生消息丢失。

4.3 应用场景:
①适用于大规模的数据处理,对性能和吞吐量要求较高的场景。
②适合于任务量较大、需要延迟消息和定时消息的场景。例如电商平台、社交软件等。
③适用于分布式任务调度和高可靠性消息通知场景。

四、Kafka延时队列背景

  1. 基于以上四种实现延时队列的分析来,选择对应的技术方案的基础上呢,不同公司的mq的基础设施不同,如果只有Kafka,也没必要引入RabbitMQ和RocketMq来实现,引入新的组件也会顺便带来新的问题。
  2. 网上搜Kafka实现延时队列有很多文章,很多文章说使用Kafka内部的时间轮,支持延时操作,但这是Kafka自己内部使用的,时间轮只是一个工具类,用户无法将其作为延迟队列来使用。
  3. Kafka延时队列的最佳实践,使用Kafka消费者的暂停和恢复机制来实现

五、Kafka延时队列实现思路

  1. 解决一个问题前首先要明确问题,如何让Kafka有延时队列的功能呢?
  2. 就是在Kafka消费者消费的时候延时消费,不久搞定了嘛
  3. 那如何延时消费呢,网上有些文章使用Thread.sleep进行延时消费这是不靠谱的(亲身实践),sleep的时间超过了Kafka配置的max.poll.records时间,消费者无法及时提交offset,kafka就会认为这个消费者已经挂了,会进行rebalance也就是重新分配分区给消费者,以保证每个分区只被一个消费者消费
  4. 也有同学说了,为了不发生rebalance,那可以增加max.poll.records时间啊,但是这样的话,如果要sleep几天的时间,难道max.poll.records要写几天的时间嘛,有违Kafka的设计原理了,那怎么办呢?
  5. 这时候Kafka的pause暂停消费和resume恢复消费就登场了,pause暂停某个分区之后消费者不会再poll拉取该分区的消息,直到resume恢复该分区之后才会重新poll消息。
  6. 我已经做好了Kafka延时队列的封装,以后只需要一行代码就可以实现延时队列了

六、Kafka延时队列架构图

在这里插入图片描述

七、kafka延时任务代码实现

以下代码只列出了核心实现,完整代码已经给大家整理好了,可以关注【微信公众号】微信搜索【老板来一杯java】,然后【加群】直接获取源码。
源码目录:
在这里插入图片描述

1. KafkaSyncConsumer:Kafka消费者

该类封装了一个线程安全的KafkaConsumer,因为原生的 KafkaConsumer是不支持线程共享的,直接使用会报错:java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

packagecom.wdyin.kafka.delay;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.clients.consumer.OffsetAndMetadata;importorg.apache.kafka.common.TopicPartition;importjava.time.Duration;importjava.util.Collections;importjava.util.Map;importjava.util.Properties;importjava.util.Set;/**
 * Kafka同步消费者
 * @author WDYin
 * @date 2023/4/14
 **/publicclassKafkaSyncConsumer<K,V>extendsKafkaConsumer<K,V>{KafkaSyncConsumer(Properties properties){super(properties);}@OverridepublicsynchronizedConsumerRecords<K,V>poll(Duration timeout){returnsuper.poll(timeout);}@OverridepublicsynchronizedSet<TopicPartition>paused(){returnsuper.paused();}synchronizedvoidpauseAndSeek(TopicPartition partition,long offset){super.pause(Collections.singletonList(partition));super.seek(partition, offset);}@OverridepublicsynchronizedvoidcommitSync(Map<TopicPartition,OffsetAndMetadata> offsets){super.commitSync(offsets);}synchronizedvoidresume(TopicPartition topicPartition){super.resume(Collections.singleton(topicPartition));}@OverridepublicsynchronizedvoidcommitSync(){super.commitSync();}}

2. KafkaDelayQueue:Kafka延迟队列

定义一个Kafka延期队列,包含的内容:KafkaDelayQueue,其中有延迟队列配置,主题,消费组,延迟时间,目标主题,KafkaSyncConsumer,ApplicationContext,poll线程池,delay线程池等等

packagecom.wdyin.kafka.delay;importlombok.Getter;importlombok.Setter;importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.common.TopicPartition;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.context.ApplicationContext;importorg.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;importjava.time.Duration;importjava.util.Collections;importjava.util.concurrent.ThreadPoolExecutor;/**
 * kafka延时队列
 *
 * @Author WDYin
 * @Date 2022/7/2
 **/@Slf4j@Getter@SetterclassKafkaDelayQueue<K,V>{privateString topic;privateString group;privateInteger delayTime;privateString targetTopic;privateKafkaDelayConfig kafkaDelayConfig;privateKafkaSyncConsumer<K,V> kafkaSyncConsumer;privateApplicationContext applicationContext;privateThreadPoolTaskScheduler threadPoolPollTaskScheduler;privateThreadPoolTaskScheduler threadPoolDelayTaskScheduler;voidsend(){try{
            kafkaSyncConsumer.subscribe(Collections.singletonList(topic));this.threadPoolPollTaskScheduler
                    .scheduleWithFixedDelay(pollTask(), kafkaDelayConfig.getPollInterval());}catch(Exception e){
            log.error("KafkaDelayQueue subscribe error", e);}}privateKafkaPollTask<K,V>pollTask(){returnnewKafkaPollTask<>(this,Duration.ofMillis(kafkaDelayConfig.getPollTimeout()), delayTime, applicationContext);}KafkaDelayTask<K,V>delayTask(TopicPartition partition){returnnewKafkaDelayTask<>(kafkaSyncConsumer, partition);}@Slf4jprivatestaticclassKafkaPollTask<K,V>implementsRunnable{privateKafkaDelayQueue<K,V> kafkaDelayQueue;privateDuration timeout;privateInteger delayTime;privateApplicationContext applicationContext;KafkaPollTask(KafkaDelayQueue<K,V> kafkaDelayQueue,Duration timeout,Integer delayTime,ApplicationContext applicationContext){this.kafkaDelayQueue = kafkaDelayQueue;this.timeout = timeout;this.applicationContext = applicationContext;this.delayTime = delayTime;}@Overridepublicvoidrun(){try{ConsumerRecords<K,V> records = kafkaDelayQueue.getKafkaSyncConsumer().poll(timeout);
                applicationContext.publishEvent(newKafkaPollEvent<>(records, delayTime, kafkaDelayQueue));}catch(Exception e){
                log.error("KafkaDelayQueue consumer fail", e);}}}@Slf4jprivatestaticclassKafkaDelayTask<K,V>implementsRunnable{privateKafkaSyncConsumer<K,V> kafkaSyncConsumer;privateTopicPartition partition;privateKafkaDelayTask(KafkaSyncConsumer<K,V> kafkaSyncConsumer,TopicPartition partition){this.kafkaSyncConsumer = kafkaSyncConsumer;this.partition = partition;}@Overridepublicvoidrun(){try{
                kafkaSyncConsumer.resume(partition);}catch(Exception e){
                log.error("KafkaDelayQueue resume failed", e);}}}}

3. KafkaDelayQueueFactory:Kafka延迟队列工厂

Kafka延期队列的工厂,用于及其管理延迟队列

packagecom.wdyin.kafka.delay;importlombok.Data;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.springframework.context.ApplicationContext;importorg.springframework.util.Assert;importorg.springframework.util.StringUtils;importjava.util.Properties;/**
 * 延时队列工厂
 * @author WDYin
 * @date 2023/4/17
 **/@DatapublicclassKafkaDelayQueueFactory{privateKafkaDelayConfig kafkaDelayConfig;privateProperties properties;privateApplicationContext applicationContext;privateInteger concurrency;publicKafkaDelayQueueFactory(Properties properties,KafkaDelayConfig kafkaDelayConfig){Assert.notNull(properties,"properties cannot null");Assert.notNull(kafkaDelayConfig.getDelayThreadPool(),"delayThreadPool cannot null");Assert.notNull(kafkaDelayConfig.getPollThreadPool(),"pollThreadPool cannot null");Assert.notNull(kafkaDelayConfig.getPollInterval(),"pollInterval cannot null");Assert.notNull(kafkaDelayConfig.getPollTimeout(),"timeout cannot null");this.properties = properties;this.kafkaDelayConfig = kafkaDelayConfig;}publicvoidlistener(String topic,String group,Integer delayTime,String targetTopic){if(StringUtils.isEmpty(topic)){thrownewRuntimeException("topic cannot empty");}if(StringUtils.isEmpty(group)){thrownewRuntimeException("group cannot empty");}if(StringUtils.isEmpty(delayTime)){thrownewRuntimeException("delayTime cannot empty");}if(StringUtils.isEmpty(targetTopic)){thrownewRuntimeException("targetTopic cannot empty");}KafkaSyncConsumer<String,String> kafkaSyncConsumer =createKafkaSyncConsumer(group);KafkaDelayQueue<String,String> kafkaDelayQueue =createKafkaDelayQueue(topic, group, delayTime, targetTopic, kafkaSyncConsumer);
        kafkaDelayQueue.send();}privateKafkaDelayQueue<String,String>createKafkaDelayQueue(String topic,String group,Integer delayTime,String targetTopic,KafkaSyncConsumer<String,String> kafkaSyncConsumer){KafkaDelayQueue<String,String> kafkaDelayQueue =newKafkaDelayQueue<>(kafkaSyncConsumer, kafkaDelayConfig);Assert.notNull(applicationContext,"kafkaDelayQueue need applicationContext");
        kafkaDelayQueue.setApplicationContext(applicationContext);
        kafkaDelayQueue.setDelayTime(delayTime);
        kafkaDelayQueue.setTopic(topic);
        kafkaDelayQueue.setGroup(group);
        kafkaDelayQueue.setTargetTopic(targetTopic);return kafkaDelayQueue;}privateKafkaSyncConsumer<String,String>createKafkaSyncConsumer(String group){
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);returnnewKafkaSyncConsumer<>(properties);}}

4. KafkaPollListener:Kafka延迟队列事件监听

packagecom.wdyin.kafka.delay;importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.OffsetAndMetadata;importorg.apache.kafka.common.TopicPartition;importorg.springframework.context.ApplicationListener;importorg.springframework.kafka.core.KafkaTemplate;importjava.time.Instant;importjava.time.LocalDateTime;importjava.time.ZoneId;importjava.util.*;/**
 * 延时队列监听
 * @Author : WDYin
 * @Date : 2021/5/7
 * @Desc :
 */@Slf4jpublicclassKafkaPollListener<K,V>implementsApplicationListener<KafkaPollEvent<K,V>>{privateKafkaTemplate kafkaTemplate;publicKafkaPollListener(KafkaTemplate kafkaTemplate){this.kafkaTemplate = kafkaTemplate;}@OverridepublicvoidonApplicationEvent(KafkaPollEvent<K,V> event){ConsumerRecords<K,V> records =(ConsumerRecords<K,V>) event.getSource();Integer delayTime = event.getDelayTime();KafkaDelayQueue<K,V> kafkaDelayQueue = event.getKafkaDelayQueue();KafkaSyncConsumer<K,V> kafkaSyncConsumer = kafkaDelayQueue.getKafkaSyncConsumer();//1.获取poll到的有消息的分区Set<TopicPartition> partitions = records.partitions();//2.存储需要commit的消息,提高效率批量提交Map<TopicPartition,OffsetAndMetadata> commitMap =newHashMap<>();//3.遍历有消息的分区
        partitions.forEach((partition)->{List<ConsumerRecord<K,V>> consumerRecords = records.records(partition);//4.遍历分区里面的消息for(ConsumerRecord<K,V>record: consumerRecords){//5.获取消息创建时间long startTime =(record.timestamp()/1000)*1000;long endTime = startTime + delayTime;//6.不符合条件的分区暂停消费long now =System.currentTimeMillis();if(endTime > now){
                    kafkaSyncConsumer.pauseAndSeek(partition,record.offset());//7.使用 schedule()执行定时任务
                    kafkaDelayQueue.getThreadPoolPollTaskScheduler().schedule(kafkaDelayQueue.delayTask(partition),newDate(endTime));//无需继续消费该分区下的其他消息,直接消费其他分区break;}
                log.info("{}: partition:{}, offset:{}, key:{}, value:{}, messageDate:{}, nowDate:{}, messageDate:{}, nowDate:{}",Thread.currentThread().getName()+"#"+Thread.currentThread().getId(),record.topic()+"-"+record.partition(),record.offset(),record.key(),record.value(),LocalDateTime.ofInstant(Instant.ofEpochMilli(startTime),ZoneId.systemDefault()),LocalDateTime.now(), startTime,Instant.now().getEpochSecond());//发送目标主题
                kafkaTemplate.send(kafkaDelayQueue.getTargetTopic(),record.value());//更新需要commit的消息
                commitMap.put(partition,newOffsetAndMetadata(record.offset()+1));}});//8.批量提交,提高效率,commitSync耗时几百毫秒if(!commitMap.isEmpty()){
            kafkaSyncConsumer.commitSync(commitMap);}}}

5. KafkaConfig:Kafka配置文件

packagecom.wdyin.kafka.config;importcom.wdyin.kafka.delay.KafkaDelayConfig;importcom.wdyin.kafka.delay.KafkaDelayQueueFactory;importcom.wdyin.kafka.delay.KafkaPollListener;importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.apache.kafka.common.serialization.StringSerializer;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.ApplicationContext;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.config.KafkaListenerContainerFactory;importorg.springframework.kafka.core.DefaultKafkaConsumerFactory;importorg.springframework.kafka.core.DefaultKafkaProducerFactory;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.listener.ConcurrentMessageListenerContainer;importorg.springframework.kafka.listener.ContainerProperties;importjavax.annotation.Resource;importjava.util.HashMap;importjava.util.Map;importjava.util.Properties;/**
 * @author WDYin
 * @date 2023/4/21
 **/@Configuration@Slf4jpublicclassKafkaConfig{@Value("${spring.kafka.consumer.bootstrap-servers}")privateString bootstrapServers;@ResourceprivateApplicationContext applicationContext;/**
     * 消费者参数配置
     * @param bootstrapServers
     * @param isAutoSubmit
     * @return
     */privateMap<String,Object>consumerProps(String bootstrapServers,Boolean isAutoSubmit){Map<String,Object> props =newHashMap<>();//kafka broker地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//取消自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, isAutoSubmit);//一次拉取消息数量,可根据实际情况自行调整
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"100");//序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");return props;}/**
     * spring生产者参数配置
     * @param bootstrapServer
     * @return
     */privateHashMap<String,Object>producerProps(String bootstrapServer){HashMap<String,Object> configProps =newHashMap<>();//broke地址
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);//序列化
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);//幂等发送给broker
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");//生产者时将多少数据累积到一个批次中,设置为0的目的提高实时性
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG,"0");return configProps;}/**
     * 用于spring的@listener注解进行消费,并非用于延时队列
     * @return
     */@Bean("kafkaContainerFactory")publicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>>kafkaContainerFactory(){ConcurrentKafkaListenerContainerFactory factory =newConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(newDefaultKafkaConsumerFactory<>(consumerProps(bootstrapServers,Boolean.FALSE)));//线程数为1
        factory.setConcurrency(1);//poll超时时间
        factory.getContainerProperties().setPollTimeout(1500L);//手动立即提交offset
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}/**
     * spring kafkaTemplate注册
     * @return
     */@BeanpublicKafkaTemplate<String,String>kafkaTemplate(){returnnewKafkaTemplate<>(newDefaultKafkaProducerFactory<>(producerProps(bootstrapServers)));}/**
     * 延时队列-Kafka同步消费者配置
     * @return
     */publicPropertieskafkaSyncConsumerProperties(){// Consumer的配置Properties properties =newProperties();// 服务地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// 关闭offset自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// 消费者offset自动提交到Kafka的频率(以毫秒为单位)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"10000");
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"15000");// KEY的反序列化器类
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");// VALUE的反序列化器类
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");return properties;}/**
     * 延时队列-注册延时队列工厂
     * @return
     */@BeanpublicKafkaDelayQueueFactorykafkaDelayQueueFactory(){KafkaDelayConfig kafkaDelayConfig =newKafkaDelayConfig();
        kafkaDelayConfig.setPollThreadPool(1);
        kafkaDelayConfig.setPollTimeout(50);
        kafkaDelayConfig.setPollInterval(50);
        kafkaDelayConfig.setDelayThreadPool(10);KafkaDelayQueueFactory kafkaDelayQueueFactory =newKafkaDelayQueueFactory(kafkaSyncConsumerProperties(), kafkaDelayConfig);
        kafkaDelayQueueFactory.setApplicationContext(applicationContext);return kafkaDelayQueueFactory;}/**
     * 延时队列-注册消费者poll监听器
     * @param kafkaTemplate
     * @return
     */@BeanpublicKafkaPollListenerkafkaPollListener(KafkaTemplate kafkaTemplate){returnnewKafkaPollListener<>(kafkaTemplate);}}

6. KafkaDelayApplication:Kafka延迟任务注册

一个延时主题对应一个延迟时间,后续有新的延迟任务只需要在此注册延迟任务的监听即可!

importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjavax.annotation.Resource;/**
 * @author WDYin
 * @date 2023/4/18
 **/@ComponentpublicclassKafkaDelayApplication{@ResourceprivateKafkaDelayQueueFactory kafkaDelayQueueFactory;@PostConstructpublicvoidinit(){//延迟30秒
        kafkaDelayQueueFactory.listener("delay-30-second-topic","delay-30-second-group",1*30*1000,"delay-60-second-target-topic");//延迟60秒
        kafkaDelayQueueFactory.listener("delay-60-second-topic","delay-60-second-group",1*60*1000,"delay-60-second-target-topic");//延迟30分钟
        kafkaDelayQueueFactory.listener("delay-30-minute-topic","delay-30-minute-group",30*60*1000,"delay-30-minute-target-topic");}}

九、测试

  1. 先往延时主题【delay-60-second-topic】发送一千条消息,一共10个分区,每个分区100条消息,消息时间是2023-04-21 16:37:26分,延迟消息消费时间就应该是2023-04-21 16:38:26在这里插入图片描述
  2. 延时队列进行消费:通过日志查看,消息日期和延迟队列消费消息时间正好相差一分钟在这里插入图片描述

10、总结

  1. 本案例已成功实现Kafka的延时队列,并进行实测,代码引入可用非常方便。
  2. Kafka实现的延时队列支持秒级别的延时任务,不支持毫秒级别,但是毫秒级别的延时任务也没有意义
  3. 注意一个主题对应的延时时间是一致的,不能在同一个主题里放不同时间的延时任务。
  4. 此方案的缺点就是,如果数据量很大,一定要保证Kafka的消费能力,否则可能会导致延迟,精度不是特别高,不过如果延迟小时级别的任务,差异个几秒种肯定可以接受的,一般场景肯定满足。
  5. 完整代码已经给大家整理好了,可以关注【微信公众号】微信搜索【老板来一杯java】,然后【加群】直接获取源码。

好文推荐:
2.5万字详解23种设计模式
微服务springcloud环境下基于Netty搭建websocket集群实现服务器消息推送----netty是yyds
2.5万字讲解DDD领域驱动设计

标签: kafka java 分布式

本文转载自: https://blog.csdn.net/qq_41889508/article/details/130259348
版权归原作者 王德印 所有, 如有侵权,请联系我们删除。

“springboot kafka 实现延时队列”的评论:

还没有评论