RabbitMQ死信队列
1、概述
DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数
x-dead-letter-exchange
指定交换机即可。
2、代码实现
现在,我们通过创建一个过期队列来实现死信队列。
2.1、依赖和配置文件
创建Maven工程
DEADQueUe
,导入相关依赖,这里我创建了父工程来进行依赖的版本管理,
可以直接创建Spring Boot项目来进行代码的编写
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
编写配置文件:
application.yaml
# 服务端口server:port:8080spring:#账号密码rabbitmq:username: admin
password: admin
#主机地址host: 192.168.200.200
virtual-host: /
port:5672
2.2、配置两个队列
2.2.1、过期队列配置
创建config文件夹,以Direct模式为例,先创建
TTLRabbitConfig.java
@ConfigurationpublicclassTTLRabbitConfig{//创建一个过期时间为5秒的队列@BeanpublicQueueDirectQueue1(){Map<String,Object> args =newHashMap<>();
args.put("x-message-ttl",5000);//这里一定是int值returnnewQueue("ttl_direct_queue",true,false,false,args);}//2、创建交换机@BeanpublicDirectExchangeDirectTestExchange(){returnnewDirectExchange("direct_test_exchange",true,false);}//3、绑定交换机和队列@BeanpublicBindingbindingDirect1(){returnBindingBuilder.bind(DirectQueue1()).to(DirectTestExchange()).with("ttl");}}
这样我们就创建了一个会过期的队列
ttl_direct_queue
,并且将它绑定在了交换机上。
2.2.2、死信队列配置
编写死信队列相关配置:
DeadRabbitConfig.java
复制
TTLRabbitConfig.java
将其改名为
DeadRabbitConfig.java
,更改方法名、对应的队列名、交换机名称以及
routeKey
publicclassDeadRabbitConfig{//创建一个普通队列存储死信@BeanpublicQueueDeadQueue1(){returnnewQueue("dead_direct_queue",true);}//创建交换机@BeanpublicDirectExchangeDeadExchange(){returnnewDirectExchange("dead_direct_exchange",true,false);}//绑定交换机和队列@BeanpublicBindingdeadBinding(){returnBindingBuilder.bind(DeadQueue1()).to(DeadExchange()).with("dead");}}
2.2.3、在过期队列中设置死信队列
可以看到,我们只是创建了Direct模式下的一个普通队列和交换机并将它们绑定在一起而已。
那么,如何让过期的队列消息进入我们创建的死信队列中?
还记得rabbitmq的工作原理吗?要让消息进入队列,只需要将消息发送到交换机,通过交换机把消息分发即可。
所以我们只需要指定过期队列消息该进入哪个死信交换机就行
在
TTLRabbitConfig.java
文件中创建过期队列的方法中添加以下代码:
//指定进入的死信交换机
args.put("x-dead-letter-exchange","dead_direct_exchange");
**因为我们使用的是
Direct
模式,所以还需要指定对应的
routekey
**
//direct模式需要配置,fanout模式是不需要配置的
args.put("x-dead-letter-routing-key","dead");
配置改完以后:
TTLRabbitConfig.java
@ConfigurationpublicclassTTLRabbitConfig{//创建一个过期时间为5秒的队列@BeanpublicQueueDirectQueue1(){Map<String,Object> args =newHashMap<>();
args.put("x-message-ttl",5000);//这里一定是int值//指定进入的死信交换机
args.put("x-dead-letter-exchange","dead_direct_exchange");//direct模式需要配置,fanout模式是不需要配置的
args.put("x-dead-letter-routing-key","dead");returnnewQueue("ttl_direct_queue",true,false,false,args);}//创建交换机@BeanpublicDirectExchangeDirectTestExchange(){returnnewDirectExchange("direct_test_exchange",true,false);}//绑定交换机和队列@BeanpublicBindingbindingDirect1(){returnBindingBuilder.bind(DirectQueue1()).to(DirectTestExchange()).with("ttl");}}
2.2.4、编写发送消息的Service
Service文件夹下创建
DirectMessageService.java
文件,编写发送消息逻辑,
添加
@Component
注解让
Spring
托管
@ComponentpublicclassDirectMessageService{@AutowiredRabbitTemplate rabbitTemplate;publicvoidSendMessage_Direct_Mode(){//使用uuid生成随机消息String message = UUID.randomUUID().toString();//交换机名称String exchangeName ="direct_test_exchange";//打印发送的消息System.out.println("发送消息:"+message);
rabbitTemplate.convertAndSend(exchangeName,"ttl",message);}}
2.2.5、编写测试的接口
编写controller,
DirectController.java
@RestController//使用RestController返回字符串publicclassDirectController{@AutowiredDirectMessageService directMessageService;//队列过期@RequestMapping(value ="/ttlQ")publicStringdirectMessage(){for(int i =0; i <10; i++){directMessageService.SendMessage_Direct_Mode();}return"ok";}}
2.2.6、主启动类
添加主启动类:
DirectProvider.java
@SpringBootApplicationpublicclassDirectProvider{publicstaticvoidmain(String[] args){SpringApplication.run(DirectProvider.class, args);}}
2.2.7、运行以及报错解决
运行,访问:
http://localhost:8080/ttlQ
报错了
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'ttl_direct_queue' in vhost '/': received the value 'dead_direct_exchange' of type 'longstr' but current is none,class-id=50, method-id=10)
web界面已经有了一个
ttl_direct_queue'
队列,是我之前单独玩过期队列时持久化留下的。
这次是过期队列+死信队列,后面新加的配置是不能覆盖先前的队列配置的。
我们点进去看看,这是我们之前的配置。
**把这个队列删除,重新访问
http://localhost:8080/ttlQ
**
2.2.8、观察结果
观察web管理界面,出现一个新的过期队列
ttl_direct_queue'
队列,和一个
dead_direct_queue
队列
我们点进去看看它们的配置。同时交换机视图也会有新的交换机出现。
- Queue列表界面
ttl_direct_queue
——过期队列dead_direct_queue
——死信队列
死信队列和一般队列没有什么不同。
小结
1、死信队列只是一个普通队列而已。
2、要实现死信队列,可以通过创建过期队列,我们只需要编写一个普通队列来接收过期队列中的消息而已。
只需要在设置过期队列的过期时间的同时设置(设置
x-dead-letter-exchange
)这个普通队列的交换机就行。
//指定进入的死信交换机
args.put("x-dead-letter-exchange","dead_direct_exchange");//direct模式需要配置routeKey,fanout模式是不需要配置的
args.put("x-dead-letter-routing-key","dead");
//创建一个过期时间为5秒的队列@BeanpublicQueueDirectQueue1(){Map<String,Object> args =newHashMap<>();
args.put("x-message-ttl",5000);//这里一定是int值//指定进入的死信交换机
args.put("x-dead-letter-exchange","dead_direct_exchange");//direct模式需要配置,fanout模式是不需要配置的
args.put("x-dead-letter-routing-key","dead");returnnewQueue("ttl_direct_queue",true,false,false,args);}
版权归原作者 阙微辰 所有, 如有侵权,请联系我们删除。