目录
简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
历史
Rabbit科技有限公司开发了RabbitMQ,并提供对其的支持。起初,Rabbit科技是LSHIFT和CohesiveFT在2007年成立的合资企业,2010年4月被VMware旗下的SpringSource收购。RabbitMQ在2013年5月成为GoPivotal的一部分。
特性
- 可伸缩性:集群服务
- 消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存
优点
- 解耦(为面向服务的架构(SOA)提供基本的最终一致性实现)
- 异步提升效率
- 流量削峰
缺点
- 系统的可用性降低:系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。
- 系统的复杂性提高:引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?
- 一致性问题:A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。
网址
版本对应网址:http://www.rabbitmq.com/which-erlang.html
Erlang官网:https://www.erlang-solutions.com/downloads/
RabbitMQ官网:https://www.rabbitmq.com/
Erlang下载(Weindows):https://www.erlang.org/downloads
Erlang下载(Linux):https://packagecloud.io/rabbitmq/erlang
RabbitMQ下载:https://www.rabbitmq.com/download.html
RabbitMQ教程:https://www.rabbitmq.com/getstarted.html
安装
RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,即需要先安装部署Erlang环境再安装RabbitMQ环境。另外RabbitMQ所需的Erlang版本是有要求的,可根据两者版本号对应表安装相应版本的Erlang和RabbitMQ。
Windows
1、Erlang下载:https://www.erlang.org/downloads
2、安装
3、无脑下一步即可,中途可以选择安装的位置
4、安装完成后,打开高级系统设置,配置环境变量
5、配置
ERLANG_HOME
6、在path下添加
%ERLANG_HOME%\bin
7、下载RabbitMQ:https://www.rabbitmq.com/download.html
8、双击执行
9、无脑下一步,中途更改安装目录
10、配置环境变量,与Erlang的配置类似
11、激活Rabbitmq Mnagement plugins,可以通过可视化界面管理RabbitMQ。
#下载插件
rabbitmq-plugins.bat enable rabbitmq_management
#查看是否安装成功
rabbitmqctl.bat status
#如果查看报错,可能是后台服务器没有开启,开启后台服务器再重试
net start RabbitMQ
#查看RabbitMQ已有用户以及用户对应的角色信息
rabbitmqctl.bat list_users
#RabbitMQ的默认用户名和密码是guest,只能在本机访问,新增一个用户,并赋予超级管理员角色
rabbitmqctl add_user 用户名 密码
#设置用户角色
rabbitmqctl set_user_tags 用户名 administrator
12、访问网址:localhost:15672,并使用刚才创建的账号登录
Linux
1、Erlang下载:https://packagecloud.io/rabbitmq/erlang
2、RabbitMQ下载:https://www.rabbitmq.com/download.html
3、通过xftp上传到Linux服务器上
4、按顺序执行命令
rpm -ivh erlang-23.2.7-2.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.10.2-1.el8.noarch.rpm
#启动RabbitMQ服务
systemctl start rabbitmq-server.service
#查看状态
systemctl status rabbitmq-server.service
5、安装可视化插件,与Windows的安装方式类似
#下载插件
rabbitmq-plugins enable rabbitmq_management
#查看是否安装成功
rabbitmqctl status
#安装完成后重启服务
systemctl restart rabbitmq-server
#查看已有用户以及用户对应的角色信息
rabbitmqctl list_users
#RabbitMQ的默认用户名和密码是guest,只能在本机访问,新增一个用户,并赋予超级管理员角色
rabbitmqctl add_user 用户名 密码
#设置用户角色
rabbitmqctl set_user_tags 用户名 administrator
6、输入公网ip地址:15672,出现rabbitmq的登录页面,登录后如下
小结
RabbitMQ的默认端口是5672,而可视化插件的访问端口则是15672,另外RabbitMQ有一个默认的用户,账号和密码都是guest,只能在本机访问。以下是一些用户相关的命令,以Linux为例,Windows下的命令与Linux的命令差别不大,以rabbitmqctl.bat开头
#添加用户
rabbitmqctl add_user username password
#设置用户角色
rabbitmqctl set_user_tags username administrator
#为用户添加root权限
rabbitmqctl set_permissions -p / 用户名 ".*"".*"".*"#修改密码
rabbitmqctl change_password 用户名 新密码
#删除用户
rabbitmqctl delete_user 用户名
#查看已有用户以及用户对应的角色信息
rabbitmqctl list_users
# 用户权限# administrator:超级管理员,可以查看所有信息,并且能够对它们进行操作# monitoring:监控者,可以查看所有的信息# policymaker:策略制定者,包含managment所有权限,可以查看删除自己的节点信息# managment:普通管理员,只能查看自己的节点的信息# none:无法登录可视化工具
概念
在编写代码之前,我们需要对RabbitMQ的一些基本概念进行了解。
AMQP协议
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),用Erlang语言编写的。
工作流程
RabbitMQ中消息的发送机制:生产者发送消息到交换机,然后交换机将消息分发给队列,消费者通过队列拿到消息。在RabbitMQ中,交换机是必定存在的,哪怕为空,也会有一个默认的交换机。
交换机类型
某些文章会把交换机类型称为交换机模式,有时候这会让初学者感到困惑,混淆队列和交换机模式的概念,所以这里把交换机模式称为类型,以便区分。
- FanOut(扇形交换机):不管消息的路由键是什么,它直接将消息发送到与该交换机绑定的所有队列中,类似于广播机制。
- Direct(直连交换机):根据消息的路由键routingkey,将消息以完全匹配的方式发送到指定的队列中。
- Topic(主题交换机):根据消息的路由键routingkey,将消息以模糊匹配的方式发送到指定的队列中。
- Headers(头部交换机):不常用
- Dead Letter(死信):类型随意,它的作用只是存放死信
- 默认交换机:,如果交换机名称写空字符串,那么就是默认交换机,类型是Direct,路由键是队列的名称
队列模式
RabbitMQ有7种队列模式,但是常用的只有前五种,RPC模式和Publisher Confirms不常用,仅作为拓展。
- P:生产者
- C:消费者
- X:交换机
- 红色:队列
简单队列
一个队列只有一个消费者,生产者将消息发送到队列,消费者从队列中取出消息。
案例
因为在实际的开发中,不会使用原生的Java方式使用RabbitMQ,并且由于RabbitMQ和Spring是同一家母公司,所以Spring能够很好的整合RabbitMQ,故这里就使用Spring Boot整合RabbitMQ作为示例。
1、使用到的依赖,或者使用Spring Initializr创建Spring Boot项目的时候选择依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、配置文件
#需要注意的是,如果RabbitMQ安装在本地,则默认配置就是本地的,可以不用写配置#如果是安装在远程服务器上,则需要写配置,并且远程服务器需要开启安全组或防火墙的端口号spring:rabbitmq:username: admin
password:123456host: 43.142.61.105
port:5672#虚拟host 可以不设置,使用server默认hostvirtual-host: /
3、创建一个SpringBoot服务,使用的依赖和配置如上,首先定义一个生产者,先编写一个配置类。
packagecom.th.producer.config;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{//创建一个queue的bean@BeanpublicQueuequeue(){returnnewQueue("simpleQueue");}}
4、发送一条消息到队列中
packagecom.th.producer;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;@SpringBootTestclassProducerApplicationTests{@ResourceprivateRabbitTemplate rabbitTemplate;@AutowiredprivateQueue queue;//Queue和这个默认交换机之间的Binding Key是Queue的名字,所以要注入Queue@TestvoidcontextLoads(){String message ="Hello Simple RabbitMQ";//要发送到队列中的消息
rabbitTemplate.convertAndSend(queue.getName(),message);}}
5、查看是否发送成功
6、接下来创建一个消费者,将队列中的信息给消费掉,同样的,创建一个SpringBoot服务,使用的依赖和配置如上
packagecom.th.rabbitmq03consumer.service;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;//监听队列simpleQueue@Service@RabbitListener(queues ={"simpleQueue"})publicclassConsumerService{@RabbitHandlerpublicvoidgetMessage(String message){System.out.println("接受到的消息是:"+message);}}
7、启动SpringBoot的启动类,查看控制台打印的信息
8、查看web管理页面
9、这就是简单模式的基本模型,在代码中我们并没有注入交换机,但是消息仍旧可以发送,这是因为RabbitMQ提供了一个默认的交换机,交换机的名字是空字符串,交换机的类型是Direct,绑定到所有的Queue上,每一个Queue和这个默认交换机之间的Binding Key是Queue的名字,所以这里不是没有使用交换机,而是使用了默认的交换机。
工作模式
多个消费者监听同一个队列,但多个消费者中只会有一个会成功地消费消息。这里有两种消费机制:
- 轮询模式:平均分发模式,不会因为服务器性能低就有所偏袒
- 公平分发:能者多劳模式,性能高的服务器,处理的消息就多
案例
这里需要创建三个微服务,一个生产者,两个消费者,配置同简单模式,接下来的示例配置也是与简单模式一样,之后就不再阐述。另外这里就不再使用默认的交换机了,而是使用fanout交换机进行讲解。当然,其他的交换机同样可以完成工作模式,可以自行尝试。
生产者
1、配置类
packagecom.th.producer.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{//交换机publicfinalstaticString fanoutExchange ="fanout_work_exchange";//创建交换机@BeanpublicFanoutExchangefanoutExchange(){/**
* 三个参数
* param1:交换机的名称
* param2:是否持久化,默认是false,表示暂存队列,只有当前链接有效;
* 如果为true,表示持久化队列,会被存储在磁盘上
* param3:是否自动删除,默认是false。当没有生产者或消费者使用这个交换机时将会被删除
* 一般设置一下队列的持久化就好
*/returnnewFanoutExchange(fanoutExchange,true,false);}//创建队列@BeanpublicQueueworkQueue(){/**
* param1:队列名称
* param2:是否持久化,默认是false
* 持久化队列:会被存储在磁盘上,当消息代理重启时,仍然存在
* 暂存队列:当前连接有效
*/returnnewQueue("workQueue",true);}//绑定交换机和队列@BeanpublicBindingbindingWorkQueue(){returnBindingBuilder.bind(workQueue()).to(fanoutExchange());}}
2、发送20条消息到队列中,请务必先运行生产者,创建交换机和队列,否则接下来运行消费者会因为找不到队列而报错
packagecom.th.producer;importcom.th.producer.config.RabbitMQConfig;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;@SpringBootTestpublicclassWorkTest{@ResourceprivateRabbitTemplate rabbitTemplate;@Testvoidsend(){/**
* 1、第一个参数是交换机,与配置文件中的交换机名称要一样,因为我们要通过这个交换机发送消息到队列中
* 2、第二个参数是routingKey,fanout交换机是一种广播机制,会把消息发送到所有与它绑定的队列中
* 没有routingKey的概念,所以这里用空字符串代替
* 3、i就是发送到队列中的消息,这里循环20次,发送20条消息到队列中
*/for(int i =0; i <20; i++){
rabbitTemplate.convertAndSend(RabbitMQConfig.fanoutExchange,"",i);}}}
3、查看信息
消费者C1
packagecom.th.consumer01.service;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;@Service@RabbitListener(queues ="workQueue")publicclass C1 {/**
* 注意,生产者生产的是Integer类型,所以这里的参数应该接受的是Integer类型
*/@RabbitHandlerpublicvoidc1(Integer message)throwsInterruptedException{//这里休眠100毫秒Thread.sleep(100);System.out.println(message);}}
消费者C2
packagecom.th.consumer02.service;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;@Service@RabbitListener(queues ="workQueue")publicclass C2 {/**
* 注意,生产者生产的是Integer类型,所以这里的参数应该接受的是Integer类型
*/@RabbitHandlerpublicvoidc2(Integer message)throwsInterruptedException{//这里休眠一秒,性能会比C1差Thread.sleep(1000);System.out.println(message);}}
测试
C1的控制台
C2的控制台
能者多劳
在轮询模式中,我们发现C2处理消息的速度比C1慢上很多,这显然是对资源的浪费,完全可以让C1继续处理剩余的消息,这就是能者多劳模式,使用它很简单,只需要修改C1和C2的配置文件即可,如下:
spring:rabbitmq:username: admin
password: jiege8058*
host: 43.142.61.105
port:5672virtual-host: /
listener:simple:prefetch:2#这里可以往大了写,可以根据自己的内存进行调整,写1会有坑
结果如下
小结
- 工作模式默认的是轮询模式,可以修改配置文件将其改为公平分发模式
- 如果服务器性能有高低,可采用公平分发模式,让资源 得到充分利用
订阅模式
一个交换机绑定多个消息队列,每个消息队列有一个消费者监听,生产者发送的消息可以被每一个消费者接收。
案例
前面两个例子中,我们必须先运行生产者,创建交换机和队列,然后才能运行消费者,否则消费者就会因为找不到队列而报错。这是因为我们使用的是代码的方式创建的交换机和队列,创建交换机和队列的配置类在生产者中,所以只有先运行生产者才能创建交换机和队列。那么我们是否可以把配置类在消费者中创建呢?接下来让我们试试看吧。
消费者C1
配置类
packagecom.th.consumer01.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{/**
* 因为发布与订阅模式最少需要两条队列,所以这里需要创建两条队列,并且将它们绑定同一个交换机
*///交换机publicfinalstaticString fanoutExchange ="fanout_publish_exchange";//队列publicfinalstaticString publishQueue1 ="publishQueue1";publicfinalstaticString publishQueue2 ="publishQueue2";//创建交换机@BeanpublicFanoutExchangefanoutExchange(){/**
* 三个参数
* param1:交换机的名称
* param2:是否持久化,默认是false,表示暂存队列,只有当前链接有效;
* 如果为true,表示持久化队列,会被存储在磁盘上
* param3:是否自动删除,默认是false。当没有生产者或消费者使用这个交换机时将会被删除
* 一般设置一下队列的持久化就好
*/returnnewFanoutExchange(fanoutExchange,true,false);}//创建队列1@BeanpublicQueuepublishQueue1(){/**
* param1:队列名称
* param2:是否持久化,默认是false
* 持久化队列:会被存储在磁盘上,当消息代理重启时,仍然存在
* 暂存队列:当前连接有效
*/returnnewQueue(publishQueue1,true);}//创建队列2@BeanpublicQueuepublishQueue2(){/**
* param1:队列名称
* param2:是否持久化,默认是false
* 持久化队列:会被存储在磁盘上,当消息代理重启时,仍然存在
* 暂存队列:当前连接有效
*/returnnewQueue(publishQueue2,true);}//绑定交换机和队列@BeanpublicBindingbindingPublishQueue1(){returnBindingBuilder.bind(publishQueue1()).to(fanoutExchange());}//绑定交换机和队列@BeanpublicBindingbindingPublishQueue2(){returnBindingBuilder.bind(publishQueue2()).to(fanoutExchange());}}
业务层
packagecom.th.consumer01.service;importcom.th.consumer01.config.RabbitMQConfig;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;//监听队列1@Service@RabbitListener(queues =RabbitMQConfig.publishQueue1)publicclass C1 {@RabbitHandlerpublicvoidc1(String message){System.out.println("C1:"+message);}}
消费者C2
配置类同上,业务层如下
packagecom.th.consumer02.service;importcom.th.consumer02.config.RabbitMQConfig;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;//监听队列2@Service@RabbitListener(queues =RabbitMQConfig.publishQueue2)publicclass C2 {@RabbitHandlerpublicvoidc2(String message){System.out.println("C2:"+message);}}
生产者
packagecom.th.producer;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;@SpringBootTestpublicclassPublishTest{@ResourceprivateRabbitTemplate rabbitTemplate;//交换机publicfinalstaticString fanoutExchange ="fanout_publish_exchange";@Testvoidsend(){String message ="Hello publish RabbitMQ";
rabbitTemplate.convertAndSend(fanoutExchange,"",message);}}
测试
1、启动两个消费者
2、启动生产者,发送一条消息,查看两个消费者的控制台
小结
通过这个案例可以发现配置类创建在消费者中同样是可行的,但是这样一来,因为生产者中没有配置类,那么先启动生产者的时候,会因为没有交换机和队列出现异常。那么是否可以在生产者和消费者中都编写配置类呢?这是可以的,如果在两者中配置了,那么之后不管启动哪一个,都不会出现异常,当然这样会产生代码冗余,可以按照实际情况把配置类放在最先启动微服务中。同样的,可以在可视化界面直接就把交换机和队列创建好,并且将它们绑定起来,不过作为程序员还是需要学会使用代码来创建和绑定,并且代码的灵活度更高。
路由模式
一个交换机绑定多个消息队列,每个消息队列都有自己唯一的key,每一个消息队列有一个消费者监听。
多个绑定,使用相同的路由绑定多个队列是完全合法的,在这种情况下,类似于fanout交换机,如果需要将消息发送到全部的队列中,请务必使用fanout交换机,因为这是显而易见的,中间隔了一层,判断路由键是需要消耗一些性能的。
案例
这里使用direct交换机,因为它是根据路由键进行匹配的,前面使用的默认交换机和fanout交换机都忽略了路由键的作用。
配置类
packagecom.th.consumer01.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{/**
* 队列1绑定两个路由键
* 队列2和队列3绑定相同的路由键
*/publicfinalstaticString directExchange ="direct_routing_exchange";publicfinalstaticString routingQueue1 ="routingQueue1";publicfinalstaticString routingQueue2 ="routingQueue2";publicfinalstaticString routingQueue3 ="routingQueue3";publicfinalstaticString routing1 ="routing1";publicfinalstaticString routing2 ="routing2";publicfinalstaticString routing3 ="routing3";//创建交换机@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange(directExchange,true,false);}//创建队列1@BeanpublicQueueroutingQueue1(){returnnewQueue(routingQueue1,true);}//创建队列2@BeanpublicQueueroutingQueue2(){returnnewQueue(routingQueue2,true);}//创建队列3@BeanpublicQueueroutingQueue3(){returnnewQueue(routingQueue3,true);}//绑定交换机和队列1,并指定队列的路由键1@BeanpublicBindingbindingRoutingQueue1(){returnBindingBuilder.bind(routingQueue1()).to(directExchange()).with(routing1);}//绑定交换机和队列1,并指定队列的路由键3,即队列1绑定了两个路由键@BeanpublicBindingbindingRoutingQueue1_3(){returnBindingBuilder.bind(routingQueue1()).to(directExchange()).with(routing3);}//绑定交换机和队列2,并指定队列的路由键2@BeanpublicBindingbindingRoutingQueue2(){returnBindingBuilder.bind(routingQueue2()).to(directExchange()).with(routing2);}//绑定交换机和队列3,并指定队列的路由键2@BeanpublicBindingbindingRoutingQueue3(){returnBindingBuilder.bind(routingQueue3()).to(directExchange()).with(routing2);}}
消费者C1
packagecom.th.consumer01.service;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;//监听队列1@Service@RabbitListener(queues ="routingQueue1")publicclass C1 {@RabbitHandlerpublicvoidc1(String message){System.out.println("C1:"+message);}}
消费者C2
packagecom.th.consumer02.service;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;//监听队列2@Service@RabbitListener(queues ="routingQueue2")publicclass C2 {@RabbitHandlerpublicvoidc2(String message){System.out.println("C2:"+message);}}
消费者C3
packagecom.th.consumer03.service;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;//监听队列3@Service@RabbitListener(queues ="routingQueue3")publicclass C3 {@RabbitHandlerpublicvoidc3(String message){System.out.println("C3:"+message);}}
生产者
packagecom.th.producer;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;@SpringBootTestpublicclassRoutingTest{@ResourceprivateRabbitTemplate rabbitTemplate;//交换机publicfinalstaticString fanoutExchange ="direct_routing_exchange";@Testvoidsend1(){String message ="Hello routing1 RabbitMQ";
rabbitTemplate.convertAndSend(fanoutExchange,"routing1",message);}@Testvoidsend2(){String message ="Hello routing2 RabbitMQ";
rabbitTemplate.convertAndSend(fanoutExchange,"routing2",message);}@Testvoidsend3(){String message ="Hello routing3 RabbitMQ";
rabbitTemplate.convertAndSend(fanoutExchange,"routing3",message);}}
测试
1、启动三个消费者,首先运行send1,将消息发送到路由键为routing1的队列中,查看控制台,只有消费者C1拿到了消息
2、运行send2,消费者C1和C2拿到了消息
3、运行send3,消费者C1拿到了消息
小结
- 直连(direct)交换机是根据路由键发送消息到队列中的
- 一个队列可以有多个路由键
- 多个队列可绑定同一个路由键
主题模式
主题模式又叫通配符模式,与路由模式类似,但主题模式可以通过通配符匹配到多个队列,是一种模糊匹配。
案例
虽然直接交换机改进了我们的系统,但它仍然有局限性,不能基于多个标准进行路由,主题交换机可以很好的解决这个问题。
发送到主题交换机的消息不能具有任意routing_key,它必须是单词列表,由点分隔。这些单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由关键示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。
路由密钥中可以有任意数量的单词,最大限制为 255 个字节。绑定键也必须采用相同的形式。
主题交换机背后的逻辑类似于直接交换机,使用特定路由键发送的消息将被传递到使用匹配绑定键绑定的所有队列。但是,绑定键有两种重要的特殊情况:
*
代表的是最少要有一级单词#
代表的是0级或多级单词
配置类
packagecom.th.consumer01.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{publicfinalstaticString topicExchange ="topic_top_exchange";publicfinalstaticString topQueue1 ="topQueue1";publicfinalstaticString topQueue2 ="topQueue2";publicfinalstaticString topQueue3 ="topQueue3";/**
* #.top1.#
* 表示top1前面后面可以有多个单词
*/publicfinalstaticString routing1 ="#.top1.#";/**
* *.top2.#
* 表示top2前面有且只有一个单词,后面则是任意个单词
*/publicfinalstaticString routing2 ="*.top2.#";/**
* *.top3.*
* 表示top3前面和后面有且只有一个单词
*/publicfinalstaticString routing3 ="*.top3.*";//创建交换机@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(topicExchange,true,false);}//创建队列1@BeanpublicQueuetopQueue1(){returnnewQueue(topQueue1,true);}//创建队列2@BeanpublicQueuetopQueue2(){returnnewQueue(topQueue2,true);}//创建队列3@BeanpublicQueuetopQueue3(){returnnewQueue(topQueue3,true);}//绑定交换机和队列1,并指定队列的路由键1@BeanpublicBindingbindingTopQueue1(){returnBindingBuilder.bind(topQueue1()).to(topicExchange()).with(routing1);}//绑定交换机和队列2,并指定队列的路由键2@BeanpublicBindingbindingTopQueue2(){returnBindingBuilder.bind(topQueue2()).to(topicExchange()).with(routing2);}//绑定交换机和队列3,并指定队列的路由键3@BeanpublicBindingbindingTopQueue3(){returnBindingBuilder.bind(topQueue3()).to(topicExchange()).with(routing3);}}
消费者C1
packagecom.th.consumer01.service;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;//监听队列1@Service@RabbitListener(queues ="topQueue1")publicclass C1 {@RabbitHandlerpublicvoidc1(String message){System.out.println("C1:"+message);}}
消费者C2
packagecom.th.consumer02.service;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;//监听队列2@Service@RabbitListener(queues ="topQueue2")publicclass C2 {@RabbitHandlerpublicvoidc2(String message){System.out.println("C2:"+message);}}
消费者C3
packagecom.th.consumer03.service;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;//监听队列3@Service@RabbitListener(queues ="topQueue3")publicclass C3 {@RabbitHandlerpublicvoidc3(String message){System.out.println("C3:"+message);}}
生产者
packagecom.th.producer;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;@SpringBootTestpublicclassTopicTest{@ResourceprivateRabbitTemplate rabbitTemplate;//交换机publicfinalstaticString fanoutExchange ="topic_top_exchange";/**
* top1.top2.top3
* 匹配top1和top2,top3前后只能有一个单词,所以不会匹配到
*/@Testvoidsend1(){String message ="Hello top1 RabbitMQ";
rabbitTemplate.convertAndSend(fanoutExchange,"top1.top2.top3",message);}/**
* top2.top3.top1
* 匹配top3和top1,因为top2必须要前面有一个单词
*/@Testvoidsend2(){String message ="Hello top2 RabbitMQ";
rabbitTemplate.convertAndSend(fanoutExchange,"top2.top3.top1",message);}/**
* top1.top3.top2
* 匹配top3和top1,因为top2前面有两个单词
*/@Testvoidsend3(){String message ="Hello top3 RabbitMQ";
rabbitTemplate.convertAndSend(fanoutExchange,"top1.top3.top2",message);}/**
* top3.top1.top2
* 匹配top1,因为top2前面有两个单词,top3前面没单词,且后面有两个单词
*/@Testvoidsend4(){String message ="Hello top4 RabbitMQ";
rabbitTemplate.convertAndSend(fanoutExchange,"top3.top1.top2",message);}}
测试
1、运行send1,C1和C2收到消息
2、运行send2,C1和C3收到消息
3、运行send3,C1和C3收到消息
4、运行send3,C1收到消息
小结
- 使用主题交换机可将路由键定义为通配符的形式
- 生产者发送消息的时候,路由键以
.
分割单词,去匹配符合要求的路由键
RPC模式
生存时间
RabbitMQ 允许您为消息和队列设置 TTL(生存时间)。消息 TTL 可以应用于单个队列、一组队列或逐条消息应用。例如在一段时间未支付的订单,会自动消失,就可以设置消息的生存时间。
TTL队列
将队列设置为TTL,这样的话,整个队列中的消息都会有一个过期的时间。
1、配置类
packagecom.th.producer.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassRabbitConfig{publicfinalstaticString TTL_FANOUT_EXCHANGE ="ttl_fanout_exchange";@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange(TTL_FANOUT_EXCHANGE,true,false);}@BeanpublicQueuefanouttlQueue(){Map<String,Object> map =newHashMap<>();
map.put("x-message-ttl",6000);//这里必定是一个int类型的参数returnnewQueue("ttl_fanout_queue",true,false,false, map);}@BeanpublicBindingttlBinding(){returnBindingBuilder.bind(fanouttlQueue()).to(fanoutExchange());}}
2、生产者
packagecom.th.producer;importcom.th.producer.config.RabbitConfig;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;@SpringBootTestpublicclassTTLTest{@ResourceprivateRabbitTemplate rabbitTemplate;@Testvoidttl(){String message ="Hello ttl RabbitMQ";
rabbitTemplate.convertAndSend(RabbitConfig.TTL_FANOUT_EXCHANGE,"", message);}}
3、运行生产者往消息队列发送一条消息,6秒后会消息会自动删除
TTL消息
让消息在一定时间后过期,这种方法是细粒度的,它可以在一条队列中,给不同的时候设置不同的过期时间,更加灵活。
1、配置类
packagecom.th.producer.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassRabbitConfig{publicfinalstaticString TTL_FANOUT_EXCHANGE ="ttl_message_fanout_exchange";@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange(TTL_FANOUT_EXCHANGE,true,false);}@BeanpublicQueuefanouttlMessageQueue(){returnnewQueue("ttl_message_queue",true);}@BeanpublicBindingttlBinding(){returnBindingBuilder.bind(fanouttlMessageQueue()).to(fanoutExchange());}}
2、生产者
packagecom.th.producer;importcom.th.producer.config.RabbitConfig;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.boot.test.context.SpringBootTest;importjavax.annotation.Resource;@SpringBootTestpublicclassTTLTest{@ResourceprivateRabbitTemplate rabbitTemplate;@Testvoidttl(){String message ="Hello ttl message RabbitMQ";MessagePostProcessor messagePostProcessor=newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
message.getMessageProperties().setExpiration("6000");//这里是字符串
message.getMessageProperties().setContentEncoding("UTF-8");//设定一下编码格式return message;}};
rabbitTemplate.convertAndSend(RabbitConfig.TTL_FANOUT_EXCHANGE,"", message,messagePostProcessor);}}
3、运行之后,队列中的消息会在6秒后被移除。
死信队列
死信,顾名思义就是无法被消费的消息,比如上面过期的消息就是死信,不过这些消息全部都被抛弃了,如果因为某些需求,需要使用到这些死信,却因为这些死信已经被抛弃无法找回,就会出现很大的麻烦。这个时候可以把死信放到死信队列中,在需要使用到这些死信的时候,也可以轻松找回。
1、配置类
packagecom.th.deadqueue.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassRabbitConfig{/**
* 关于路由
* 如果业务交换机是fanout模式,则绑定死信交换机的时候,无需额外写上路由参数
* 如果业务交换机是direct模式,则绑定死信交换机的时候,必需额外写上路由参数
* 是否写上路由参数,与死信交换机的类型无关
* 不管死信交换机的类型是fanout还是direct,需不需要添加路由参数只与业务交换机类型有关
* 如果业务交换机和死信交换机都是direct类型,请务必让他们绑定相同的路由键
* 最好的情况是死信交换机和业务交换机的类型一致,统一的类型更便于区分
*/publicfinalstaticString DEAD_DIRECT_EXCHANGE ="dead_direct_exchange";publicfinalstaticString DIRECT_EXCHANGE ="direct_exchange";publicfinalstaticString DEAD_QUEUE ="dead_queue";publicfinalstaticString DIRECT_QUEUE ="direct_queue";publicfinalstaticString DIRECT_ROUTING_KEY ="direct_key";//死信交换机@BeanpublicDirectExchangedeadExchange(){returnnewDirectExchange(DEAD_DIRECT_EXCHANGE,true,false);}//直连交换机@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange(DIRECT_EXCHANGE,true,false);}//死信队列@BeanpublicQueuedeadQueue(){returnnewQueue(DEAD_QUEUE,true);}//直连队列@BeanpublicQueuedirectQueue(){Map<String,Object> args =newHashMap<>();
args.put("x-message-ttl",6000);
args.put("x-max-length",5);//限制队列中最大只能存放五条消息
args.put("x-dead-letter-exchange", DEAD_DIRECT_EXCHANGE);//绑定死信交换机//路由参数,如果业务交换机类型是fanout,则可以不用写
args.put("Dead letter routing key", DIRECT_ROUTING_KEY);returnnewQueue(DIRECT_QUEUE,true,false,false, args);}//绑定死信交换机和死信队列@BeanpublicBindingdeadBinding(){returnBindingBuilder.bind(deadQueue()).to(deadExchange()).with(DIRECT_ROUTING_KEY);}//绑定直连交换机和直连队列@BeanpublicBindingdirectBinding(){returnBindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING_KEY);}}
2、生产者
@ResourceprivateRabbitTemplate rabbitTemplate;/**
* 发送10条消息到队列中
*/@TestvoidcontextLoads(){for(int i =0; i <10; i++){
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,RabbitConfig.DIRECT_ROUTING_KEY, i);}}
3、启动测试类,会发现十条消息中5条在队列中,5条在死信队列中,6秒过后,10条消息全部到了死信队列中
小结
- TTL队列是让队列中的消息全部都有一个过期时间
- TTL消息是让消息本身存在一个过期时间,而不会去管其他的消息
- 只有TTL队列中过期的消息才能放到死信队列中,TTL消息是不能的
- 同时使用TTL队列和TTL消息,则以最小的过期时间为准
- 已经创建好的交换机和队列,在之后又进行结构的更改,会出现报错的情况,所以在确定好队列和消息之间的关系后,最好不要随意的修改结构。如果必须要修改,可以将队列和交换机删了,重新创建。但是在实际业务中,如果出现这种情况,不要删除交换机和队列,因为真实的线上环境,队列中还存在消息,如果删除会影响业务,最好的办法是重新创建交换机和队列,将消息引入新的队列中。
版权归原作者 千乐居士 所有, 如有侵权,请联系我们删除。