0


@RabbitListener 消息队列 消息序列化

MessageConvert

涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析。RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等

  • 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
  • SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差
  • 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@Configuration
public class RabbitMQConfig {

    public static final String WINCALLCDR_QUEUE = "WINCHANCDR_QUEUE";

    //生产者
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //发送消息进行序列化
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    //消费者
    @Bean("rabbitListenerContainerFactory")
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory mqConnectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(mqConnectionFactory);
        //--加上这句  自定义MessageConverter
        factory.setMessageConverter(new RabbitMessageConverter());
        //反序列化
        //factory.setMessageConverter(new Jackson2JsonMessageConverter());

        //factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
        return factory;
    }
}

自定义MessageConverter

在一些场景下我们希望在消息发送到MQ之前或者接受消息前对消息做一些自定义处理,这个时候就需要自定义MessageConverter了

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class RabbitMessageConverter implements MessageConverter {

    /**
     * 发送消息时转换
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        System.out.println("=======toMessage=========");
        return new Message(object.toString().getBytes(), messageProperties);
    }

    /**
     * 接受消息时转换
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return new String(message.getBody());
    }
}

@RabbitListener 用法

使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理。@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory对象。可以在配置文件中设置RabbitListenerAnnotationBeanPostProcessor并通过rabbit:annotation-driven/来设置@RabbitListener的执行,当然也可以通过@EnableRabbit注解来启用@RabbitListener。

注意
消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常

配置消费者

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import net.icsoc.axt.job.config.RabbitMQConfig;
import net.icsoc.axt.job.dto.WinCallCdrDTO;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class CallListener {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void convertAndSendOrder() {
         //创建生产数据
         String jsonStr ="{user_id:234}"
         rabbitTemplate.convertAndSend("exchange.topic", "routingKey.aa", jsonStr);
    }

    @RabbitListener(queues = RabbitMQConfig.WINCALLCDR_QUEUE, containerFactory = "rabbitListenerContainerFactory")
    public void winCallCdr(String messsageBody) {

        //log.info("winCallCdr消费者收到消息  : " + messsageBody);
        WinCallCdrDTO winCallCdrDTO = JSON.parseObject(messsageBody, WinCallCdrDTO.class);
        try {
            exectueSaveWinCallCdrData2Db(winCallCdrDTO);
            log.info("winCallCdr成功消费消息 {}", winCallCdrDTO.getCallId());
        } catch (DataAccessException e) {
            log.error("消费winCallCdr异常 {} {}", messsageBody, e);
        }
    }
}

和 @RabbitHandler 搭配使用

@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {

    @RabbitHandler
    public void processMessage1(String message) {
        System.out.println(message);
    }

    @RabbitHandler
    public void processMessage2(byte[] message) {
        System.out.println(new String(message));
    }
    
}

@Payload 与 @Headers

使用 @Payload 和 @Headers 注解可以消息中的 body 与 headers 信息

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
    System.out.println("body:"+body);
    System.out.println("Headers:"+headers);
}

也可以获取单个 Header 属性

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
    System.out.println("body:"+body);
    System.out.println("token:"+token);
}

通过 @RabbitListener 注解声明 Binding

@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
        value = @Queue(value = "consumer_queue",durable = "true"),
        key = "key.#"
))
public void processMessage1(Message message) {
    System.out.println(message);
}

自动确认

生产者产生10笔消息,自动确认模式下,消息处理成功,消费者才会去获取下一笔消息;消息处理抛出异常,那么将会消息重回队列。自动确认分四种情况(第一就是正常消费,其他三种为异常情况)

  • 消息成功被消费,没有抛出异常,则自动确认,回复ack。不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。
  • 当抛出ImmediateAcknowledgeAmqpException异常的时候,则视为成功消费,确认该消息。
  • 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)
  • 抛出其他的异常,消息会被拒绝,且requeue = true

手动确认

常用API

  • channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false); ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
  • channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false); Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列
  • channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false); nack表示拒绝消息。multiple表示拒绝指定了delivery_tag的所有未确认的消息,requeue表示不是重回队列

当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行。这种情况会导致消息队列处理出现阻塞,消息堆积,导致正常消息也无法运行。

消息重发送到队尾

可能会出现堆积

    //消费者处理消息缓慢
    @RabbitListener(queues = {"kinson1"})
    public void receiver3(Message msg, Channel channel) throws IOException {
        try {
            //打印数据
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info("【开始】:{}",message);
            if("0".equals(message)){
                throw new RuntimeException("0的消息消费异常");
            }
            log.info("【结束】:{}", message);
            //ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            //捕获异常后,重新发送到指定队列,自动ack不抛出异常即为ack
            channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
                    msg.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    msg.getBody());
        }
    }

如何处理异常消息

如果一个消息体本身有误,会导致该消息体,一直无法进行处理,而服务器中刷出大量无用日志。解决这个问题可以采取两种方案:

1.一种是对于日常细致处理,分清哪些是可以恢复的异常,哪些是不可以恢复的异常。对于可以恢复的异常我们采取第三条中的解决方案,对于不可以处理的异常,我们采用记录日志,直接丢弃该消息方案。

2.另一种是我们对每条消息进行标记,记录每条消息的处理次数,当一条消息,多次处理仍不能成功时,处理次数到达我们设置的值时,我们就丢弃该消息,但需要记录详细的日志。

将业务队列绑定死信队列,当消息被丢弃后,进入到死信队列(代码修复后监听死信队列补偿消息)。可以避免我们手动的恢复消息。

@Component
@Slf4j
public class CustomerRev {

    @RabbitListener(queues = {"kinson1"})
    public void receiver3(Message msg, Channel channel) throws IOException {
        try {
            //打印数据
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info("【开始】:{}",message);
            if("0".equals(message)){
                throw new RuntimeException("0的消息消费异常");
            }
            log.info("【结束】:{}", message);
        } catch (Exception e) {
            //捕获异常后,重新发送到指定队列,自动确认不抛出异常即为ack
            Integer retryCount;
            Map<String, Object> headers = msg.getMessageProperties().getHeaders();
            if(!headers.containsKey("retry-count")){
                retryCount=0;
            }else {
                retryCount = (Integer)headers.get("retry-count");
            }
            //判断是否满足最大重试次数(重试3次)
            if(retryCount++<3) {
                headers.put("retry-count",retryCount);
                //重新发送到MQ中
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("text/plain").headers(headers).build();
                channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
                        msg.getMessageProperties().getReceivedRoutingKey(), basicProperties,
                        msg.getBody());
            }
        }
    }
}

重试机制如何合理配置

重试机制能保证某些场景下消息能被消费掉。适合重试场景:大部分属于读取,如调用第三方接口、网络波动问题、暂时调用不了、网络连接等。重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系。

采坑:不管消息被消费了之后是手动确认还是自动确认,代码中不能使用try/catch捕获异常,否则重试机制失效。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          # 开启消费者重试机制(默认就是true,false则取消重试机制)
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间距(单位:秒)
          initial-interval: 2s

以上配置消息会重试5次,如果一直失败,RabbitMQ放弃消费了

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/hudeyong926/article/details/129706102
版权归原作者 @航空母舰 所有, 如有侵权,请联系我们删除。

“@RabbitListener 消息队列 消息序列化”的评论:

还没有评论