一、配置
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yaml
#RabbitMQ 配置
#地址
spring.rabbitmq.host=192.168.1.5
#端口
spring.rabbitmq.port=5672
#用户
spring.rabbitmq.username=usermq
#密码
spring.rabbitmq.password=111111
#开启发布确认模式 生产者--交换机
spring.rabbitmq.publisher-confirm-type=correlated
#允许消息回退 交换机--队列
spring.rabbitmq.publisher-returns=true
#(5672为端口,15672为web端口)
二、使用
配置类通过bean注入
创建队列,创建路由,创建队列绑定 (都是通过xxxBuider工具类创建)
创建交换机,创建队列(同时声名对应的死信交换机),绑定路由,绑定死信交换机和队列
交换机类型
DirectExchange (直接交换机,路由模式)
TopicExchange (主题模式 路由的路由)
FanoutExchange (发布/订阅,和路由id没关系,广播方式)
HeadersExchange
CustomExchange (自定义交换机 ,使用延迟消息插件的时候配置)
全部配置在@Configuration类中
(1)创建普通交换机
@Bean("yExchange") //起别名
public DirectExchange yExchange(){
//new DirectExchange(X_EXCHANGE)
//除了创建对象方式创建路由或者队列外 还可以通过工具类创建
return ExchangeBuilder.directExchange(Y_DEAD_LETTER_EXCHANGE).build();
}
(2) 创建普通队列
@Bean("queueYC")
public Queue queueYC(){
//队列
return QueueBuilder.durable(DELAY_QUEUE_NAME).build();
}
(3)绑定 交换机--队列
//@Qualifier 自动注入bean
//交换机--队列 通过路由id绑定(直接交换机类型) with
//绑定
@Bean
public Binding binding(@Qualifier("queueYC") Queue queue,@Qualifier("yExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("YCXX");
}
(4)创建带有死信交换机的队列
死信交换机也是普通的交换机,创建方式和普通交换机一致
死信交换机绑定的队列也是普通的队列
带有死信交换机的为死信队列
死信队列中的消息成为死信后会被转发到死信交换机中
死信消息:比如ttl过期(存活时间)
示例
//声名普通队列列B QB--Y
@Bean("queueB")
public Queue queueB(){
Map<String,Object> map =new HashMap<>();
//设置死信息交换机
map.put("x-dead-letter-exchange","Y_DEAD_LETTER_EXCHANGE");
//设置RotingKey
map.put("x-dead-letter-routing-key","YD"); //(需要是死信交换机和 死信队列之间的路由id,不是当前队列和死刑死信路由的id,当前队列和死信交换机不需要路由id)
//设置ttl 消息存活时间 单位是ms,设置当前队列消息的过期时间,也可以在生产者发送消息时候设置
map.put("x-message-ttl",4000);
//配置队列 可以通过工具类创建
//队列名字 //其他配置
return QueueBuilder.durable("QUEUE_B").withArguments(map).build();
}
(5)生产者
创建RabbitTemplate对象(自动注入)
调用发送消息
rabbitTemplate.convertAndSend("交换机","路由和队列绑定的路由id","消息");
例如
@RequestMapping("/send/{msg}/{ttl}")
String TtlMsg(@PathVariable("msg") String msg,@PathVariable("ttl") String ttl){
log.info("发送消息给QC:{},ttl:{},当前时间:{}",msg,ttl,new Date());
//发送消息的交换机 路由id 消息 其他设置
rabbitTemplate.convertAndSend("X", "XC", msg,new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置过期时间
message.getMessageProperties().setExpiration(ttl);
return message;
}
});
return "发送成功";
}
(6)消费者
需要消息监听器监听器
给方法加上 @RabbitListener(queues = "QD") 注解监听QD队列,即接收QD队列的消息
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = "confirm_queue")
public void receiveConfigMEssage(Message message){
log.info("接收到的队列confirm_queue消息:{}",new String(message.getBody()));
}
}
(7)Message对象
message.getBody():
消息内容bytes类型 使用new String(message.getBody())装箱成String类型
message.getMessageProperties().setExpiration(ttl)
发送消息时候设置过期时间
(8)延时队列优化(死信实现延时,有缺陷)
死信消息做延迟有巨大缺陷
RabbitMQ只会检测第一个消息是否过期
后面的ttl会受到前一条消息ttl影响
如果前一条消息ttl为10s 后一条消息ttl为2s ,那么后一条消息ttl也会变成ttl(后一条消息需要等待前一天消息发去出去)
总结:如果第一个的延时消息时长很长,而且第二个消息的延时时长很短,第二个消息并不会有序执行
三、Rabbitmq插件实现延迟队列(重点)
下载插件
插件放在这个目录下
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.13/plugins
安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启RabbitMQ
systemctl restart rabbitmq-server
消息延迟的是交换机
安装插件后新增交换机类型
x-delayed-message
声名延迟交换机
声名交换机
交换机类型
DirectExchange
TopicExchange
FanoutExchange
HeadersExchange
CustomExchange (自定义类型)
延迟交换机需要使用 CustomExchange (自定义类型) 需要设置为延迟类型(直接设置) ,也需要设置类型(直接还是路由模式,map设置)
CustomExchange 不能使用ExchangeBuilder 创建,因为没有对应方法
只能 new CustomExchange("交换机名字","x-delayed-message","是否持久化","是否自动删除",map)
Map<String,Object> map=new HashMap<>();
//设置延迟 交换机的类型(设置为路由模式)
map.put("x-delayed-type","direct"); //设置了 (x-delayed-message)延迟交换机后,还要设置它的模式 (设置路由模式)
设置延迟交换机
//声名交换机
//DirectExchange
//TopicExchange
//FanoutExchange
//HeadersExchange
//CustomExchange
@Bean("ycExchange")
public CustomExchange customExchange(){
Map<String,Object> map=new HashMap<>();
//设置延迟 交换机的类型(设置为路由模式)
map.put("x-delayed-type","direct");
//不能使用ExchangeBuilder工具类创建
// ExchangeBuilder.
//1、交换机名称
//2、交换机类型
//3、是否需要持久化
//4、是否需要自动删除
//5、其他配置参数
return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,map);
}
绑定
不是用Buider创建的需要使用noargs()适配
//绑定
@Bean
public Binding binding(@Qualifier("queueYC") Queue queue,@Qualifier("ycExchange")CustomExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("YCXX").noargs();
}
延迟消息总结
使用RabbitMQ来实现延迟消息很好的利用RabbitMQ的特性
消息可靠发送,消息可靠投递,死信消息至少被消费一次以及未被真确处理的消息不会被丢弃。另外,通过RabbltMq集群的特性可以很好
的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失
四、发布确认
生产者发送消息给交换机--队列--消费者
如果消息发送识失败,把消息放入缓存中
队列或者交换机出问题就算交换机问题
(1)确认回调接口(监控消息是否被收到)
针对全部交换机
监控: 生产者--交换机 过程
在配置文件当中开启发布确认模式()
spring.rabbitmq.publisher-confirm-type=correlated
none 默认禁止发布确认模式
correlated 开启发布确认(只监控生产者--交换机 监控不到 交换机--队列)
simple(相当于单个确认) 两种效果 其一效果和correlated值一样会触发回调方法
其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForsOrDie方法等待broker节点
返回发送结果,根据返回结果来判定下一步的逻辑,要注意的是waitForConfirmsOrDie方法如果
返回false则会关闭channel,则接下来无法发送消息到broker
发送消息时候
每次发送消息到交换机中都会被监控到(需要设置消息id)
//消息的其他属性(id) 需要开启发布确认模式
CorrelationData correlationData=new CorrelationData("1");
//发送消息 交换机 路由id 消息 消息的其他属性
rabbitTemplate.convertAndSend("confirm_exchange","GJK",message,correlationData);
生产者
需要传递配置CorrelationData 对象来传递消息id等信息
//发消息
@RequestMapping("/pron/{message}")
public String sendMessage(@PathVariable("message") String message){
//消息的其他属性(id) 需要开启发布确认模式
CorrelationData correlationData=new CorrelationData("1");
//发送消息 交换机 路由id 消息 消息的其他属性
rabbitTemplate.convertAndSend("confirm_exchange","GJK",message,correlationData);
}
配置类
继承设置RabbitTemplate.ConfirmCallback 接口
重写
//发布确认回调接口
//消息退回
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct //后置处理器 类似生命周期钩子,创建类时候最后执行
private void init() {
//把当前配置注入注入
rabbitTemplate.setConfirmCallback(this);
}
//消息确认 生产者--交换机
@Override //消息信息 交换机是收到消息 失败原因
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id=correlationData.getId()!=null?correlationData.getId():"";
if(b){
log.info("交换机已经收到Id为:{}的消息",id);
}else {
log.info("交换机还未收到ID为:{}的消息,原因为:{}",id,s);
}
}
}
(2)回退消息
针对全部队列,在发布确认的前提下,消息会回退到回调函数中,可以在回调函数中重新创建生产者转发
监控:交换机--队列 过程
开启回退消息
spring.rabbitmq.publisher-returns=true
或者
//设置 交换机--队列 路由失败时候 自动退回消费者(returnedMessage进行处理) 默认是false直接丢弃消息
rabbitTemplate.setMandatory(true);
交换机--队列消息路由失败时候对消息进行处理:通过mandatory参数可以在当消息传递过程装中
不可到达目的地时候将消息返回给生产者
回调函数
配置类需要实现
RabbitTemplate.ReturnsCallback接口
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct //后置处理器 类似生命周期钩子,创建类时候最后执行
private void init() {
//把当前配置注入注入
rabbitTemplate.setConfirmCallback(this);
//设置 交换机--队列 路由失败时候 自动退回消费者(returnedMessage进行处理) 默认是false直接丢弃消息
// rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(this);
}
//消息确认 生产者--交换机
@Override //消息信息 交换机是收到消息 失败原因
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id=correlationData.getId()!=null?correlationData.getId():"";
if(b){
log.info("交换机已经收到Id为:{}的消息",id);
}else {
log.info("交换机还未收到ID为:{}的消息,原因为:{}",id,s);
}
}
//回退消息,回调函数 交换机--队列
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
//可以在这创建生产者再次发送当前消息returnedMessage
log.error("消息:{},被交换机:{}退回,退回原因:{},路由key:{}",
new String(returnedMessage.getMessage().getBody()), //消息内容
returnedMessage.getExchange(),//交换机
returnedMessage.getReplyText(), //失败原因
returnedMessage.getRoutingKey() //路由id
);
}
}
生产者发送消息和发布确认中的生产者一致即可,不用发送ReturnedMessage 对象(会自动生成)
但消息确认中CorrelationData 对象需要发送(不会自动生成)
五、备份交换机
作用:自动处理 交换机--队列 中被退回的消息 转发到备份交换机中。
类型死信队列绑定-死信交换机,交换机--备份交换机
将确认交换机的消息(设置了发布确认模式,开启了消息退回)
配置
构建确认交换机时候需要配置备份交换机 withArgument("alternate-exchange","备份交换机名字")
例如
//交换机声名--绑定备份交换机
@Bean("confirmExchange")
public DirectExchange directExchange(){
//交换机名字 是否持久化 //设置备份交换机 //备份交换机名字
return ExchangeBuilder.directExchange(CONFIRM_CONFIG_NAME).durable(true).withArgument("alternate-exchange",BACKUP_CONFIG_NAME1).build();
}
备份交换机可以设置成广播类型交换机
//创建备份交换机
@Bean("bfExchange")
public FanoutExchange baexchange(){
return ExchangeBuilder.fanoutExchange(BACKUP_CONFIG_NAME1).build();
}
绑定备份交换机的队列称为报警队列(普通队列)
例如
//报警队列
@Bean("bjQueue")
public Queue bjqueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME2).build();
}
//绑定报警队列和备份交换机
@Bean
public Binding bfMyinding(@Qualifier("bfQueue")Queue queue,@Qualifier("bfExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
六、其他知识
(1)幂等性(消息重复消费)
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
举个最简单是例子,那就是多次支付
消费者再消费MQ中消息时候,消费者在返回确认时候ack网络中断,所有MQ未收到确认信息,所以这条消息会被发送给其他消费者进行消费
造成消息重复消费
解决问题:
消息全局ID:每次消费时候先判断该id是否被消费过
指纹码机制:一些规则或者时间搓加别的服务给到唯一信息码(劣势:高并发时候频繁对比数据库信息会造成瓶颈)
利用redis原子性:执行setnx命令 ,天然具有幂等性,从而实现不重复消费
(2)队列优先级
订单优先级
优先权重 0-255 越大越优先
数据优先处理
优先级排序
队列需要设置优先级,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序
队列设置最大优先级(0-255)
QueueBuilder.durable("priorityQueue").maxPriority(10).build();
消息设置优先级 maxPriority(10)
(3)惰性队列
惰性队列: 消息保存在内存中还是在磁盘上
正常情况:消息是保存在内存中
惰性队列:消息保存在磁盘中
使用在:消费者下线或者宕机
优点:减少内存小号
缺点:速度慢
两种模式:default(默认) lazy(惰性模式)
withArguments( “x-queue-mode”,"lazy") 设置惰性队列
QueueBuilder.durable(QUEUE_B).withArguments(map)build()
RabbitMQ集群
镜像队列
高可用负载均衡
使用第三方服务器
Federation Exchange
联邦交换机
Shovel
可以把源端的数据转发到目的端
版权归原作者 不爱小白的小孩 所有, 如有侵权,请联系我们删除。