MessageConvert
涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析。RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等
- 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
- SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差
- 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@Configuration
public class RabbitMQConfig {
public static final String WINCALLCDR_QUEUE = "WINCHANCDR_QUEUE";
//生产者
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//发送消息进行序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
//消费者
@Bean("rabbitListenerContainerFactory")
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory mqConnectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(mqConnectionFactory);
//--加上这句 自定义MessageConverter
factory.setMessageConverter(new RabbitMessageConverter());
//反序列化
//factory.setMessageConverter(new Jackson2JsonMessageConverter());
//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
return factory;
}
}
自定义MessageConverter
在一些场景下我们希望在消息发送到MQ之前或者接受消息前对消息做一些自定义处理,这个时候就需要自定义MessageConverter了
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class RabbitMessageConverter implements MessageConverter {
/**
* 发送消息时转换
*/
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
System.out.println("=======toMessage=========");
return new Message(object.toString().getBytes(), messageProperties);
}
/**
* 接受消息时转换
*/
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return new String(message.getBody());
}
}
@RabbitListener 用法
使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理。@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory对象。可以在配置文件中设置RabbitListenerAnnotationBeanPostProcessor并通过rabbit:annotation-driven/来设置@RabbitListener的执行,当然也可以通过@EnableRabbit注解来启用@RabbitListener。
注意
消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)
消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常
配置消费者
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import net.icsoc.axt.job.config.RabbitMQConfig;
import net.icsoc.axt.job.dto.WinCallCdrDTO;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@Slf4j
public class CallListener {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void convertAndSendOrder() {
//创建生产数据
String jsonStr ="{user_id:234}"
rabbitTemplate.convertAndSend("exchange.topic", "routingKey.aa", jsonStr);
}
@RabbitListener(queues = RabbitMQConfig.WINCALLCDR_QUEUE, containerFactory = "rabbitListenerContainerFactory")
public void winCallCdr(String messsageBody) {
//log.info("winCallCdr消费者收到消息 : " + messsageBody);
WinCallCdrDTO winCallCdrDTO = JSON.parseObject(messsageBody, WinCallCdrDTO.class);
try {
exectueSaveWinCallCdrData2Db(winCallCdrDTO);
log.info("winCallCdr成功消费消息 {}", winCallCdrDTO.getCallId());
} catch (DataAccessException e) {
log.error("消费winCallCdr异常 {} {}", messsageBody, e);
}
}
}
和 @RabbitHandler 搭配使用
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
@RabbitHandler
public void processMessage1(String message) {
System.out.println(message);
}
@RabbitHandler
public void processMessage2(byte[] message) {
System.out.println(new String(message));
}
}
@Payload 与 @Headers
使用 @Payload 和 @Headers 注解可以消息中的 body 与 headers 信息
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
System.out.println("body:"+body);
System.out.println("Headers:"+headers);
}
也可以获取单个 Header 属性
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
System.out.println("body:"+body);
System.out.println("token:"+token);
}
通过 @RabbitListener 注解声明 Binding
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
value = @Queue(value = "consumer_queue",durable = "true"),
key = "key.#"
))
public void processMessage1(Message message) {
System.out.println(message);
}
自动确认
生产者产生10笔消息,自动确认模式下,消息处理成功,消费者才会去获取下一笔消息;消息处理抛出异常,那么将会消息重回队列。自动确认分四种情况(第一就是正常消费,其他三种为异常情况)
- 消息成功被消费,没有抛出异常,则自动确认,回复ack。不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。
- 当抛出ImmediateAcknowledgeAmqpException异常的时候,则视为成功消费,确认该消息。
- 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)
- 抛出其他的异常,消息会被拒绝,且requeue = true
手动确认
常用API
- channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false); ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
- channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false); Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列
- channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false); nack表示拒绝消息。multiple表示拒绝指定了delivery_tag的所有未确认的消息,requeue表示不是重回队列
当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行。这种情况会导致消息队列处理出现阻塞,消息堆积,导致正常消息也无法运行。
消息重发送到队尾
可能会出现堆积
//消费者处理消息缓慢
@RabbitListener(queues = {"kinson1"})
public void receiver3(Message msg, Channel channel) throws IOException {
try {
//打印数据
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【开始】:{}",message);
if("0".equals(message)){
throw new RuntimeException("0的消息消费异常");
}
log.info("【结束】:{}", message);
//ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
//捕获异常后,重新发送到指定队列,自动ack不抛出异常即为ack
channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
msg.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
msg.getBody());
}
}
如何处理异常消息
如果一个消息体本身有误,会导致该消息体,一直无法进行处理,而服务器中刷出大量无用日志。解决这个问题可以采取两种方案:
1.一种是对于日常细致处理,分清哪些是可以恢复的异常,哪些是不可以恢复的异常。对于可以恢复的异常我们采取第三条中的解决方案,对于不可以处理的异常,我们采用记录日志,直接丢弃该消息方案。
2.另一种是我们对每条消息进行标记,记录每条消息的处理次数,当一条消息,多次处理仍不能成功时,处理次数到达我们设置的值时,我们就丢弃该消息,但需要记录详细的日志。
将业务队列绑定死信队列,当消息被丢弃后,进入到死信队列(代码修复后监听死信队列补偿消息)。可以避免我们手动的恢复消息。
@Component
@Slf4j
public class CustomerRev {
@RabbitListener(queues = {"kinson1"})
public void receiver3(Message msg, Channel channel) throws IOException {
try {
//打印数据
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【开始】:{}",message);
if("0".equals(message)){
throw new RuntimeException("0的消息消费异常");
}
log.info("【结束】:{}", message);
} catch (Exception e) {
//捕获异常后,重新发送到指定队列,自动确认不抛出异常即为ack
Integer retryCount;
Map<String, Object> headers = msg.getMessageProperties().getHeaders();
if(!headers.containsKey("retry-count")){
retryCount=0;
}else {
retryCount = (Integer)headers.get("retry-count");
}
//判断是否满足最大重试次数(重试3次)
if(retryCount++<3) {
headers.put("retry-count",retryCount);
//重新发送到MQ中
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("text/plain").headers(headers).build();
channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
msg.getMessageProperties().getReceivedRoutingKey(), basicProperties,
msg.getBody());
}
}
}
}
重试机制如何合理配置
重试机制能保证某些场景下消息能被消费掉。适合重试场景:大部分属于读取,如调用第三方接口、网络波动问题、暂时调用不了、网络连接等。重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系。
采坑:不管消息被消费了之后是手动确认还是自动确认,代码中不能使用try/catch捕获异常,否则重试机制失效。
spring:
rabbitmq:
listener:
simple:
retry:
# 开启消费者重试机制(默认就是true,false则取消重试机制)
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间距(单位:秒)
initial-interval: 2s
以上配置消息会重试5次,如果一直失败,RabbitMQ放弃消费了
版权归原作者 @航空母舰 所有, 如有侵权,请联系我们删除。