0


RabbitMq Consumer thread error, thread abort.异常导致服务关闭问题

问题描述

在使用rabbitMq消费者使用simple模式进行监听时,服务突然自动关闭,事前没有任何的cpu或者内存的报警。
查看关闭服务前的日志发现OOM异常
Consumer thread error, thread abort.
image.png
但是一个异常为什么会导致服务关闭呢?
开始看到OOM,我就想着启动参数上加了当发生OOM时生成堆的dump文件,然而查看文件目录,发现并没有看到生成的堆dump文件,这就十分奇怪

问题分析

后仔细看了报错日志
报错位置是org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run

@Override// NOSONAR - complexity - many catch blockspublicvoidrun(){// NOSONAR - line countif(!isActive()){return;}boolean aborted =false;this.consumer.setLocallyTransacted(isChannelLocallyTransacted());String routingLookupKey =getRoutingLookupKey();if(routingLookupKey !=null){SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey);// NOSONAR both never null}if(this.consumer.getQueueCount()<1){if(logger.isDebugEnabled()){
                    logger.debug("Consumer stopping; no queues for "+this.consumer);}SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);if(getApplicationEventPublisher()!=null){getApplicationEventPublisher().publishEvent(newAsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this,this.consumer));}this.start.countDown();return;}try{initialize();while(isActive(this.consumer)||this.consumer.hasDelivery()||!this.consumer.cancelled()){mainLoop();}}catch(InterruptedException e){
                logger.debug("Consumer thread interrupted, processing stopped.");Thread.currentThread().interrupt();
                aborted =true;publishConsumerFailedEvent("Consumer thread interrupted, processing stopped",true, e);}catch(QueuesNotAvailableException ex){
                logger.error("Consumer threw missing queues exception, fatal="+isMissingQueuesFatal(), ex);if(isMissingQueuesFatal()){this.startupException = ex;// Fatal, but no point re-throwing, so just abort.
                    aborted =true;}publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);}catch(FatalListenerStartupException ex){
                logger.error("Consumer received fatal exception on startup", ex);this.startupException = ex;// Fatal, but no point re-throwing, so just abort.
                aborted =true;publishConsumerFailedEvent("Consumer received fatal exception on startup",true, ex);}catch(FatalListenerExecutionException ex){// NOSONAR exception as flow control
                logger.error("Consumer received fatal exception during processing", ex);// Fatal, but no point re-throwing, so just abort.
                aborted =true;publishConsumerFailedEvent("Consumer received fatal exception during processing",true, ex);}catch(PossibleAuthenticationFailureException ex){
                logger.error("Consumer received fatal="+isPossibleAuthenticationFailureFatal()+" exception during processing", ex);if(isPossibleAuthenticationFailureFatal()){this.startupException =newFatalListenerStartupException("Authentication failure",newAmqpAuthenticationException(ex));// Fatal, but no point re-throwing, so just abort.
                    aborted =true;}publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex);}catch(ShutdownSignalException e){if(RabbitUtils.isNormalShutdown(e)){if(logger.isDebugEnabled()){
                        logger.debug("Consumer received Shutdown Signal, processing stopped: "+ e.getMessage());}}else{logConsumerException(e);}}catch(AmqpIOException e){if(e.getCause()instanceofIOException&& e.getCause().getCause()instanceofShutdownSignalException&& e.getCause().getCause().getMessage().contains("in exclusive use")){getExclusiveConsumerExceptionLogger().log(logger,"Exclusive consumer failure", e.getCause().getCause());publishConsumerFailedEvent("Consumer raised exception, attempting restart",false, e);}else{logConsumerException(e);}}catch(Error e){//NOSONAR
                logger.error("Consumer thread error, thread abort.", e);publishConsumerFailedEvent("Consumer threw an Error",true, e);getJavaLangErrorHandler().handle(e);
                aborted =true;}catch(Throwable t){//NOSONAR// by now, it must be an exceptionif(isActive()){logConsumerException(t);}}finally{if(getTransactionManager()!=null){ConsumerChannelRegistry.unRegisterConsumerChannel();}}// In all cases count down to allow container to progress beyond startupthis.start.countDown();killOrRestart(aborted);if(routingLookupKey !=null){SimpleResourceHolder.unbind(getRoutingConnectionFactory());// NOSONAR never null here}}

辛亏之前度过rabbitMq监听的相关源码,我知道这是消费者线程执行开始执行的地方,在mainLoop中循环消费消息。如果消费发生异常抛出,那么被catch住后会发布事件publishConsumerFailedEvent
spring在处理这个事件后会进行服务的关闭。
原来这里的OOM是调用别的服务,别的服务抛出的。这个异常又没有捕获,抛到SimpleMessageListenerContainer中的run方法触发了关闭服务的事件。
查看了监听器的方法后
发现对异常做了捕捉

try{}catch(Exception exception){
            log.error("exception occur={}", exception);}finally{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

发现采用的是手动ack的模式。并且对Exception做了异常捕获,那么上面的异常时怎么抛到SimpleMessageListenerContainer中的
原来是因为OutOfMemoryError并不继承自Exception
image.png
如果也想捕捉Error异常
还需要加一个捕获

try{}catch(Exception exception |Error error){
            log.error("exception={},error={}", exception,error);}finally{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

结论

因此在采用手动ack模式时,一定要保证异常/错误不要抛出线程

标签: rabbitmq ruby c++

本文转载自: https://blog.csdn.net/qq_37436172/article/details/132251337
版权归原作者 氵奄不死的鱼 所有, 如有侵权,请联系我们删除。

“RabbitMq Consumer thread error, thread abort.异常导致服务关闭问题”的评论:

还没有评论