1、发布订阅
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:- Fanout:广播,将消息交给所有绑定到交换机的队列- Direct:定向,把消息交给符合指定routing key 的队列- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
1.1、交换机
** Exchange(交换机)只负责转发消息,不具备存储消息的能力**,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
总共有以下类型:直接(direct),主题(topic) ,标题(headers) , 扇出(fanout)
在之前的案例中我们并没有指定交换机的名字,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。
channel.basicPublish("", queueName, null, message.getBytes());
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话。
临时队列
之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。队列的名称我们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有**随机名称的队列**,或者能让服务器为我们选择一个随机队列名称那就更好了。其次**一旦我们断开了消费者的连接,队列将被自动删除。**创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
绑定
什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定。
1.2、Fanout
我们姑且叫他广播叭!
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
生产者
package com.songzhishu.rabbitmq.fanout;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.util.RabbitMqUtils;
import java.util.Scanner;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.fanout
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 11:04
* @Description: 消息发送
* @Version: 1.0
*/
public class FP {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机 参数1:交换机名称 参数2:交换机类型 fanout 广播类型
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//发送消息 控制台输入消息
System.out.println("输入消息:");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
//发送消息到交换机 参数1:交换机名称 参数2:路由key 参数3:消息持久化 参数4:消息内容
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("消息发送完毕:"+message);
}
}
}
消费者
package com.songzhishu.rabbitmq.fanout;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.util.RabbitMqUtils;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.fanout
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 10:31
* @Description: 消息发送
* @Version: 1.0
*/
public class FC1 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机 参数1:交换机名称 参数2:交换机类型 fanout 广播类型
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//创建临时队列 队列名称是随机的 当消费者断开与队列的连接后 队列自动删除
String queue = channel.queueDeclare().getQueue();
//绑定交换机与队列 参数1:队列名称 参数2:交换机名称 参数3:路由key
channel.queueBind(queue,EXCHANGE_NAME,"");
System.out.println("等待接收消息,把接收到的消息打印在屏幕上......");
//接收消息 参数1:队列名称 参数2:开启消息的自动确认机制 参数3:消费时的回调接口
channel.basicConsume(queue,true,(consumerTag,message)->{
System.out.println("接收到的消息f1:"+new String(message.getBody()));
},consumerTag -> {
System.out.println("消息消费被中断");
});
}
}
1.3、Direct
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key)消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息当然如果 exchange 的绑定类型是 direct,**但是它绑定的多个队列的****key****如果都相同**,在这种情况下虽然绑定类型是 direct **但是它表现的就和****fanout****有点类似了**,就跟广播差不多。此外如果没有绑定关系的话就丢弃。
生产者
package com.songzhishu.rabbitmq.direct;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.util.RabbitMqUtils;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.direct
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 11:37
* @Description: TODO
* @Version: 1.0
*/
public class DP {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//睡眠
Thread.sleep(30000);
channel.basicPublish(EXCHANGE_NAME, "info", null, "这是info消息".getBytes());
channel.basicPublish(EXCHANGE_NAME, "warning", null, "这是warning消息".getBytes());
channel.basicPublish(EXCHANGE_NAME, "error", null, "这是error消息".getBytes());
}
}
消费者
1.4、Topic
Topic
类型的
Exchange
与
Direct
相比,都是可以根据
RoutingKey
把消息路由到不同的队列。只不过
Topic
类型
Exchange
可以让队列在绑定
Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:
item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
发送者
package com.songzhishu.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.util.RabbitMqUtils;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.direct
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 11:37
* @Description: TODO
* @Version: 1.0
*/
public class TP {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//睡眠
Thread.sleep(30000);
//参数 1:交换机名称 2:路由key 3:传递消息额外设置 4:消息的具体内容
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, "这是消息:quick.orange.rabbit".getBytes());
channel.basicPublish(EXCHANGE_NAME, "lazy.orange.elephant", null, "这是消息:lazy.orange.elephant".getBytes());
channel.basicPublish(EXCHANGE_NAME, "quick.orange.fox", null, "这是消息:quick.orange.fox".getBytes());
channel.basicPublish(EXCHANGE_NAME, "lazy.brown.fox", null, "这是消息:lazy.brown.fox".getBytes());
channel.basicPublish(EXCHANGE_NAME, "lazy.pink.rabbit", null, "这是消息:lazy.pink.rabbit".getBytes());
channel.basicPublish(EXCHANGE_NAME, "quick.brown.fox", null, "这是消息:quick.brown.fox".getBytes());
channel.basicPublish(EXCHANGE_NAME,"red.quick.orange.rabbit",null,"这是消息:red.quick.orange.rabbit".getBytes());
channel.basicPublish(EXCHANGE_NAME,"lazy.quick.orange.rabbit",null,"这是消息:lazy.quick.orange.rabbit".getBytes());
}
}
生产者
package com.songzhishu.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.util.RabbitMqUtils;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.direct
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 11:33
* @Description: 消费者
* @Version: 1.0
*/
public class TC1 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,EXCHANGE_NAME,"*.orange.*");
System.out.println("TC1等待接受消息");
//接受到消息后回调
channel.basicConsume(queue,true,(consumerTag,message)->{
System.out.println("TC1接受到消息:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println("接受消息失败");
});
}
}
package com.songzhishu.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.util.RabbitMqUtils;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.direct
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 11:33
* @Description: 消费者
* @Version: 1.0
*/
public class TC2 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, EXCHANGE_NAME, "*.*.rabbit");
System.out.println("TC2等待接受消息");
//接受到消息后回调
channel.basicConsume(queue, true, (consumerTag, message) -> {
System.out.println("TC2接受到消息:" + new String(message.getBody()));
}, (consumerTag) -> {
System.out.println("接受消息失败");
});
}
}
package com.songzhishu.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.util.RabbitMqUtils;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.direct
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 11:33
* @Description: 消费者
* @Version: 1.0
*/
public class TC3 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,EXCHANGE_NAME,"lazy.#");
System.out.println("TC3等待接受消息");
//接受到消息后回调
channel.basicConsume(queue,true,(consumerTag,message)->{
System.out.println("TC3接受到消息:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println("接受消息失败");
});
}
}
2、死信队列
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的**原因导致****queue****中的某些消息无法被消费**,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
产生死信的原因:
- 消息 TTL 过期,消息由一定的存活时间
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
生产者
package com.songzhishu.rabbitmq.dm;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.util.RabbitMqUtils;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.dm
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 13:03
* @Description: TODO
* @Version: 1.0
*/
public class DMP {
public static final String EXCHANGE_NAME = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//发送消息
for (int i = 1; i <= 10; i++) {
String message = "info" + i;
//参数1:交换机名称,参数2:路由key 参数3:传递消息额外设置 参数4:消息的具体内容
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
channel.basicPublish(EXCHANGE_NAME, "normal", properties, message.getBytes());
System.out.println("生产者发送消息:" + message);
}
}
}
消费者
package com.songzhishu.rabbitmq.dm;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.util.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.dm
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 13:04
* @Description: TODO
* @Version: 1.0
*/
public class DMC1 {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
channel.exchangeDeclare(DEAD_EXCHANGE, "direct");
Map<String, Object> hashMap = new HashMap<>();
//正常队列设置死信交换机
hashMap.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信RoutingKey
hashMap.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, hashMap);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
System.out.println("等待接收消息......");
//接收消息 参数1:队列名称,参数2:消息自动确认 true表示自动确认,false表示手动确认,参数3:消费时的回调接口
channel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> {
System.out.println("Consumer1接收到的消息是:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
});
}
}
package com.songzhishu.rabbitmq.dm;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.util.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.dm
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 13:04
* @Description: TODO
* @Version: 1.0
*/
public class DMC2 {
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息......");
//接收消息 参数1:队列名称,参数2:消息自动确认 true表示自动确认,false表示手动确认,参数3:消费时的回调接口
channel.basicConsume(DEAD_QUEUE, true, (consumerTag, message) -> {
System.out.println("Consumer2接收到的消息是:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
});
}
}
这里先启动DMC1,然后让其挂掉,然后启动DMP发送消息,此时消费者挂掉导致没有人接受消息在正常的队列中,然后消息在发送的时候设置的有过期时间,超过时间那么此时未被读取的消息就会被设置为死信,那么此时就会发送到死信队列中去,然后这时候启动DMC2,2消费者就会读取在死信队列中的消息。
超出长度
生产者,取消过期的设置,就是正常的发送消息,生产者1,设置消息正常的消息队列长度
//队列长度限制
hashMap.put("x-max-length", 5);
消息被拒
生产者不用修改,修改DMC1代码,取消长度限制,然后修改接受的代码,以及将消息的自动确认修改为false
//接收消息 参数1:队列名称,参数2:消息自动确认 true表示自动确认,false表示手动确认,参数3:消费时的回调接口
channel.basicConsume(NORMAL_QUEUE, false, (consumerTag, message) -> {
if ("info5".equals(new String(message.getBody()))) {
System.out.println("Consumer1接收到的消息是:" + new String(message.getBody()) + "此消息被拒绝");
//拒绝消息 参数1:消息标识 参数2:是否将消息重新放回队列
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("Consumer1接收到的消息是:" + new String(message.getBody()));
//手动确认消息 参数1:手动确认消息标识 参数2:是否开启多个消息同时确认
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
}, consumerTag -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
});
3、延时队列
延时队列:队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
在之前的使用中其实已经涉及了,TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间。单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
方式一:在消息发送的时候设置过期时间
方式二:在创建队列的时候设置“x-messafe-ttl”的属性
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为**消息是否过期是在即将投递到消费者之前判定的**,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
3.1、案例
生产者
package com.songzhishu.rabbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.controller
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 19:25
* @Description: 发送消息控制器
* @Version: 1.0
*/
@Slf4j
@RestController
@RequestMapping("/message")
public class SendMsgController {
@Resource
private RabbitTemplate rabbitTemplate;
//发送消息接口
@GetMapping("/sendMsg/{message}")
private void sendMsg(@PathVariable String message) {
log.info("当前时间:{},消息内容:{},发送消息两个队列" , new Date().toString(), message);
rabbitTemplate.convertAndSend("normal_exchange", "normal_one", "消息来自 ttl 为 5S 的队列: "+message);
rabbitTemplate.convertAndSend("normal_exchange", "normal_two", "消息来自 ttl 为 30S 的队列: "+message);
}
}
配置类
package com.songzhishu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.config
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 18:09
* @Description: TODO
* @Version: 1.0
*/
@Configuration
public class TTLQueueConfig {
//普通队列名称
public static final String NORMAL_QUEUE_NAME_ONE = "normal_queue_one";
public static final String NORMAL_QUEUE_NAME_TWO = "normal_queue_two";
//死信队列名称
public static final String DEAD_QUEUE_NAME = "dead_queue";
//普通交换机
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
//死信交换机
public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
//声明普通交换机
@Bean("normalExchange")
public DirectExchange normalExchange(){
return new DirectExchange(NORMAL_EXCHANGE_NAME);
}
//声明死信交换机
@Bean("deadExchange")
public DirectExchange deadExchange(){
return new DirectExchange(DEAD_EXCHANGE_NAME);
}
//声明普通队列
@Bean("normalQueueOne")
public Queue normalQueueOne(){
Map<String, Object> args = new HashMap<>();
//设置死信交换机
args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
//设置死信routingKey
args.put("x-dead-letter-routing-key","dead");
//设置TTL
args.put("x-message-ttl",5000);
return QueueBuilder.durable(NORMAL_QUEUE_NAME_ONE).withArguments(args).build();
}
//声明普通队列
@Bean("normalQueueTwo")
public Queue normalQueueTwo(){
Map<String, Object> args = new HashMap<>();
//设置死信交换机
args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
//设置死信routingKey
args.put("x-dead-letter-routing-key","dead");
//设置TTL
args.put("x-message-ttl",30000);
return QueueBuilder.durable(NORMAL_QUEUE_NAME_TWO).withArguments(args).build();
}
//声明死信队列
@Bean("deadQueue")
public Queue deadQueue(){
return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
}
//绑定普通队列和普通交换机
@Bean
public Binding bindNormalQueueOne(
@Qualifier("normalQueueOne") Queue queue,
@Qualifier("normalExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("normal_one");
}
//绑定普通队列和普通交换机
@Bean
public Binding bindNormalQueueTwo(
@Qualifier("normalQueueTwo") Queue queue,
@Qualifier("normalExchange") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("normal_two");
}
//绑定死信队列和死信交换机
@Bean
public Binding bindDeadQueue(
@Qualifier("deadQueue") Queue queue,
@Qualifier("deadExchange") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("dead");
}
}
消费者
package com.songzhishu.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.consumer
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 19:35
* @Description: 消费之死信队列
* @Version: 1.0
*/
@Slf4j
@Component
public class DeadQueueConsumer {
//消费消息
@RabbitListener(queues = "dead_queue")
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},消息内容{}", new Date().toString(), msg);
}
}
以上的案例是在队列中设置ttl,还有一种方式是在消息的发送端,也就是我们的生产者中设置。
@GetMapping("/sendExpirationMsg/{message}/{ttl}")
private void sendMsgTTL(
@PathVariable("message") String message,
@PathVariable("ttl") String ttl
) {
log.info("当前时间:{},消息内容:{},设置的时间{}毫秒,发送消息两个队列", new Date().toString(), message, ttl);
rabbitTemplate.convertAndSend("normal_exchange", "normal_three", "消息来自 ttl 为 "+ttl+"毫秒 的队列: " + message, msg -> {
msg.getMessageProperties().setExpiration(ttl);
return msg;
});
}
配置类中添加一个普通的未设置ttl的队列并和交换机绑定关系
//声明普通队列
@Bean("normalQueueThree")
public Queue normalQueueThree() {
Map<String, Object> args = new HashMap<>();
//设置死信交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
//设置死信routingKey
args.put("x-dead-letter-routing-key", "dead");
return QueueBuilder.durable(NORMAL_QUEUE_NAME_THREE).withArguments(args).build();
}
//绑定
@Bean
public Binding bindNormalQueueThree(
@Qualifier("normalQueueThree") Queue queue,
@Qualifier("normalExchange") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("normal_three");
}
命名消息二的时间短过期以后应该被死信队列处理啊,为啥子要在延迟时间长的消息一的后面!
消息可能并不会按时“死亡“,因为 **RabbitMQ****只会检查第一个消息是否过期**,如果过期则丢到死信队列,**如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行**。
3.2、Rabbitmq 插件实现延迟队列
3.2.1、安装插件
在官网上下载 Community Plugins — RabbitMQ,下载**rabbitmq_delayed_message_exchange** 插件,然后解压放置到 RabbitMQ 的插件目录。进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
这里说一下在Windows下的安装插件也是一样的操作命令!
生产者:
@GetMapping("/sendDelayedMsg/{message}/{ttl}")
private void sendDelayedMsg(
@PathVariable("message") String message,
@PathVariable("ttl") int ttl
) {
log.info("当前时间:{},消息内容:{},设置的时间{}毫秒,发送消息到延时队列", new Date().toString(), message, ttl);
rabbitTemplate.convertAndSend("delayed_exchange", "delayed.routingkey", "消息来自 ttl 为 "+ttl+"毫秒 的队列: " + message, msg -> {
msg.getMessageProperties().setDelay(ttl);
return msg;
});
}
配置类
package com.songzhishu.rabbitmq.config;
import com.sun.tracing.ProbeName;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.consumer
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 22:44
* @Description: TODO
* @Version: 1.0
*/
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delay_queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean("delayedExchange")
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
@Bean("delayedQueue")
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public Binding delayedQueueBindingDelayedExchange(
@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") Exchange Exchange
) {
return BindingBuilder.bind(queue).to(Exchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
消费者
package com.songzhishu.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.consumer
* @Author: 斗痘侠
* @CreateTime: 2024-01-29 19:35
* @Description: 消费延时队列
* @Version: 1.0
*/
@Slf4j
@Component
public class DelayedQueueConsumer {
//消费消息
@RabbitListener(queues = "delay_queue")
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},消息内容{}", new Date().toString(), msg);
}
}
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景
4、发布确认-高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?
生产者发消息给mq,如果收不到但是对于生产者来讲发送的消息的任务已经完成,事实上对于数据的并没正常的传递,那么这种情况就会导致数据的丢失,所以需要一个发布确认的机制给生产者并且还要有一个缓存机制来暂时存储这些没有被接收的数据进行二次的发送!
4.1、Springboot版本
配置文件
spring.rabbitmq.publisher-confirm-type=correlated
- NONE:禁用发布确认模式,是默认值
- CORRELATED:发布消息成功到交换器后会触发回调方法
- SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。
生产者:
package com.songzhishu.rabbitmq.controller;
import com.songzhishu.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebFlux;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebMvc;
import javax.annotation.Resource;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.controller
* @Author: 斗痘侠
* @CreateTime: 2024-01-30 09:41
* @Description: TODO
* @Version: 1.0
*/
@Slf4j
@RestController
@RequestMapping("/confirm")
@EnableSwagger2WebMvc
@EnableSwagger2WebFlux
public class ConfirmProducerController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendConfirmMsg(@PathVariable("message") String message) {
log.info("消息{}",message);
CorrelationData correlationData=new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);
}
}
配置类:
package com.songzhishu.rabbitmq.config;
import com.sun.tracing.ProbeName;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.config
* @Author: 斗痘侠
* @CreateTime: 2024-01-30 09:26
* @Description: 确认机制配置类
* @Version: 1.0
*/
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
public static final String CONFIRM_ROUTING_KEY = "confirm.routingkey";
//声明 queue
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
//声明交换机 exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
//绑定关系
@Bean
public Binding confirmQueueBindingConfirmExchange(
@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange
) {
return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
}
}
消费者
package com.songzhishu.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Arrays;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.consumer
* @Author: 斗痘侠
* @CreateTime: 2024-01-30 09:48
* @Description: 确认消息消费者
* @Version: 1.0
*/
@Component
@Slf4j
public class ConfirmConsumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("接收到ConfirmQueue队列中的消息:{}", msg);
}
}
回调
package com.songzhishu.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.config
* @Author: 斗痘侠
* @CreateTime: 2024-01-30 10:34
* @Description: 交换机回调
* @Version: 1.0
*/
@Component
@Slf4j
public class CallBack implements RabbitTemplate.ConfirmCallback {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* @description:
* @author: 斗痘侠
* @date: 2024/1/30 10:35
* @param: correlationData 保存回调的消息的id
* @param: b 交换机收到消息
* @param: s 没有收到消息的原因
**/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData.getId() != null ? correlationData.getId() : "";
if (b) {
log.info("消息被交换机收取,消息id:{}", id);
} else {
log.info("消息没有被成功接受,消息id:{},失败的原因是{}", id, s);
}
}
@PostConstruct
public void init() {
//注入
rabbitTemplate.setConfirmCallback(this);
}
}
测试正常情况
测试交换机异常情况
测试队列异常
这时候交换机正常的接受消息,但是由于routingkey没有对比成功,从而导致消息没有被接收。
4.2、回退消息
** 在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的**。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
publisher-returns: true #rout失败会退回消息给生产者
回调
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}", new
String(returnedMessage.getMessage().getBody()),
returnedMessage.getExchange(),
returnedMessage.getReplyText(),
returnedMessage.getRoutingKey());
}
@PostConstruct
public void init() {
//注入
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
4.3、备份交换机
有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
配置类
@Bean("backupQueue")
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean("warningQueue")
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
//备份交换机
@Bean("backupExchange")
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
//声明交换机 exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
.withArguments(arguments).build();
}
//绑定
@Bean
public Binding backupQueueBindingBackupExchange(
@Qualifier("backupQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange exchange
) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding warningQueueBindingBackupExchange(
@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange exchange
) {
return BindingBuilder.bind(queue).to(exchange);
}
消费者
package com.songzhishu.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import com.songzhishu.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @BelongsProject: RabbitMQ
* @BelongsPackage: com.songzhishu.rabbitmq.consumer
* @Author: 斗痘侠
* @CreateTime: 2024-01-30 12:29
* @Description: TODO
* @Version: 1.0
*/
@Component
@Slf4j
public class WarningConsumer {
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receptionWarningMsg(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("接收到warningQueue中不可路由的消息{}", msg);
}
}
测试结果
mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是**备份交换机优先级高**。
版权归原作者 蒋一清 所有, 如有侵权,请联系我们删除。