1. 本文简介:
rabbitmq的连接配置, 以及回收机制配置, 修改默认java序列化方式
1.1 java序列化的缺点
---> 1.1.1 无法跨语言
serialVersionUID版本号必须相同,相同类名 版本号不一样 反序列化失败
--->1.1.2 易被攻击
Apache Commons Collections 允许链式的任意的类函数反射调用,攻击者通过“实现了 Java 序列化协议”的端口,把攻击代码上传到服务器上,再由 Apache Commons Collections 里的 TransformedMap 来执行。
---> 1.1.3 序列化后的流太大
Java 序列化实现的二进制编码完成的二进制数组大小,比 ByteBuffer 实现的二进制编码完成的二进制数组大小要大上几倍。
---> 1.1.4 序列化性能太差
ObjectOutputStream序列化效率很低
2. 配置总览
2.1 基础配置
#基础配置
spring.rabbitmq.host=*
spring.rabbitmq.port=5672
spring.rabbitmq.username=pzy
spring.rabbitmq.password=*
spring.rabbitmq.virtual-host=develop
2.2 连接重试配置
# 开启rabbit初始化重试机制
spring.rabbitmq.template.retry.enabled=true
## 最大重试间隔时间
spring.rabbitmq.template.retry.max-interval=1000ms
## 最大重试次数
spring.rabbitmq.template.retry.max-attempts=3
# 间隔乘数
spring.rabbitmq.template.retry.multiplier=1
# 初始化的时间间隔
spring.rabbitmq.template.retry.initial-interval=1000ms
2.3 异常重试机制
#异常重试机制设置[未加死信队列 五次异常后 直接抛弃了]
#设置是否重回队列 true即出现异常会将消息重新发送到队列中
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#设置是否启用消息重试机制,默认为false。
spring.rabbitmq.listener.simple.retry.enabled=true
#设置消息重试的最大次数,默认为3。
spring.rabbitmq.listener.simple.retry.max-attempts=5
#设置消息重试的初始间隔时间,默认为1000ms。
spring.rabbitmq.listener.simple.retry.initial-interval=2000ms
#设置消息重试的时间间隔倍数,默认为1(重试时间越来越长)
spring.rabbitmq.listener.simple.retry.multiplier=1.2
##设置消息重试的最大时间间隔,默认为10000ms。
spring.rabbitmq.listener.simple.retry.max-interval=3000ms
2.4 确认模式(本篇是自动)
#spring.rabbitmq.listener.simple.acknowledge-mode=none
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
---> 2.4.1 如图所示
2.5 发送确认设置
#发布消息成功到交换器后会触发回调方法(默认禁用none)
spring.rabbitmq.publisher-confirm-type=correlated
#消息发布不可达目的地的时候 才进行回退
spring.rabbitmq.publisher-returns=true
---> 2.5.1 参数解释: (老版的功能 直接是一个布尔值 false不开启 true开启)
None 禁用发布确认模式,是默认值
CORRELATED 发布消息成功到交换机后会触发回调方法
SIMPLE 有两种效果:其一: 效果和 CORRELATED 值一样会触发回调方法
其二: 在发布消息成功 使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,无法发送消息到 broker
---> 2.5.2 写一个callbackConfig, 重写方法
package com.aisce.axmall.order.config.rabbitmq;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq的成功与失败
* 消息回调
* @author pzy
* @description: TODO
* @version 1.0.1 beta版
*/
@Slf4j
@Configuration
public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
/**
* 消息正常发送 或者发送到broker后出现问题
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("confirm==>发送到broker失败\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
correlationData, ack, cause);
} else {
log.info("confirm==>发送到broker成功\r\n" + "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
correlationData, ack, cause);
}
}
/**
* 压根没到目的地 执行
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(@NonNull Message message, int replyCode,
@NonNull String replyText, @NonNull String exchange, @NonNull String routingKey) {
log.error("error returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
"replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
message, replyCode, replyText, exchange, routingKey);
}
//可进行后续操作
}
2.6 重新配置序列化
有两种方式(原理差不多)
---> 方法一: 都以application/json传递接收
@Configuration
public class RabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
---> 方法二: 都以application/json传递接收
/**
* rabbitmq配置类
*
* @author pzy
* @version 0.1.0
* @description: TODO
*/
@Configuration
public class RabbitConfig {
@Autowired
private SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory;
@Autowired
private RabbitCallbackConfig rabbitCallbackConfig;
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//消息到不到队列 自动重新返回到生产者
rabbitTemplate.setMandatory(true);//其实前面配置加了
rabbitTemplate.setConfirmCallback(rabbitCallbackConfig);
rabbitTemplate.setReturnCallback(rabbitCallbackConfig);
// 使用 JSON 序列化与反序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
---> 展示效果:
3. 配置总结
3.1 完整的application.properties的配置
#beta版 rabbitmq V3.0.1 版本启动测试 pzy===============================================>
#基础配置
spring.rabbitmq.host=*
spring.rabbitmq.port=5672
spring.rabbitmq.username=pzy
spring.rabbitmq.password=*
spring.rabbitmq.virtual-host=develop
#发送确认机制设置
#发布消息成功到交换器后会触发回调方法(默认禁用none)
spring.rabbitmq.publisher-confirm-type=correlated
#消息发布不可达目的地的时候 才进行回退
spring.rabbitmq.publisher-returns=true
# 队列设置
#设置每次预抓取的数量是3,处理完之前不收下一条 默认250
spring.rabbitmq.listener.simple.prefetch=3
# 手动确认模式
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
# 开启rabbit初始化重试机制
spring.rabbitmq.template.retry.enabled=true
## 最大重试间隔时间
spring.rabbitmq.template.retry.max-interval=1000ms
## 最大重试次数
spring.rabbitmq.template.retry.max-attempts=3
# 间隔乘数
spring.rabbitmq.template.retry.multiplier=1
# 初始化的时间间隔
spring.rabbitmq.template.retry.initial-interval=1000ms
#异常重试机制设置[未加死信队列 五次异常后 直接抛弃了]
#设置是否重回队列 true即出现异常会将消息重新发送到队列中
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#设置是否启用消息重试机制,默认为false。
spring.rabbitmq.listener.simple.retry.enabled=true
#设置消息重试的最大次数,默认为3。
spring.rabbitmq.listener.simple.retry.max-attempts=5
#设置消息重试的初始间隔时间,默认为1000ms。
spring.rabbitmq.listener.simple.retry.initial-interval=2000ms
#设置消息重试的时间间隔倍数,默认为1(重试时间越来越长)
spring.rabbitmq.listener.simple.retry.multiplier=1.2
##设置消息重试的最大时间间隔,默认为10000ms。
spring.rabbitmq.listener.simple.retry.max-interval=3000ms
# beta版 ============================================================================>
3.2 完整的application.yml配置
properties 转成 yml 后 注释没了 具体看上面的
#只有服务的消费者 决定怎么消费确认 生产者决定不了
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
default-requeue-rejected: true
retry:
enabled: true
initial-interval: 2000ms
max-attempts: 5
max-interval: 3000ms
multiplier: 1.2
host: *
port: 5672
username: pzy
password: *
virtual-host: develop
# 新版的就是publisher-confirm-type=correlated
publisher-confirms: true
publisher-returns: true
template:
retry:
enabled: true
initial-interval: 1000ms
max-attempts: 3
max-interval: 1000ms
multiplier: 1
4. 文章传送门
微服务: 00-rabbitmq出现的异常以及解决方案
微服务: 01-rabbitmq的应用场景及安装(docker)
微服务 02-rabbitmq在springboot中如何使用(上篇)
微服务: 03-rabbitmq在springboot中如何使用(下篇)
5. 下文预告
微服务: 05-rabbitmq设置重试次数并设置死信队列
版权归原作者 pingzhuyan 所有, 如有侵权,请联系我们删除。