Rabbitmq+Springboot详解(附带代码地址)
1.rabbitmq介绍
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
- 相关概念
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
- 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。- 中间即是 RabbitMQ,其中包括了 交换机 和 队列。- 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。
那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。
- 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。- 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。- 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
- 交换机(Exchange)
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout
- Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的 routing_key 匹配时, 才会被交换器投送到绑定的队列中去.- Topic:按规则转发消息(最灵活)- Headers:设置header attribute参数类型的交换机- Fanout:转发消息到所有绑定队列
- Direct Exchange
Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
第一个 X - Q1 就有一个 binding key,名字为 orange; X - Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。
Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? - 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。
- Topic Exchange
Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
在这种交换机模式下:
- 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。- 路由模式必须包含一个 星号(* ),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements…b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。
具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示routing key,第三个参数即消息。如下:
rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is RabbitMQ!");
topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:
- *** **表示一个词- **# **表示零个或多个词
- Headers Exchange
headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
- Fanout Exchange
Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。
交换机类型
上面的
订阅发布模式
、
路由模式
以及
主题模式
使用到了不同的交换机,分别是:
- 直连交换机 Direct
- 扇形交换机 Fanout
- 主题交换器 Topic
Direct Exchange(直连)
直连交换机
被应用在
路由模式
下,该交换机需要通过特定的
routingKey
来绑定队列,交换机只有接收到了匹配的
routingKey
才会将消息转发到对应的队列中,否则就不会转发消息。
路由模式
使用
直连交换机
,该模式下根据
routingKey
绑定特定的队列。
Fanout Exchange(扇形)
扇形交换机
没有路由键的概念,只需将队列绑定在交换机上,发送到交换机上的消息会转发到交换机所以绑定的队列里面,类似广播,只要打开收音机都能接收到广播消息。
扇形交换机
应用于
发布订阅模式
。
Topic Exchange(主题)
主题模式
是将路由键根据一个主题进行分类,和
直连模式
不同的是,
直连模式
绑定
特定
的路由键,而
主题模式
使用通配符绑定路由键,绑定键有两种:
*
表示可以匹配仅一个
。#
表示可以匹配零个或多个
。
2.rabbitmq安装(docker安装)
+ps:这里说下,截止到作者发稿,目前docker很多站点被封(由于特殊原因导致),博主目前已经研究了一套方法去实现快速拉取镜像,如果您有需要,可在博客下面留言,人数多的话博主会考虑出一期教程帮助大家解决问题。
2.1. 拉取镜像
docker pull rabbitmq:3.13.0
2.2. 启动镜像
注意修改用户名和密码
docker run -d-p15672:15672 -p5672:5672 \-eRABBITMQ_DEFAULT_VHOST=my_vhost \-eRABBITMQ_DEFAULT_USER=admin \-eRABBITMQ_DEFAULT_PASS=admin \--hostname myRabbit \--name smkj-rabbitmq \
rabbitmq:3.13.0
参数说明:
- -d:表示在后台运行容器;
- -p:将容器的端口 5672(应用访问端口)和 15672 (控制台Web端口号)映射到主机中;
- -e:指定环境变量:
- RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
- RABBITMQ_DEFAULT_USER:默认的用户名;
- RABBITMQ_DEFAULT_PASS:默认的用户密码;
- –hostname:指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名);
- –name rabbitmq:设置容器名称;
- rabbitmq:容器使用的镜像名称;
启动完成后可以通过 docker ps 命令来查看容器是否启动 还可以设置docker启动时自动启动
docker update smkj-rabbitmq --restart=always
2.3.启动 rabbitmq_management (RabbitMQ后台管理)
//进入容器内部 我这里使用容器名字进入 也可以使用容器id
dockerexec-it smkj-rabbitmq /bin/bash
----------------------------------
//开启web后台管理界面
rabbitmq-plugins enable rabbitmq_management
2.4打开web管理界面
- 浏览器输入地址 hhtp://ip:15672 即可以访问后台管理界面 这里的ip为运行RabbitMQ的服务器ip
- 默认的用户名和密码都是guest
- 但由于我们启动的时候设置了默认的用户名和密码,所以我们可以使用设置的用户名和密码登录。
如果无法访问 可以尝试打开防火墙 如果是在阿里或者腾讯之类的服务器 要打开安全组的端口!! 到这里 我们docker安装RabbitMQ就完成了 接下来进行延迟插件的安装
输入账号密码 都是 admin 登录
2.5安装延迟插件(可选)
插件下载
下载地址 https://www.rabbitmq.com/community-plugins.html
因为我刚才拉的版本是3.13.0 这里也要对应上
将插件上传至服务器,我是放在下面的路径
将刚刚上传的插件拷贝到容器内plugins目录下
docker cp /home/mydata/rabbitmq/rabbitmq_delayed_message_exchange-3.13.0.ez smkj-rabbitmq:/plugins
上传之后进入容器内部
//进入容器 我这里使用容器名字 也可以用容器id进入
dockerexec-it smkj-rabbitmq /bin/bash
-------------------------------------
//移动到plugins目录下
cd plugins
-------------------------------------
//查看是否上传成功
ls
启动延时队列
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器,然后重启容器
//我这里还是使用容器名称 也可以使用容器id
docker restart smkj-rabbitmq
容器启动成功之后,登录RabbitMQ的管理界面(ip:15672 访问web界面),找到ExchangesTab页。点击Add a new exchange,在Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。
3.rabbitmq+Springboot
3.1. 简单(simple)模式
最简单的消息发送
特点
- 生产者是消费者是一一对应,也叫做
点对点模式
,生产者发送消息经过队列直接发送给消费者。 - 生产者和消费者在发送和接收消息时,只需要指定队列名称,而不需要指定
Exchange
交换机。
生产消息:
@GetMapping("/simple-send")public String simpleSend(){
rabbitTemplate.convertAndSend("simple","this is news");return"ok";}
消费消息
@RabbitListener(queuesToDeclare = @Queue("simple"))
public void consume(String message) {
System.out.println(message);
}
输出:
thisis news
无需创建交换机和绑定队列,只需要匹配发送端和消费端的队列名称就能成功发送消息。
3.2. 工作模式
在多个消费者之间分配任务
特点
工作模式
和简单模式
差不多,只需要生产端、消费端、队列。- 不同在于一个生产者、一个队列对应
多个消费者
,也就是一对多的关系。 - 在多个消费者之间分配消息,类似轮询发送消息,每个消息都只发给一个消费者。
- 生产消息:
@GetMapping("/work-send")public String simpleSend(){
rabbitTemplate.convertAndSend("work","this is news");return"ok";}
- 消费消息:
@RabbitListener(queuesToDeclare = @Queue("work"))
public void consume(String message) {
System.out.println("first:" + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void consumeSecond(String message) {
System.out.println("second:" + message);
}
创建一个生产者,两个消费者,发送两条消息,两个消费者分别接收到消息,输出:
first:thisis news
second:thisis news
两个消费者,轮流消费消息。类似
nginx负载均衡
。
3.3. 发布订阅模式
一次向多个消费者发送消息
特点
- 发布订阅类似广播消息,每个消息可以同时发送给订阅该消息的消费者,
- 上图中的
X
表示交换机,使用的扇形交换机
(fanout),它将发送的消息发送到所有绑定交换机的队列。 - 创建队列、交换机以及绑定:
@Beanpublic FanoutExchange fanoutExchange(){returnnewFanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");}@Beanpublic Queue psFirstQueue(){returnnewQueue("psFirstQueue");}@Beanpublic Queue psSecondQueue(){returnnewQueue("psSecondQueue");}@Beanpublic Queue psThirdQueue(){returnnewQueue("psThirdQueue");}@Beanpublic Binding routingFirstBinding(){returnBindingBuilder.bind(psFirstQueue()).to(fanoutExchange());}@Beanpublic Binding routingSecondBinding(){returnBindingBuilder.bind(psSecondQueue()).to(fanoutExchange());}@Beanpublic Binding routingThirdBinding(){returnBindingBuilder.bind(psThirdQueue()).to(fanoutExchange());}
- 上面定义一个交换机
fanoutExchange
。 - 分别绑定三个队列
psFirstQueue
、psSecondQueue
、psThirdQueue
。 - 队列绑定交换机不需要
routingKey
,直接绑定即可。
- 生产端:
@GetMapping("/publish-sub-send")public String publishSubSend(){
rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE",null,"publish/subscribe hello");return"ok";}
无需指定
routingKey
,设置为
null
。
- 消费端:
@RabbitListener(queues ="psFirstQueue")publicvoidpubsubQueueFirst(String message){
System.out.println("【first】:"+ message);}@RabbitListener(queues ="psSecondQueue")publicvoidpubsubQueueSecond(String message){
System.out.println("【second】:"+ message);}@RabbitListener(queues ="psThirdQueue")publicvoidpubsubQueueThird(String message){
System.out.println("【third】:"+ message);}
- 输出:
【first】: publish/subscribe hello
【second】: publish/subscribe hello
【third】: publish/subscribe hello
发送一条消息,绑定的队列都能接收到消息。
3.4. 路由模式
根据
routingKey
有选择性的接收消息
特点
- 每个队列根据不同
routingKey
绑定交换机 - 消息发送到交换机后通过
routingKey
发送给特定的队列,然后传到消费者消费。 - 交换由
扇形交换机
(fanout)改成直连交换机
(direct)。 - 创建队列、交换机以及绑定:
@Beanpublic Queue routingFirstQueue(){returnnewQueue("routingFirstQueue");}@Beanpublic Queue routingSecondQueue(){returnnewQueue("routingSecondQueue");}@Beanpublic Queue routingThirdQueue(){returnnewQueue("routingThirdQueue");}@Beanpublic DirectExchange routingExchange(){returnnewDirectExchange("routingExchange");}@Beanpublic Binding routingFirstBind(){returnBindingBuilder.bind(routingFirstQueue()).to(routingExchange()).with("firstRouting");}@Beanpublic Binding routingSecondBind(){returnBindingBuilder.bind(routingSecondQueue()).to(routingExchange()).with("secondRouting");}@Beanpublic Binding routingThirdBind(){returnBindingBuilder.bind(routingThirdQueue()).to(routingExchange()).with("thirdRouting");}
- 创建一个交换机,根据不同的路由规则匹配不同的队列
routingExchange
,根据不同的routingKey
绑定不同的队列: firstRouting
路由键绑定routingFirstQueue
队列。secondRouting
路由键绑定routingSecondQueue
队列。thirdRouting
路由键绑定routingThirdQueue
队列。
- 生产消息:
@GetMapping("/routing-first")public String routingFirst(){// 使用不同的routingKey 转发到不同的队列
rabbitTemplate.convertAndSend("routingExchange","firstRouting"," first routing message");
rabbitTemplate.convertAndSend("routingExchange","secondRouting"," second routing message");
rabbitTemplate.convertAndSend("routingExchange","thirdRouting"," third routing message");return"ok";}
- 消费消息:
@RabbitListener(queues ="routingFirstQueue")publicvoidroutingFirstListener(String message){
System.out.println("【routing first】"+ message);}@RabbitListener(queues ="routingSecondQueue")publicvoidroutingSecondListener(String message){
System.out.println("【routing second】"+ message);}@RabbitListener(queues ="routingThirdQueue")publicvoidroutingThirdListener(String message){
System.out.println("【routing third】"+ message);}
输出:
【routing first】first routing message
【routing second】second routing message
【routing third】third routing message
分析:
rabbitTemplate.convertAndSend("routingExchange","firstRouting"," first routing message");
消息从生产者指定
firstRouting
路由键,找到对应的绑定队列
routingFirstQueue
,就被
routingFirstQueue
队列消费了。
3.5. 主题模式
基于某个主题接收消息
特点
路由模式
发送的消息,是需要指定固定的
routingKey
,如果想要针对一类路由。
比如:
- 只接收以
.com
结尾的消息。 www.
开头的消息。
主题模式
就派上场了,
路由模式
和
主题模式
类似,
路由模式
是设置特定的
routingKey
绑定唯一的队列,而
主题模式
的是使用
通配符
匹配
一个或者多个
队列。
- 创建交换机和队列:
@Beanpublic Queue topicFirstQueue(){returnnewQueue("topicFirstQueue");}@Beanpublic Queue topicSecondQueue(){returnnewQueue("topicSecondQueue");}@Beanpublic Queue topicThirdQueue(){returnnewQueue("topicThirdQueue");}@Beanpublic TopicExchange topicExchange(){returnnewTopicExchange("topicExchange");}
- 使用
通配符
绑定交换机和交换机:
@Bean
publicBindingtopicFirstBind(){// .com 为结尾return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with("*.com");}
@Bean
publicBindingtopicSecondBind(){// www.为开头return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()).with("www.#");}
通配符
有两种,
*
和
#
,
*
表示可以匹配一个
。#
表示可以匹配多个
。
比如:
#.com
表示接收多个
以.com
结尾的字段。 - 例如:taobao.com
、www.taobao.com
、www.jd.com
。*.com
表示接收一个
以.com
结尾的字段。 - 例如:taobao.com
、jd.com
。- 多个字段是无法匹配的,比如www.taobao.com
、cn.taobao.com
。www.#
可以匹配多个
以www
开头的字段。 - 例如www.taobao
、www.jd
。www.*
可以匹配一个
以www
开头的字段。 - 例如:www.taobao
、www.jd
。- 多个字段是无法匹配的,比如www.taobao.com
、www.jd.com
。- 生产消息:
@GetMapping("/topic-first-send")public String topicFirstSend(){
rabbitTemplate.convertAndSend("topicExchange","www.taobao.com","www.taobao.com");
rabbitTemplate.convertAndSend("topicExchange","taobao.com","taobao.com");
rabbitTemplate.convertAndSend("topicExchange","www.jd","www.jd");return"topic ok";}
- 消费消息:
@RabbitListener(queues ="topicFirstQueue")publicvoidtopicFirstListener(String message){
System.out.println("【topic first】"+ message);}@RabbitListener(queues ="topicSecondQueue")publicvoidtopicSecondListener(String message){
System.out.println("【topic second】"+ message);}
- 输出:
【topic second】www.taobao.com
【topic first】taobao.com
【topic second】www.jd
www.#
可以匹配多个以
www.
开头的路由键,例如
www.taobao.com
、
www.jd
。而
*.com
只能匹配一个以
.com
结尾的路由键,例如
taobao.com
,而无法匹配
www.taobao.com
。
3.6. RPC模式
消息有返回值
特点
PRC
模式和上面的几种模式唯一不同的点在于,该模式可以收到消费端的返回值
。- 生成端接收消费端的返回值。
- 消费端添加返回值:
@RabbitListener(queuesToDeclare =@Queue("rpcQueue"))public String rpcListener(String message){
System.out.println("【rpc接收消息】"+ message);return"rpc 返回"+ message;}
- 生产端发送消息:
@GetMapping("/rpc-send")publicvoidrpcSend(){
Object receive = rabbitTemplate.convertSendAndReceive("rpcQueue","rpc send message");
System.out.println("【发送消息消息】"+ receive);}
- 输出:
【rpc接收消息】rpc send message
【发送端接收消息】rpc 返回rpc send message
3.7总结
整合
SpringBoot
实现
RabbitMQ
六种工作模式,并详细讲解
RabbitMQ
六种工作模式:
- 简单模式 - 无需创建交换机,匹配生产端和消费的
routingKey
即可。 - 工作模式 - 多个消费端公平竞争同一个消息。
- 发布订阅模式 - 一次向多个消费者发送消息。
- 路由模式 - 根据特定的路由键转发消息。
- 主题模式 - 根据通配符,匹配路由键转发消息。
- RPC模式 - 生产端接收消费端发送的返回值。
4.界面认识
连接数
交换机
5.私信队列和延迟队列(重点)
死信
顾名思义,就是死掉的信息,英文是Dead Letter。
死信交换机(Dead-Letter-Exchange)
和普通交换机没有区别,都是可以接受信息并转发到与之绑定并能路由到的队列,区别在于
死信交换机
是转发
死信
的,而和该
死信交换机
绑定的队列就是
死信队列
。说的再通俗一点,死信交换机和死信队列其实都只是普通的交换机和队列,只不过接受、转发的信息是
死信
,其他操作并没有区别。
1.1.死信的条件
称为
死信
的信息,需要如下几个条件:
- 消息被消费者拒绝(通过basic.reject 或者 back.nack),并且设置 requeue=false。
- 消息过期,因为队列设置了TTL(Time To Live)时间。 - 发送消息时设置消息的生存时间,如果生存时间到了,还没有被消费。- 也可以指定某个队列中所有消息的生产时间,如果生存时间到了,还没有被消费
- 消息被丢弃,因为超过了队列的长度限制。
死信队列的应用:
- 基于死信队列在队列消息已满的情况下,消息也不会丢失
- 实现延迟消费的效果。比如:下订单时,有15分钟的付款时间
代码如下:先创建死信队列和死信交换机 topic,普通队列和普通交换机,topic
packagecom.hjt.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/***
* 死信队列和普通队列配置
*/@ConfigurationpublicclassDeadLetterQueueConfig{// 普通的交换机和队列publicstaticfinalStringNORMAL_EXCHANGE="normal-exchange";publicstaticfinalStringNORMAL_QUEUE="normal-queue";publicstaticfinalStringNORMAL_ROUTING_KEY="normal.#";//死信队列和交换机publicstaticfinalStringDEAD_EXCHANGE="dead-exchange";publicstaticfinalStringDEAD_QUEUE="dead-queue";publicstaticfinalStringDEAD_ROUTING_KEY="dead.#";/**
* 创建普通的交换机
* @return
*/@BeanpublicExchangenormalExchange(){returnExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();}/**
* 创建普通的队列,然后绑定死信队列和交换机
* @return
*/@BeanpublicQueuenormalQueue(){//普通队列这里需要绑定死信交换机returnQueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE)//死信交换机.deadLetterRoutingKey("dead.hello")//绑定的routing_key.build();}/**
* 绑定交换机和队列
* @param normalQueue 队列
* @param normalExchange 交换机
* @return
*/@BeanpublicBindingnormalBinding(Queue normalQueue,Exchange normalExchange){returnBindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();}/**
* 创建死信交换机
* @return
*/@BeanpublicExchangedeadExchange(){returnExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();}/**
* 创建死信队列
* @return
*/@BeanpublicQueuedeadQueue(){returnQueueBuilder.durable(DEAD_QUEUE).build();}/**
* 死信队列绑定死信交换机,并且设置死信topic
* @param deadQueue 队列
* @param deadExchange 交换机
* @return
*/@BeanpublicBindingdeadBinding(Queue deadQueue,Exchange deadExchange){returnBindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();}}
创建普通生成者
packagecom.hjt.producer;importcom.hjt.config.DeadLetterQueueConfig;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/***
* 死信队列生成者发送端
*/@ComponentpublicclassNormalProducer{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidpublishDead(){
rabbitTemplate.convertAndSend(DeadLetterQueueConfig.NORMAL_EXCHANGE,"normal.topic","今天晚上有什么安排?");System.out.println("消息发送成功");}}
创建普通消费者
package com.hjt.consumer;
import com.hjt.config.DeadLetterQueueConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/***
* 创建死信消费队列
*/
@Component
public class NormalConsumer {
@RabbitListener(queues = DeadLetterQueueConfig.NORMAL_QUEUE)
public void consume(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到normal-queue队列的消息:"+msg);
//消息被消费者拒绝(通过basic.reject 或者 back.nack),并且设置 requeue=false。
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
/**
* channel.basicNack(deliveryTag, multiple, requeue) 是 RabbitMQ 的 Java 客户端中用于拒绝(Nack)一条或多条消息的方法。下面是对该方法的参数进行解释:
* deliveryTag:消息的交付标签(delivery tag),用于唯一标识一条消息。通过 message.getMessageProperties().getDeliveryTag() 获取消息的交付标签。
* multiple:是否拒绝多条消息。如果设置为 true,则表示拒绝交付标签小于或等于 deliveryTag 的所有消息;如果设置为 false,则只拒绝交付标签等于 deliveryTag 的消息。
* requeue:是否重新入队列。如果设置为 true,则被拒绝的消息会重新放回原始队列中等待重新投递;如果设置为 false,则被拒绝的消息会被丢弃。
*/
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
创造死信队列消费者
package com.hjt.consumer;
import com.hjt.config.DeadLetterQueueConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
@Component
public class DeadConsumer {
@RabbitListener(queues = DeadLetterQueueConfig.DEAD_QUEUE)
public void consume(String msg, Channel channel, Message message) throws IOException {
System.out.println("我是死信队列:"+msg);
//消息被消费者拒绝(通过basic.reject 或者 back.nack),并且设置 requeue=false。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
执行test测试
package com.hjt.controller;
import com.hjt.producer.NormalProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/dead")
public class DealController {
@Resource
private NormalProducer normalProducer;
@GetMapping(value = "/test")
public void test(){
normalProducer.publishDead();
}
}
调用test的时候,运行结果如下所示
消息发送成功
接收到normal-queue队列的消息:今天晚上有什么安排?
我是死信队列:今天晚上有什么安排?
1.2.延迟队列
代码如下:
packagecom.hjt.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;@ConfigurationpublicclassDelayedConfig{publicstaticfinalStringDELAYED_EXCHANGE="delayed-exchange";publicstaticfinalStringDELAYED_QUEUE="delayed-queue";publicstaticfinalStringDELAYED_ROUTING_KEY="delayed.#";/**
* 创建一个延迟交换机(Exchange)并返回该交换机对象。
* @return
*/@BeanpublicExchangedelayedExchange(){//创建了一个HashMap对象,用于存储交换机的属性。然后,将一个名为x-delayed-type的属性和值为"topic"的键值对添加到HashMap中。HashMap<String,Object> map =newHashMap<>();
map.put("x-delayed-type","topic");/**
* 使用CustomExchange类创建一个自定义交换机对象。CustomExchange是Spring AMQP库提供的一个类,用于创建自定义的交换机。构造方法的参数依次为交换机的名称、类型、是否持久化、是否自动删除和属性。
* ,交换机的名称为DELAYED_EXCHANGE,类型为"x-delayed-message",持久化为true,自动删除为false,属性为之前创建的HashMap对象。
*/returnnewCustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);}/**
* 创建队列
* @return
*/@BeanpublicQueuedelayedQueue(){returnQueueBuilder.durable(DELAYED_QUEUE).build();}/**
* 绑定交换机和队列
* @param delayedQueue
* @param delayedExchange
* @return
*/@BeanpublicBindingdelayedBinding(Queue delayedQueue,Exchange delayedExchange){returnBindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}
延迟队列生成者
packagecom.hjt.producer;importcn.hutool.core.date.DateUtil;importcom.hjt.config.DelayedConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.Map;@Component@Slf4jpublicclassDelayProducer{@ResourceprivateRabbitTemplate rabbitTemplate;publicvoidpublishDelay(){
rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE,"delayed.hello","大江东去浪淘尽",newMessagePostProcessor(){//创建了一个匿名内部类实现了MessagePostProcessor接口,并重写了postProcessMessage()方法。在该方法中,设置了消息的延迟时间为50,000毫秒(即50秒)@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
log.error("延迟队列发送的时间{}",DateUtil.now());// //设置消息的延迟时间,单位为毫秒。// message.getMessageProperties().setDelay();// 使用x-delay头设置延迟时间(单位为毫秒)Map<String,Object> headers = message.getMessageProperties().getHeaders();
headers.put("x-delay",5000);return message;}});System.out.println("消息发送成功");}}
输出结果
2024-08-08 11:19:18.416 ERROR 23148 --- [nio-8184-exec-1] com.hjt.producer.DelayProducer : 延迟队列发送的时间2024-08-08 11:19:18
消息发送成功
2024-08-08 11:19:23.434 ERROR 23148 --- [ntContainer#1-1] com.hjt.consumer.DelayConsumer : 延迟队列消费的时间2024-08-08 11:19:23
2024-08-08 11:19:23.434 ERROR 23148 --- [ntContainer#1-1] com.hjt.consumer.DelayConsumer : 我是延迟消费,消息为大江东去浪淘尽
这里需要主要,如果是 message.getMessageProperties().setDelay(); 这种设置方式的话,是int类型,因为单位是毫秒,最长时间不会超过
24.8天,如果是 Map<String, Object> headers = message.getMessageProperties().getHeaders(); headers.put(“x-delay”,5000);这种方式的话,因为单位是long,最长时间可以设置大约相当于 292,471,208 年。
ps:这里需要注意有一个问题:
就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的;
解决方法:下载延迟交换机插件(上面有介绍)
上述代码地址
版权归原作者 有点东西且很多 所有, 如有侵权,请联系我们删除。