文章目录
RabbitMQ是一个被广泛使用的开源消息队列。它是轻量级且易于部署的,它能支持多种消息协议。RabbitMQ可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。
基本概念
RabbitMQ的内部结构图
Message
- 消息,由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等;body是真正需要发送的数据内容;
Publisher
- 消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
- 消息交换机,作用是接收来自生产者的消息,并根据路由键转发消息到所绑定的队列。生产者发送上的消息,就是先通过 Exchnage 按照绑定 (binding) 规则转发到队列的。
- 交换机类型 (Exchange Type) 有四种:fanout、direct、topic,headers,其中 headers 并不常用。
Binding
- 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表,Binding 操作一般用于 RabbitMQ 的路由工作模式和主题工作模式。
Queue
- 消息队列,内部用于存储消息的对象,是真正用来存储消息的结构。它是消息的容器,也是消息的终点。在生产端,生产者的消息最终发送到指定队列,而消费者也是通过订阅某个队列,达到获取消息的目的。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Connection
- 网络连接,是 RabbitMQ 内部对象之一,用于管理每个到 RabbitMQ 的 TCP 网络连接。
Channel
- 信道,多路复用连接中的一条独立的双向数据流通道,也是我们与 RabbitMQ 打交道的最重要的一个接口,我们大部分的业务操作是在 Channel 这个接口中完成的,包括定义 Queue、定义 Exchange、绑定 Queue 与 Exchange、发布消息等。
- 信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Consumer
- 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
- 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
- 一个 VirtualHost 下面有一组不同 Exchnage 与 Queue,不同的 Virtual host 的 Exchnage 与 Queue 之间互相不影响。应用隔离与权限划分,Virtual host 是 RabbitMQ 中最小颗粒的权限单位划分。
- 如果要类比的话,我们可以把 Virtual host 比作 MySQL 中的数据库,通常我们在使用 MySQL 时,会为不同的项目指定不同的数据库,同样的,在使用 RabbitMQ 时,我们可以为不同的应用程序指定不同的 Virtual host。
Broker
- 表示消息队列服务器实体。
RabbitMQ 的常见模式
- 简单 (simple) 模式
- 工作 (work) 模式
- 发布 / 订阅 (pub/sub) 模式
- 路由 (routing) 模式
- 主题 (Topic) 模式
Docker部署RabbitMQ
镜像下载
docker pull rabbitmq
启动容器
docker run -d --name rabbitmq --restart always --hostname rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq
- -p 指定宿主机和容器端口映射(5672:服务应用端口,15672:管理控制台端口)
安装管理控制台插件
# 进入容器内部dockerexec -it rabbitmq /bin/bash
## 安装插件
rabbitmq-plugins enable rabbitmq_management
启动验证
- 访问RabbitMQ控制台:
http://{host}:15672/
,初始默认用户名/密码:guest/guest - 至此,RabbitMQ的安装和配置完成。
Spring项目集成RabbitMQ
添加AMQP相关依赖
- 在pom.xml文件中添加AMQP相关依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
添加RabbitMQ的相关配置
- 在application.yml添加RabbitMQ的相关配置
spring:rabbitmq:host: ip地址 # rabbitmq的连接地址port:5672# rabbitmq的连接端口号username: guest# rabbitmq的用户名password: guest# rabbitmq的密码
动态创建队列、交换机初始化器
- 创建RabbitMQ的Java配置,主要用于配置交换机、队列和绑定关系;
@Configuration@Slf4jpublicclassRabbitConfig{/**
* 使用json序列化机制,进行消息转换
*/@BeanpublicMessageConverterjackson2MessageConverter(){returnnewJackson2JsonMessageConverter();}/**
* 动态创建队列、交换机初始化器
*/@Bean@ConditionalOnMissingBeanpublicRabbitModuleInitializerrabbitModuleInitializer(AmqpAdmin amqpAdmin,RabbitModuleProperties rabbitModuleProperties){returnnewRabbitModuleInitializer(amqpAdmin, rabbitModuleProperties);}}
/**
* RabbitMQ 交换机类型枚举
*/publicenumRabbitExchangeTypeEnum{/**
* 直连交换机
* <p>
* 根据routing-key精准匹配队列(最常使用)
*/
DIRECT,/**
* 主题交换机
* <p>
* 根据routing-key模糊匹配队列,*匹配任意一个字符,#匹配0个或多个字符
*/
TOPIC,/**
* 扇形交换机
* <p>
* 直接分发给所有绑定的队列,忽略routing-key,用于广播消息
*/
FANOUT,/**
* 头交换机
* <p>
* 类似直连交换机,不同于直连交换机的路由规则建立在头属性上而不是routing-key(使用较少)
*/
HEADERS;}
@ConfigurationProperties(prefix ="spring.rabbitmq")@DatapublicclassRabbitModuleProperties{privateList<RabbitModuleInfo> modules;}
实现SmartInitializingSingleton的接口后,当所有单例 bean 都初始化完成以后, Spring的IOC容器会回调该接口的 afterSingletonsInstantiated()方法。主要应用场合就是在所有单例 bean 创建完成之后,可以在该回调中做一些事情。
/**
* RabbitMQ队列初始化器
*/@Slf4jpublicclassRabbitModuleInitializerimplementsSmartInitializingSingleton{privateAmqpAdmin amqpAdmin;privateRabbitModuleProperties rabbitModuleProperties;publicRabbitModuleInitializer(AmqpAdmin amqpAdmin,RabbitModuleProperties rabbitModuleProperties){this.amqpAdmin = amqpAdmin;this.rabbitModuleProperties = rabbitModuleProperties;}@OverridepublicvoidafterSingletonsInstantiated(){
log.info("RabbitMQ 根据配置动态创建和绑定队列、交换机");declareRabbitModule();}/**
* RabbitMQ 根据配置动态创建和绑定队列、交换机
*/privatevoiddeclareRabbitModule(){List<RabbitModuleInfo> rabbitModuleInfos = rabbitModuleProperties.getModules();if(CollectionUtil.isEmpty(rabbitModuleInfos)){return;}for(RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos){configParamValidate(rabbitModuleInfo);// 队列Queue queue =convertQueue(rabbitModuleInfo.getQueue());// 交换机Exchange exchange =convertExchange(rabbitModuleInfo.getExchange());// 绑定关系String routingKey = rabbitModuleInfo.getRoutingKey();String queueName = rabbitModuleInfo.getQueue().getName();String exchangeName = rabbitModuleInfo.getExchange().getName();Binding binding =newBinding(queueName,Binding.DestinationType.QUEUE, exchangeName, routingKey,null);// 创建队列
amqpAdmin.declareQueue(queue);// 创建交换机
amqpAdmin.declareExchange(exchange);// 队列 绑定 交换机
amqpAdmin.declareBinding(binding);}}/**
* RabbitMQ动态配置参数校验
*
* @param rabbitModuleInfo
*/publicvoidconfigParamValidate(RabbitModuleInfo rabbitModuleInfo){String routingKey = rabbitModuleInfo.getRoutingKey();Assert.isTrue(StrUtil.isNotBlank(routingKey),"RoutingKey 未配置");Assert.isTrue(rabbitModuleInfo.getExchange()!=null,"routingKey:{}未配置exchange", routingKey);Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()),"routingKey:{}未配置exchange的name属性", routingKey);Assert.isTrue(rabbitModuleInfo.getQueue()!=null,"routingKey:{}未配置queue", routingKey);Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getQueue().getName()),"routingKey:{}未配置exchange的name属性", routingKey);}/**
* 转换生成RabbitMQ队列
*
* @param queue
* @return
*/publicQueueconvertQueue(RabbitModuleInfo.Queue queue){Map<String,Object> arguments = queue.getArguments();// 转换ttl的类型为longif(arguments !=null&& arguments.containsKey("x-message-ttl")){
arguments.put("x-message-ttl",Convert.toLong(arguments.get("x-message-ttl")));}// 是否需要绑定死信队列String deadLetterExchange = queue.getDeadLetterExchange();String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();if(StrUtil.isNotBlank(deadLetterExchange)&&StrUtil.isNotBlank(deadLetterRoutingKey)){if(arguments ==null){
arguments =newHashMap<>(4);}
arguments.put("x-dead-letter-exchange", deadLetterExchange);
arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);}returnnewQueue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);}/**
* 转换生成RabbitMQ交换机
*
* @param exchangeInfo
* @return
*/publicExchangeconvertExchange(RabbitModuleInfo.Exchange exchangeInfo){AbstractExchange exchange =null;RabbitExchangeTypeEnum exchangeType = exchangeInfo.getType();String exchangeName = exchangeInfo.getName();boolean isDurable = exchangeInfo.isDurable();boolean isAutoDelete = exchangeInfo.isAutoDelete();Map<String,Object> arguments = exchangeInfo.getArguments();switch(exchangeType){case DIRECT:// 直连交换机
exchange =newDirectExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case TOPIC:// 主题交换机
exchange =newTopicExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case FANOUT://扇形交换机
exchange =newFanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case HEADERS:// 头交换机
exchange =newHeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);break;}return exchange;}}
/**
* RabbitMQ 队列和交换机机绑定关系实体对象
*/@DatapublicclassRabbitModuleInfo{/**
* 路由Key
*/privateString routingKey;/**
* 队列信息
*/privateQueue queue;/**
* 交换机信息
*/privateExchange exchange;/**
* 交换机信息类
*/@DatapublicstaticclassExchange{/**
* 交换机类型
*/privateRabbitExchangeTypeEnum type =RabbitExchangeTypeEnum.DIRECT;// 默认直连交换机/**
* 交换机名称
*/privateString name;/**
* 是否持久化
*/privateboolean durable =true;// 默认true持久化,重启消息不会丢失/**
* 当所有队绑定列均不在使用时,是否自动删除交换机
*/privateboolean autoDelete =false;// 默认false,不自动删除/**
* 交换机其他参数
*/privateMap<String,Object> arguments;}/**
* 队列信息类
*/@DatapublicstaticclassQueue{/**
* 队列名称
*/privateString name;/**
* 是否持久化
*/privateboolean durable =true;// 默认true持久化,重启消息不会丢失/**
* 是否具有排他性
*/privateboolean exclusive =false;// 默认false,可多个消费者消费同一个队列/**
* 当消费者均断开连接,是否自动删除队列
*/privateboolean autoDelete =false;// 默认false,不自动删除,避免消费者断开队列丢弃消息/**
* 绑定死信队列的交换机名称
*/privateString deadLetterExchange;/**
* 绑定死信队列的路由key
*/privateString deadLetterRoutingKey;privateMap<String,Object> arguments;}}
动态创建队列,交换机
添加配置,动态创建队列,交换机
rabbitmq:
# 动态创建和绑定队列、交换机的配置
modules:
# 延时队列,到了过期的时间会被转发到订单死信队列
- routing-key: log.inbound.operation.queue.key
queue:
name: log.inbound.operation.queue
# dead-letter-exchange: order.exchange# dead-letter-routing-key: order.close.routing.key
arguments:
# 1分钟(测试),单位毫秒
x-message-ttl: 60000
exchange:
name: log.exchange
生产者
生产者发送消息
@AutowiredprivateRabbitTemplate rabbitTemplate;
rabbitTemplate.convertAndSend("log.exchange","log.inventory.operation.queue.key", inventoryChangeLogDTO);
消费者
消费者接收消息
@RabbitListener(queues ="log.inventory.operation.queue")publicvoidhandleInventoryOperation(InventoryChangeLogDTO inventoryDTO){System.out.println(inventoryDTO);}
你知道的越多,你不知道的越多。
版权归原作者 carroll18 所有, 如有侵权,请联系我们删除。