0


记一次RocketMQConsumer 服务关闭出现InterruptException异常

记一次RocketMQConsumer 服务关闭出现InterruptException异常

背景提要

出现问题主要还是版本升级

  1. 老版本核心rocketmq依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>spring-boot-starter-rocketmq</artifactId><version>${vesion}</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.2</version></dependency>
  2. 新版本核心rocketmq依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.1</version></dependency>

java.lang.InterruptedException

简单列举一个InterruptedException

java.sql.SQLException: interrupt
    at com.alibaba.druid.pool.DruidDataSource.getConnectionInternal(DruidDataSource.java:1430)~[druid-1.1.12.jar!/:1.1.12]
    at com.alibaba.druid.pool.DruidDataSource.getConnectionDirect(DruidDataSource.java:1272)~[druid-1.1.12.jar!/:1.1.12]
    at com.alibaba.druid.filter.FilterChainImpl.dataSource_connect(FilterChainImpl.java:5007)~[druid-1.1.12.jar!/:1.1.12]
    at com.alibaba.druid.filter.FilterAdapter.dataSource_getConnection(FilterAdapter.java:2745)~[druid-1.1.12.jar!/:1.1.12]
    at com.alibaba.druid.filter.FilterChainImpl.dataSource_connect(FilterChainImpl.java:5003)~[druid-1.1.12.jar!/:1.1.12]
    at com.alibaba.druid.filter.stat.StatFilter.dataSource_getConnection(StatFilter.java:680)~[druid-1.1.12.jar!/:1.1.12]
    at com.alibaba.druid.filter.FilterChainImpl.dataSource_connect(FilterChainImpl.java:5003)~[druid-1.1.12.jar!/:1.1.12]
    at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1250)~[druid-1.1.12.jar!/:1.1.12]
    at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1242)~[druid-1.1.12.jar!/:1.1.12]
    at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:89)~[druid-1.1.12.jar!/:1.1.12]// 省略部分堆栈信息
    at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.handleMessage(DefaultRocketMQListenerContainer.java:399)[rocketmq-spring-boot-2.2.1.jar!/:2.2.1]
    at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.access$100(DefaultRocketMQListenerContainer.java:71)[rocketmq-spring-boot-2.2.1.jar!/:2.2.1]
    at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:359)[rocketmq-spring-boot-2.2.1.jar!/:2.2.1]
    at cn.techwolf.trace.rocketmq.spring.TracingMessageListenerConcurrently.consumeMessage(TracingMessageListenerConcurrently.java:37)[instrument-rocketmq-spring-1.101.jar!/:1.101]
    at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:392)[rocketmq-client-4.9.2.jar!/:4.9.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)[?:1.8.0_202]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)[?:1.8.0_202]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[?:1.8.0_202]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[?:1.8.0_202]
    at java.lang.Thread.run(Thread.java:748)[?:1.8.0_202]Caused by:java.lang.InterruptedException

注意

以下分析仅为个人观点,并不一定正确,如有杠精,请勿继续观看,欢迎留言讨论哈

提示:本文涉及的一些类,在 记一次RocketMQ服务启动时 NullPointerException问题 本文不做一些详细解释

spring关闭&rocketmq关闭时机(穿插 并不重要)

  1. spring在这里插入图片描述
  2. rocketmq 继承了SmartCycle 会在容器关闭的时候 回调其stop方法

rocketmq shutdown 分析

首先确认我们的RocketMQConsumer 的实现

DefaultRocketMQListenerContainer

:

// 类全路径org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainerpublicclassDefaultRocketMQListenerContainerimplementsInitializingBean,RocketMQListenerContainer,SmartLifecycle,ApplicationContextAware{privateDefaultMQPushConsumer consumer;@Overridepublicvoiddestroy(){// DisposableBean 回调 销毁bean的时候 调用this.setRunning(false);if(Objects.nonNull(consumer)){
            consumer.shutdown();}
        log.info("container destroyed, {}",this.toString());}@Overridepublicvoidstop(){// SmartLifecycle 回调 关闭容器前回调if(this.isRunning()){if(Objects.nonNull(consumer)){
                consumer.shutdown();}setRunning(false);}}}

可以看到关闭的话 是先调用到

DefaultRocketMQListenerContainer.stop

方法, 接下来就是看看

consumer.shutdown()

方法了:

// 类全路径 org.apache.rocketmq.client.consumer.DefaultMQPushConsumerpublicclassDefaultMQPushConsumerextendsClientConfigimplementsMQPushConsumer{protectedfinaltransientDefaultMQPushConsumerImpl defaultMQPushConsumerImpl;/**
     * Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
     */privatelong awaitTerminationMillisWhenShutdown =0;@Overridepublicvoidshutdown(){this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown);if(null!= traceDispatcher){
            traceDispatcher.shutdown();}}}// 类全路径 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImplpublicclassDefaultMQPushConsumerImplimplementsMQConsumerInner{privateConsumeMessageService consumeMessageService;publicsynchronizedvoidshutdown(long awaitTerminateMillis){switch(this.serviceState){case CREATE_JUST:break;case RUNNING:this.consumeMessageService.shutdown(awaitTerminateMillis);this.persistConsumerOffset();this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());this.mQClientFactory.shutdown();
                log.info("the consumer [{}] shutdown OK",this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.destroy();this.serviceState =ServiceState.SHUTDOWN_ALREADY;break;case SHUTDOWN_ALREADY:break;default:break;}}}// ConsumeMessageService 有两个实现类 分别是 ConsumeMessageConcurrentlyService 和 ConsumeMessageOrderlyService// 我们使用的 consumeMessageService 的具体实现类是 ConsumeMessageConcurrentlyService 具体问题具体分析哈 // 类全路径 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyServicepublicclassConsumeMessageConcurrentlyServiceimplementsConsumeMessageService{privatefinalThreadPoolExecutor consumeExecutor;publicvoidshutdown(long awaitTerminateMillis){this.scheduledExecutorService.shutdown();ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis,TimeUnit.MILLISECONDS);this.cleanExpireMsgExecutors.shutdown();}}// 类全路径org.apache.rocketmq.common.utils.ThreadUtilspublicfinalclassThreadUtils{publicstaticvoidshutdownGracefully(ExecutorService executor,long timeout,TimeUnit timeUnit){// Disable new tasks from being submitted.
        executor.shutdown();try{// Wait a while for existing tasks to terminate.if(!executor.awaitTermination(timeout, timeUnit)){// 注意这里
                executor.shutdownNow();// Wait a while for tasks to respond to being cancelled.if(!executor.awaitTermination(timeout, timeUnit)){
                    log.warn(String.format("%s didn't terminate!", executor));}}}catch(InterruptedException ie){// (Re-)Cancel if current thread also interrupted.
            executor.shutdownNow();// Preserve interrupt status.Thread.currentThread().interrupt();}}}

从上面我们可以清楚的看到 执行到

ConsumeMessageConcurrentlyService.shutdown

的时候,

awaitTerminateMillis

默认值是0, 执行到

ThreadUtils.shutdownGracefully

时,会直接调用

shutdonwNow

,并没有等待

shutdonw 和 shutdownNow 的区别 百度搜一搜就行了

  • shutdown => 平缓关闭,等待所有已添加到线程池中的任务执行完在关闭
  • shutdownNow => 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务

对比以前代码 rocketmq-client 4.3.2

publicclassConsumeMessageConcurrentlyServiceimplementsConsumeMessageService{privatefinalThreadPoolExecutor consumeExecutor;publicvoidshutdown(long awaitTerminateMillis){this.scheduledExecutorService.shutdown();this.consumeExecutor.shutdown();// 这里this.cleanExpireMsgExecutors.shutdown();}}

故 我认为是,是因为直接调用了

shutdonwNow

导致服务关闭的时候出现中断 InterruptException 异常

解决方案

从上述分析可以知道 是因为直接调用shutdownNow 导致的,我们应该可以调整

awaitTerminateMillis

参数,也就是

DefaultMQPushConsumer.awaitTerminationMillisWhenShutdown

参数

而目前我似乎没看到有哪种方式支持全局配置该参数的方式(官方方式) 这里提供俩思路

RocketMQPushConsumerLifecycleListener

or

RocketMQPushConsumerLifecycleListener
// org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainerpublicclassDefaultRocketMQListenerContainerimplementsInitializingBean,RocketMQListenerContainer,SmartLifecycle,ApplicationContextAware{@OverridepublicvoidafterPropertiesSet()throwsException{initRocketMQPushConsumer();this.messageType =getMessageType();this.methodParameter =getMethodParameter();
        log.debug("RocketMQ messageType: {}", messageType);}privatevoidinitRocketMQPushConsumer()throwsMQClientException{// 省略部分代码if(Objects.nonNull(rpcHook)){
            consumer =newDefaultMQPushConsumer(consumerGroup, rpcHook,newAllocateMessageQueueAveragely(),
                enableMsgTrace,this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            consumer.setVipChannelEnabled(false);}else{
            log.debug("Access-key or secret-key not configure in "+this+".");
            consumer =newDefaultMQPushConsumer(consumerGroup, enableMsgTrace,this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));}// 省略部分consumer 参数配置代码// 可以看到这 他会回调 prepareStart 方法if(rocketMQListener instanceofRocketMQPushConsumerLifecycleListener){((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);}elseif(rocketMQReplyListener instanceofRocketMQPushConsumerLifecycleListener){((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);}}}

可以看到 在

DefaultRocketMQListenerContainer

初始化完后会回调

RocketMQPushConsumerLifecycleListener、RocketMQPushConsumerLifecycleListener

prepareStart

方法

那就很简单了

@Service@RocketMQMessageListener(nameServer ="${spring.rocketmq.nameServer}",
        topic ="${topic}",
        consumerGroup ="${group}")publicclassTestConsumerimplementsRocketMQListener<String>,RocketMQPushConsumerLifecycleListener{@OverridepublicvoidonMessage(String msg){// do something}@OverridepublicvoidprepareStart(DefaultMQPushConsumer consumer){
        consumer.setAwaitTerminationMillisWhenShutdown(1000);// 设置}}

可以抽象成一个通用类 consumer 继承该类就行了

DefaultRocketMQListenerContainer.getConsumer
DefaultRocketMQListenerContainer

会被注册成bean 具体的实现在

org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration

中,这里就不分析了,我们可以尝试获取所有的

DefaultRocketMQListenerContainer

然后调用其

getConsumer

方法 示例代码 具体怎么触发这个代码 就得自己思考和完善了哈 本文不做解释了

publicvoidset(){List<DefaultRocketMQListenerContainer> containers =getAllBeans();for(DefaultRocketMQListenerContainer c : containers){
        c.getConsumer().setAwaitTerminationMillisWhenShutdown(1000);}}

最后,以上仅为本人分析,并不一定正确,用第一种方式确实没出现了InterruptException了,不知道是否是偶然;

欢迎留言讨论

标签: java apache spring boot

本文转载自: https://blog.csdn.net/jun8148/article/details/123211378
版权归原作者 谁动了我的小老弟 所有, 如有侵权,请联系我们删除。

“记一次RocketMQConsumer 服务关闭出现InterruptException异常”的评论:

还没有评论