0


rabbitmq无法连接问题

背景

    最近一个项目使用了rabbitmq作为消息队列,进行异步解耦操作,因涉及到数据的一致性问题,设置了手动应答和持久化功能。开发过程中一切顺利,然而天将降大任于斯人也必先苦其心智老其筋骨,饿其体肤,空乏其身,好吧偏题了。。。。在最终的测试运行中发现一些偶尔会有消息无法发送的情况,有时候1、2周出现,有时候1、2小时出现完全没有规律。本文记载了相关问题并继续处理。

Rabbit配置

    1、设置publisher-confirm-type和publisher-returns发布确认属性,其中publisher-confirm-type有三类值:NONE、CORRELATED、SIMPLE
  • NONE:禁用发布确认模式,是默认值;

  • CORRELATED:发布消息成功到交换器后会触发回调方法;

  • SIMPLE:触发回调方法,并在发布消息成功后,调用waitForConfirms或waitForConfirmsOrDie方法等待返回发送结果。

      2、配置acknowledge-mode为manual手动确认消息
    
  • acknowledge-mode 三种值

  • none 自动确认,收到消息就通知broker,是默认值

  • manual 手动确认

  • auto 根据异常情况确认

  rabbitmq:
    host: ******
    port: 5672
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true

RabbitTemplate配置

public RabbitSend(RabbitTemplate rabbitTemplate) {
        super();
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setReturnsCallback(data -> {
            try {
                Thread.sleep(SLEEP_TIME);
                logger.info("消息发送重试=====>{}", data);
            } catch (Exception e) {
                logger.error("发送失败", e);
            }
        });
        this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                logger.info("消息发送确认成功====>{}", correlationData);
            } else {
                logger.info("消息发送失败=====>{}", correlationData);
            }
        });
    }

发送消息

    发送消息这里设置消息的持久化属性。
public void routeSend(String message, String exchange, String routingKey) {
        Message msg = this.setMessage(message);
        logger.info("开始发送消息");
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData());
        logger.info("消息发送完成");
    }

    private Message setMessage(String json) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
        return new Message(json.getBytes(), messageProperties);
    }

消费消息

    消费消息这里采用手动应答的方式,同时如果出现异常将消息移到队尾。
try {
            //处理消息,并手动应答

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("消费失败:" + e.getMessage());
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(),
                    MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(message));
        }

问题

    悲剧的时刻来了,系统运行过程中会出现莫名奇妙的发送消息失败,并且程序假死。于是只能去调试源码,通过日志的打印发现“消息发送完成”这行日志没有打印出来,因此基本确定是rabbitTemplate.convertAndSend这行有问题,跟踪进去:
//1128行
public void convertAndSend(String exchange, String routingKey, final Object object,
            @Nullable CorrelationData correlationData) throws AmqpException {

        send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    }

//1063行execute()

public void send(final String exchange, final String routingKey,
            final Message message, @Nullable final CorrelationData correlationData)
            throws AmqpException {
        execute(channel -> {
            doSend(channel, exchange, routingKey, message,
                    (RabbitTemplate.this.returnsCallback != null
                            || (correlationData != null && StringUtils.hasText(correlationData.getId())))
                            && isMandatoryFor(message),
                    correlationData);
            return null;
        }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    }

//2136行 doExecute(action, connectionFactory)
@Nullable
    private <T> T execute(final ChannelCallback<T> action, final ConnectionFactory connectionFactory) {
        if (this.retryTemplate != null) {
            try {
                return this.retryTemplate.execute(
                        (RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
                        (RecoveryCallback<T>) this.recoveryCallback);
            }
            catch (RuntimeException e) { // NOSONAR catch and rethrow needed to avoid next catch
                throw e;
            }
            catch (Exception e) {
                throw RabbitExceptionTranslator.convertRabbitAccessException(e);
            }
        }
        else {
            return doExecute(action, connectionFactory);
        }
    }

//ConnectionFactoryUtils.createConnection
private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory) {
...
connection = ConnectionFactoryUtils.createConnection(connectionFactory,this.usePublisherConnection);
...
}

继续ConnectionFactoryUtils.java

public static Connection createConnection(final ConnectionFactory connectionFactory,
            final boolean publisherConnectionIfPossible) {

        if (publisherConnectionIfPossible) {
            ConnectionFactory publisherFactory = connectionFactory.getPublisherConnectionFactory();
            if (publisherFactory != null) {
                return publisherFactory.createConnection();
            }
        }
        return connectionFactory.createConnection();
    }

跳转到CachingConnectionFactory.java

public final Connection createConnection() throws AmqpException {
        if (this.stopped) {
            throw new AmqpApplicationContextClosedException(
                    "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
        }
        synchronized (this.connectionMonitor) {//罪魁祸首,最后阻塞在这里
            if (this.cacheMode == CacheMode.CHANNEL) {
                if (this.connection.target == null) {
                    this.connection.target = super.createBareConnection();
                    // invoke the listener *after* this.connection is assigned
                    if (!this.checkoutPermits.containsKey(this.connection)) {
                        this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
                    }
                    this.connection.closeNotified.set(false);
                    getConnectionListener().onCreate(this.connection);
                }
                return this.connection;
            }
            else if (this.cacheMode == CacheMode.CONNECTION) {
                return connectionFromCache();
            }
        }
        return null; // NOSONAR - never reach here - exceptions
    }

经过漫长的调试最终发现阻塞在CachingConnectionFactory的721行synchronized (this.connectionMonitor)。connectionMonitor是一个Object对象,加了synchronized锁,但是在对connectionMonitor加锁的地方都打上断点后发现并没有哪里锁住了对象。。。

    本文记录了rabbitmq的问题,期待大神能够提点。

    
标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/qq_37382917/article/details/128635382
版权归原作者 苜蓿花乐园 所有, 如有侵权,请联系我们删除。

“rabbitmq无法连接问题”的评论:

还没有评论