背景
RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。
原理
死信队列和普通队列区别不是很大 普通与死信队列都有自己独立的交换机和路由key、队列和消费者。
区别:
1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到
普通队列中缓存起来,普通队列对应有自己独立普通消费者。
2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费
的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机
对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。
普通队列中,普通队列发现该消息一直没有被消费者消费
的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机
对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。
产生死信队列的原因
工具类:
public class RabbitMqUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
return channel;
}
}
- 消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
**消息 TTL 过期 **:
生产者代码:
public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //设置消息的 TTL 时间 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); //该信息是用作演示队列个数限制 for (int i = 1; i <11 ; i++) { String message="info"+i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes()); System.out.println("生产者发送消息:"+message); } } } }
消费端:
普通消费:
public class Consumer01 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明死信和普通交换机 类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机与 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); //正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer01 接收到消息"+message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } }
死信消费:
public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收死信队列消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer02 接收死信队列的消息" + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } }
运行结果查询:
** 1.启动消费端->启动死信队列****->启动生产者 查看结果**
** 2.关闭消费端->启动生产者 再次查看结果 **结果原因:消费端队列关闭,消息时间过期会进入死信队列
- 队列达到最大的长度 (队列容器已经满了)
生产者代码:
public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //该信息是用作演示队列个数限制 for (int i = 1; i <11 ; i++) { String message="info"+i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); System.out.println("生产者发送消息:"+message); } } } }
消费端:
普通消费:
public class Consumer01 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明死信和普通交换机 类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机与 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); //正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "lisi"); params.put("x-max-length",6); //设置队列长度为6 String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer01 接收到消息"+message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } }
死信消费:
public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收死信队列消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer02 接收死信队列的消息" + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } }
运行结果查询:
** 1.启动消费端->启动死信队列****->启动生产者 查看结果**
** **** **结果原因:消费端队列只能存储6条消息,剩下的消息进入死信队列
3. 消费者消费多次消息失败,就会转移存放到死信队列中
**注意:与其他不同的是,需要开启手动应答 **
生产者代码:
public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //该信息是用作演示队列个数限制 for (int i = 1; i <11 ; i++) { String message="info"+i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); System.out.println("生产者发送消息:"+message); } } } }
消费端:
普通消费:
public class Consumer01 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明死信和普通交换机 类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机与 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); //正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); if(message.equals("info5")){ System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息"); //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); }else { System.out.println("Consumer01 接收到消息"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> { }); } }
死信消费:
public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收死信队列消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer02 接收死信队列的消息" + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } }
运行结果查询:
** 1.启动消费端->启动死信队列****->启动生产者 查看结果**
** **** 结果原因:消费端队列消息中为info5时,消费端拒绝签收,只能进入死信队列**
版权归原作者 你我约定有三 所有, 如有侵权,请联系我们删除。