0


Kafka重复消费以及消费线程安全关闭的解决方案

背景和原因分析

Kafka消费程序每次重启都会出现重复消费的情况,考虑是在kill掉程序的时候,有部分消费完的数据没有提交offsect。

props.setProperty("enable.auto.commit","true");

此处表明自动提交,即延迟提交(poll的时候会根据配置的自动提交时间间隔去进行检测并提交)。当kill掉程序的时候,可能消费完的数据还没有到达提交的时间点程序就被kill掉了。

重复消费解决方案:

关闭自动提交,采用异步提交+同步提交的方式来提交offsect。

// 关闭自动提交
props.setProperty("enable.auto.commit","false");// 消费逻辑try{while(true){ConsumerRecords<String,byte[]> records = consumer.poll(Duration.ofMillis(100));for(ConsumerRecord<String,byte[]>record: records){// 具体业务逻辑}
        consumer.commitAsync();}System.out.println("while end.");}catch(Exception e){System.err.println("consume error..."+ e.getMessage());}finally{try{
        consumer.commitSync();System.out.println("commit sync suc.");}catch(Exception e){System.err.println("commit sync error."+ e.getMessage());}finally{
        consumer.close();System.out.println("close.");}}

这样还不够,当kill掉程序的时候,会发现并没有走到finally中。说明线程非正常停止。

线程安全关闭解决方案:

1.使用线程池来运行线程
2.在实例销毁前使用结束标志手动停止线程
3.使用CountDownLatch等待线程停止

第一步:定义线程池

@BeanpublicThreadPoolTaskExecutorthreadPoolTaskExecutor(){int cpuCoreNum =Runtime.getRuntime().availableProcessors();ThreadPoolTaskExecutor threadPoolTaskExecutor =newThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(cpuCoreNum);
    threadPoolTaskExecutor.setMaxPoolSize(cpuCoreNum *2);
    threadPoolTaskExecutor.setQueueCapacity(2000);
    threadPoolTaskExecutor.setKeepAliveSeconds(60);
    threadPoolTaskExecutor.setThreadNamePrefix("global_thread_pool_task_executor");
    threadPoolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.DiscardOldestPolicy());
    threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    threadPoolTaskExecutor.setAwaitTerminationSeconds(10);// 确保该值是线程池中各个线程阻塞的最大时长
    threadPoolTaskExecutor.initialize();return threadPoolTaskExecutor;}

此处两个配置参数至关重要
setWaitForTasksToCompleteOnShutdown(true)表示等待正在进行和排队的任务完成。
threadPoolTaskExecutor.setAwaitTerminationSeconds(10)虽然我们已经配置为等待正在进行和排队的任务完成,但Spring仍然会继续关闭容器的其余部分。这可能会释放任务执行器所需的资源,并导致任务失败。配置这个最大等待时间可以确保在指定的时间段内,容器级别的关闭过程将被阻止。
等待时间设置多少具体看线程池中业务线程最大耗时来定。
如果不停止线程,就会超过线程池的等待时间。通过以下WARN日志可以发现,在停止线程池的时候仍然存在业务线程没有停掉的情况,所以还需要定义一个标志来手动停止线程。

WARN 11472 ---[extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Timed out while waiting for executor 'threadPoolTaskExecutor' to terminate

第二步:定义结束标志,并在对象销毁前停止线程

// 线程中断标志publicvolatileboolean running =true;while(running){...}

然后再实现DisposableBean接口中的destroy方法,在实例销毁之前将running置为false停止线程

@Overridepublicvoiddestroy()throwsException{this.running =false;// 循环并非立即停止,而是等到当前执行的循环体执行结束才会停止,所以这个地方的等待时间需要与线程池中的setAwaitTerminationSeconds参数相对应}

当destroy方法运行结束,系统就会销毁掉当前实例,接着就会开始销毁当前实例的依赖(没有被其它实例所引用的话),而此时需要注意的是线程其实并没有运行结束。所以问题出现了:线程还在运行中,而运行所需要的资源(比如Redis连接资源)被提前关闭了,就会导致异常出现。所以在将running置为false之后还需要使用CountDwonLatch等待线程结束,再接着销毁其它依赖。
此处省略第三步,直接上完整的样例代码:

@ComponentpublicclassConsumerClosedSafelyimplementsCommandLineRunner,DisposableBean{privatevolatileboolean running =true;privatefinalCountDownLatch latch =newCountDownLatch(1);privatefinalString[] topics =newString[]{"test"};@AutowiredprivateThreadPoolTaskExecutor threadPoolTaskExecutor;publicvoidconsume()throwsException{Properties props =newProperties();//TODO 其它属性
        props.setProperty("enable.auto.commit","false");KafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topics));// 消费逻辑try{while(running){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));for(ConsumerRecord<String,String>record: records){//TODO 具体业务逻辑}
                consumer.commitAsync();}System.out.println("while end.");}catch(Exception e){System.err.println("consume error..."+ e.getMessage());}finally{try{
                consumer.commitSync();System.out.println("commit sync suc.");}catch(Exception e){System.err.println("commit sync error."+ e.getMessage());}finally{
                consumer.close();System.out.println("close.");// 计数器减一
                latch.countDown();System.out.println("latch count down .");}}}@Overridepublicvoidrun(String... args)throwsException{Runnable r =()->{try{consume();}catch(Exception e){System.exit(1);}};
        threadPoolTaskExecutor.execute(r);}@Overridepublicvoiddestroy()throwsException{// 终止循环this.running=false;// 等待运行结束
        latch.await();}}
标签: kafka java

本文转载自: https://blog.csdn.net/weixin_43932590/article/details/128865349
版权归原作者 燎原君 所有, 如有侵权,请联系我们删除。

“Kafka重复消费以及消费线程安全关闭的解决方案”的评论:

还没有评论