文章目录
一、什么是延迟消息
假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现?
RabbitMQ使用死信队列,可以实现消息的延迟接收。
1、队列的属性
队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。
这个属性交:x-message-ttl
所有队列中的消息超过时间未被消费时,都会过期。不管是谁发送的消息都一视同仁。
@Bean("ttlQueue")publicQueuequeue(){Map<String,Object> map =newHashMap<String,Object>();
map.put("x-message-ttl",11000);// 队列中的消息未被消费11秒后过期// map.put("x-expire", 30000); // 队列30秒没有使用以后会被删除returnnewQueue("TTL_QUEUE",true,false,false, map);}
但是这种方式似乎并不是那么的灵活。所以RabbitMQ的消息也有单独的过期时间属性。
2、消息的属性
在生产者发送消息时,可以通过MessageProperties指定消息属性。
MessageProperties messageProperties =newMessageProperties();
messageProperties.setExpiration("4000");// 消息的过期属性,单位ms
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message =newMessage("这条消息4秒后过期".getBytes(), messageProperties);
rabbitTemplate.send("TTL_EXCHANGE","test.ttl", message);
那么问题来了:如果队列的TTL是6秒过期,消息的TTL是10秒过期,这个消息会在什么时候被丢弃?
答:如果同时指定了Message TTL和Queue TTL,那么小的那个会生效。
3、什么是死信
上面我们了解到,rabbitMQ的消息可以设置过期时间,消息过期后会被直接丢弃,我们可以通过配置死信队列,将这种消息变成死信(Dead Letter),然后将这种过期的消息丢入死信队列。
队列在创建的时候可以指定一个死信交换机DLX(Dead Letter Exchange)。死信交换机绑定的队列被称为死信队列DLQ(Dead Letter Queue),DLX实际上也是普通的交换机,DLQ也是普通的队列。

也就是说,如果消息过期了,队列指定了DLX,就会发送到DLX。如果DLX绑定了DLQ,就会路由到DLQ。路由到DLQ之后,我们就可以消费死信队列了。
4、使用死信队列的缺点
(1)如果统一用队列来设置消息的TTL,当梯度非常多的情况下,比如1分钟、2分钟、5分钟、10分钟……需要创建很多交换机和队列来路由消息,这时可以考虑使用消息的TTL。
(2)如果单独设置消息的TTL,则可能会造成队列中的
消息阻塞
——前一条消息没有出队(没有被消费),后面的消息无法投递(比如第一条消息的过期时间是30分钟,第二条消息的过期时间是10分钟。10分钟后,即使第二条消息应该投递了,但是由于第一条消息还未出队,所以无法投递)。
(3)可能存在一定的时间误差。
5、延时消息插件
在RabbitMQ 3.5.7及以后的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延时队列功能(Linux和Windows都可以用)。同时插件依赖Erlang/OPT 18.0及以上。
插件源码地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
找到对应版本的插件,然后下载。
# 下载到plugins目录cd rabbitmq_server-3.7.7/plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
# 启用插件
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 停用插件
./rabbitmq-plugins disable rabbitmq_delayed_message_exchange
此时,在管理界面的创建交换机页面,会出现一个x-delayed-message类型的交换机:
二、JavaAPI利用死信队列实现RabbitMQ延迟消息
1、代码实现
引包:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>
importcom.gupaoedu.util.ResourceUtil;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.impl.AMQBasicProperties;importjava.util.HashMap;importjava.util.Map;/**
* 消息生产者,通过TTL测试死信队列
*/publicclassDlxProducer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));// rabbitmq.uri=amqp://admin:[email protected]:5672// 建立连接Connection conn = factory.newConnection();// 创建消息通道Channel channel = conn.createChannel();String msg ="Hello world, Rabbit MQ, DLX MSG";// 设置属性,消息10秒钟过期AMQP.BasicProperties properties =newAMQP.BasicProperties.Builder().deliveryMode(2)// 持久化消息.contentEncoding("UTF-8").expiration("5000")// TTL.build();// 发送消息,普通队列
channel.basicPublish("ORI_USE_EXCHANGE","ORI_USE_QUEUE", properties, msg.getBytes());
channel.close();
conn.close();}}
importcom.gupaoedu.util.ResourceUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.HashMap;importjava.util.Map;/**
* 5秒钟后,消息会从正常队列 ORI_USE_QUEUE 到达死信交换机 DEAD_LETTER_EXCHANGE ,然后路由到死信队列 DEAD_LETTER_QUEUE
*
*/publicclassDlxConsumer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));// rabbitmq.uri=amqp://admin:[email protected]:5672// 建立连接Connection conn = factory.newConnection();// 创建消息通道Channel channel = conn.createChannel();// 指定队列的死信交换机Map<String,Object> arguments =newHashMap<String,Object>();
arguments.put("x-dead-letter-exchange","DEAD_LETTER_EXCHANGE");// arguments.put("x-expires",9000L); // 设置队列的TTL// arguments.put("x-max-length", 4); // 如果设置了队列的最大长度,超过长度时,先入队的消息会被发送到DLX// 声明交换机// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare("ORI_USE_EXCHANGE","direct",false,false,null);// 声明队列(默认交换机AMQP Direct)// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare("ORI_USE_QUEUE",false,false,false, arguments);// 绑定队列和交换机,以及routingKey
channel.queueBind("ORI_USE_QUEUE","ORI_USE_EXCHANGE","ORI_USE_QUEUE");// 声明死信交换机
channel.exchangeDeclare("DEAD_LETTER_EXCHANGE","topic",false,false,false,null);// 声明死信队列
channel.queueDeclare("DEAD_LETTER_QUEUE",false,false,false,null);// 绑定,此处 Dead letter routing key 设置为 #
channel.queueBind("DEAD_LETTER_QUEUE","DEAD_LETTER_EXCHANGE","#");System.out.println(" Waiting for message....");// 创建消费者Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"UTF-8");System.out.println("Received message : '"+ msg +"'");}};// 开始获取消息,消费死信队列// String queue, boolean autoAck, Consumer callback
channel.basicConsume("DEAD_LETTER_QUEUE",true, consumer);}}
2、基本流程
利用消息的过期时间,过期之后投递到死信交换机(DLX),路由到死信队列(DLQ),我们消费者监听死信队列(DLQ),实现延迟消息。
消息的流转流程:生产者- 原交换机 - 原队列(超过TTL之后) - 死信交换机 - 死信队列 - 最终消费者。

三、JavaAPI利用插件实现RabbitMQ延迟消息
1、代码实现
importcom.rabbitmq.client.*;importjava.io.IOException;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;/**
* 使用延时插件实现的消息投递-消费者
* 必须要在服务端安装rabbitmq-delayed-message-exchange插件
* 先启动消费者
*/publicclassDelayPluginConsumer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setUri("amqp://admin:[email protected]:5672");// 建立连接Connection conn = factory.newConnection();// 创建消息通道Channel channel = conn.createChannel();// 声明x-delayed-message类型的exchangeMap<String,Object> argss =newHashMap<String,Object>();
argss.put("x-delayed-type","direct");
channel.exchangeDeclare("DELAY_EXCHANGE","x-delayed-message",false,false, argss);// 声明队列
channel.queueDeclare("DELAY_QUEUE",false,false,false,null);// 绑定交换机与队列
channel.queueBind("DELAY_QUEUE","DELAY_EXCHANGE","DELAY_KEY");// 创建消费者Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"UTF-8");SimpleDateFormat sf=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");System.out.println("收到消息:["+ msg +"]\n接收时间:"+sf.format(newDate()));}};// 开始获取消息// String queue, boolean autoAck, Consumer callback
channel.basicConsume("DELAY_QUEUE",true, consumer);}}
importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.text.SimpleDateFormat;importjava.util.Calendar;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;/**
* 使用延时插件实现的消息投递-生产者
* 必须要在服务端安装rabbitmq-delayed-message-exchange插件
* 先启动消费者
*/publicclassDelayPluginProducer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setUri("amqp://admin:[email protected]:5672");// 建立连接Connection conn = factory.newConnection();// 创建消息通道Channel channel = conn.createChannel();// 延时投递,比如延时10秒Date now =newDate();Calendar calendar =Calendar.getInstance();
calendar.add(Calendar.SECOND,+10);// 10秒Date delayTime = calendar.getTime();// 定时投递,把这个值替换delayTime即可// Date exactDealyTime = new Date("2019/01/14,22:30:00");SimpleDateFormat sf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String msg ="发送时间:"+ sf.format(now)+",投递时间:"+ sf.format(delayTime);// 延迟的间隔时间,目标时刻减去当前时刻Map<String,Object> headers =newHashMap<String,Object>();
headers.put("x-delay", delayTime.getTime()- now.getTime());AMQP.BasicProperties.Builder props =newAMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("DELAY_EXCHANGE","DELAY_KEY", props.build(),
msg.getBytes());
channel.close();
conn.close();}}
2、基本原理
rabbitMQ的延迟消息插件,可以有效的避免消息堵塞问题。
相当于投递给一个延迟消息的交换机,并指定延迟时间,大大简化了开发。
四、Springboot利用死信队列实现延迟消息
1、配置实现
importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;/**
* 死信队列 DLX DLQ
*/@ConfigurationpublicclassDlxConfig{@BeanpublicConnectionFactoryconnectionFactory()throwsException{CachingConnectionFactory cachingConnectionFactory =newCachingConnectionFactory();
cachingConnectionFactory.setUri(ResourceUtil.getKey("rabbitmq.uri"));return cachingConnectionFactory;}@BeanpublicRabbitAdminrabbitAdmin(ConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){returnnewRabbitTemplate(connectionFactory);}@Bean("oriUseExchange")publicDirectExchangeexchange(){returnnewDirectExchange("ORI_USE_EXCHANGE",true,false,newHashMap<>());}@Bean("oriUseQueue")publicQueuequeue(){Map<String,Object> map =newHashMap<String,Object>();
map.put("x-message-ttl",10000);// 10秒钟后成为死信
map.put("x-dead-letter-exchange","DEAD_LETTER_EXCHANGE");// 队列中的消息变成死信后,进入死信交换机returnnewQueue("ORI_USE_QUEUE",true,false,false, map);}@BeanpublicBindingbinding(@Qualifier("oriUseQueue")Queue queue,@Qualifier("oriUseExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("ori.use");}/**
* 队列的死信交换机
* @return
*/@Bean("deatLetterExchange")publicTopicExchangedeadLetterExchange(){returnnewTopicExchange("DEAD_LETTER_EXCHANGE",true,false,newHashMap<>());}@Bean("deatLetterQueue")publicQueuedeadLetterQueue(){// 消费者只监听该队列即可returnnewQueue("DEAD_LETTER_QUEUE",true,false,false,newHashMap<>());}@BeanpublicBindingbindingDead(@Qualifier("deatLetterQueue")Queue queue,@Qualifier("deatLetterExchange")TopicExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("#");// 无条件路由}}
importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.context.annotation.AnnotationConfigApplicationContext;importorg.springframework.context.annotation.ComponentScan;@ComponentScan(basePackages ="com.dlx.ttl")publicclassDlxSender{publicstaticvoidmain(String[] args){AnnotationConfigApplicationContext context =newAnnotationConfigApplicationContext(DlxSender.class);RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);// 随队列的过期属性过期,单位ms
rabbitTemplate.convertAndSend("ORI_USE_EXCHANGE","ori.use","测试死信消息");}}
五、Springboot利用插件实现延迟消息
1、配置实现
importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;// 配置类@ConfigurationpublicclassDelayPluginConfig{@BeanpublicConnectionFactoryconnectionFactory()throwsException{CachingConnectionFactory cachingConnectionFactory =newCachingConnectionFactory();
cachingConnectionFactory.setUri("amqp://admin:[email protected]:5672");return cachingConnectionFactory;}@BeanpublicRabbitAdminrabbitAdmin(ConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){returnnewRabbitTemplate(connectionFactory);}@Bean("delayExchange")publicTopicExchangeexchange(){Map<String,Object> argss =newHashMap<String,Object>();
argss.put("x-delayed-type","direct");returnnewTopicExchange("DELAY_EXCHANGE",true,false, argss);}@Bean("delayQueue")publicQueuedeadLetterQueue(){returnnewQueue("DELAY_QUEUE",true,false,false,newHashMap<>());}@BeanpublicBindingbindingDead(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange")TopicExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("#");// 无条件路由}}
importcom.rabbitmq.client.*;importjava.io.IOException;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;/**
* 消费者
*/publicclassDelayPluginConsumer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setUri("amqp://admin:[email protected]:5672");// 建立连接Connection conn = factory.newConnection();// 创建消息通道Channel channel = conn.createChannel();// 创建消费者Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"UTF-8");SimpleDateFormat sf=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");System.out.println("收到消息:["+ msg +"]\n接收时间:"+sf.format(newDate()));}};// 开始获取消息// String queue, boolean autoAck, Consumer callback
channel.basicConsume("DELAY_QUEUE",true, consumer);}}
importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageProperties;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.context.annotation.AnnotationConfigApplicationContext;importorg.springframework.context.annotation.ComponentScan;importjava.text.SimpleDateFormat;importjava.util.Calendar;importjava.util.Date;/**
* 生产者
* 延时消息插件,去管控台队列看有无收到消息
*/@ComponentScan(basePackages ="com.dlx.delayplugin")publicclassDelayPluginProducer{publicstaticvoidmain(String[] args){AnnotationConfigApplicationContext context =newAnnotationConfigApplicationContext(DelayPluginProducer.class);RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);// 延时投递,比如延时4秒Date now =newDate();Calendar calendar =Calendar.getInstance();
calendar.add(Calendar.SECOND,+4);Date delayTime = calendar.getTime();// 定时投递,把这个值替换delayTime即可// Date exactDealyTime = new Date("2019/06/24,22:30:00");SimpleDateFormat sf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String msg ="延时插件测试消息,发送时间:"+ sf.format(now)+",理论路由时间:"+ sf.format(delayTime);MessageProperties messageProperties =newMessageProperties();// 延迟的间隔时间,目标时刻减去当前时刻
messageProperties.setHeader("x-delay", delayTime.getTime()- now.getTime());Message message =newMessage(msg.getBytes(), messageProperties);// 不能在本地测试,必须发送消息到安装了插件的服务端
rabbitTemplate.send("DELAY_EXCHANGE","#", message);}}
版权归原作者 秃了也弱了。 所有, 如有侵权,请联系我们删除。