0


RabbitMQ

RabbitMQ

1.什么是中间件

中间件可以理解为一个帮助不同软件、应用或系统之间交流和数据传输的工具或服务。就像一个翻译员在两个讲不同语言的人之间传递信息,让他们能够互相理解和沟通。中间件位于客户端(比如你的电脑或手机应用)和服务器(存放数据和运行服务的强大计算机)之间,确保数据顺利传输,同时还可以提供额外的功能,比如安全性、数据管理和消息服务等。
简单来说,中间件是一种软件服务,用于连接不同的应用程序和系统,帮助它们更好地工作和交流,无论它们是在同一个地方还是分布在全球不同的位置。这就像是建立在不同软件组件之间的桥梁,确保信息能够顺畅、安全地流动。

2.为什么需要使用消息中间件

具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担。中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。

3.中间件特性

1.互操作性和兼容性: 中间件使得不同平台、不同技术和不同语言编写的应用程序能够相互交流和协作。它提供了一种标准的方法来促进不同系统之间的通信,无论这些系统的底层技术如何不同。
2.抽象层: 中间件提供了一个抽象层,隐藏了底层网络和数据通信的复杂性,使得开发者可以专注于应用逻辑的开发,而不需要深入了解底层技术的细节。
3.可扩展性: 中间件设计时考虑到了系统的可扩展性,可以支持从几个到成千上万个并发用户和服务。这使得应用程序可以根据需要轻松扩展,以适应不断增长的用户需求。
4.安全性: 中间件还提供了安全机制,包括认证、授权、加密和数据完整性验证等,以保证数据传输的安全和防止未经授权的访问。
5.高可用性和可靠性: 通过提供故障转移、负载均衡和事务管理等功能,中间件确保了应用程序的高可用性和可靠性。即使在部分系统组件失败的情况下,也能保证服务的连续性和数据的完整性。
6.异步通信和消息队列: 中间件支持异步通信模式,允许应用程序在不直接等待响应的情况下发送和接收消息。这通过使用消息队列等技术实现,提高了应用程序的性能和响应速度。

4.什么是消息中间件

消息中间件是一种特定类型的中间件,专注于消息的传递和交换。它允许不同的应用程序、系统或软件组件之间通过消息来进行通信,而不是直接调用对方的接口或方法。这种通信方式可以是异步的,也就是说,发送消息的一方不需要等待接收方立即处理消息和回应。消息中间件通常提供了一系列功能,以支持复杂的消息处理模式,包括但不限于消息队列、发布/订阅模型、消息路由和消息持久化。

4.1基于消息中间件的分布式系统的架构

在这里插入图片描述

4.2消息中间件应用的场景

1.系统解耦:在复杂的应用架构中,各个系统或服务之间的直接依赖会导致维护和扩展变得困难。消息中间件通过提供一个中介层,允许服务之间通过消息进行通信,从而减少了它们之间的直接耦合。这样,即使某个服务发生变化,也不会直接影响到其他服务。
2.异步处理:在处理耗时操作(如发送电子邮件、生成报告等)时,可以将这些任务异步化,即发送一个消息到消息队列,然后继续处理其他任务。后台服务可以从队列中取出消息并处理这些耗时操作,这样可以提高应用的响应速度和用户体验。
3.负载均衡:通过消息中间件,可以将任务分散到多个处理单元上执行,从而实现负载均衡。这对于处理大量并发请求或大数据处理尤为重要,可以有效分配系统资源,避免某个服务因过载而崩溃。
4.数据同步:在分布式系统中,确保数据在不同的服务或数据库之间保持一致性是一项挑战。通过消息中间件,可以实现数据的变更通知和同步,当一个服务更新了数据后,可以通过消息通知其他服务进行相应的更新。
5.事件驱动架构:在事件驱动架构中,系统的行为是由事件触发的。消息中间件允许服务发布事件(消息)到一个共享的事件通道,其他服务可以订阅这些事件并作出响应。这种模式支持高度模块化和动态的系统行为,使得应用能够灵活地响应各种事件。
6.微服务架构:在微服务架构中,应用被分解成许多小型、独立的服务,它们通过网络进行通信。消息中间件是实现服务间通信的理想选择,因为它支持异步消息传递、服务解耦和弹性扩展。
7.大数据处理和实时流处理:在大数据和实时数据流处理场景中,消息中间件(如Apache Kafka)能够处理高吞吐量的数据流。它允许数据被实时捕获、存储、处理和分析,支持复杂的数据处理管道和实时分析应用。

4.3消息中间件的本质及设计

它是一种接受数据,接受请求、存储数据、发送数据等功能的技术服务。
在这里插入图片描述

4.4核心组成部分

1.消息队列:消息队列是消息中间件的基本组成部分,它暂时存储在发送者和接收者之间传递的消息。队列确保消息能够按照发送的顺序被逐一处理,即使接收者暂时无法处理消息,也能保证消息不会丢失。
2.交换器(Exchange):在一些消息中间件系统中,特别是使用发布/订阅模式的系统里,交换器负责接收生产者发送的消息,并根据路由规则决定消息应该发送到哪个队列。交换器使得消息分发更加灵活和强大。
3.消息通道:消息通道是连接应用程序和消息中间件的通信路径。它可以是点对点的,也可以是发布/订阅模式下的多对多通信。消息通道抽象了底层的网络通信细节,为消息的发送和接收提供了一个简单的接口。
4.消息代理(Broker):消息代理是消息中间件的中心组件,它管理着消息队列、处理消息的路由、传递和可能的持久化。消息代理还负责处理消息的接收、存储和转发给目标接收者。
5.客户端库:客户端库提供了一套API,使得应用程序能够与消息中间件系统交互,包括发送和接收消息。这些库通常是针对特定编程语言设计的,简化了消息中间件的集成和使用。
6.管理和监控工具:为了确保消息中间件的稳定运行和高效性能,管理和监控工具是必不可少的。这些工具可以监控消息流量、队列长度、处理延迟等指标,并提供配置管理、故障诊断和性能调优的功能。
7.持久化存储:为了保证消息不会因为系统故障而丢失,许多消息中间件支持将消息持久化到磁盘或其他存储系统。这确保了即使在发生故障的情况下,消息也能够被安全地恢复。

4.5 RabbitMQ使用的协议

RabbitMQ 主要使用的协议是 AMQP(高级消息队列协议,Advanced Message Queuing Protocol)。AMQP 是一个应用层协议,专为异步消息队列提供标准化的通信协议,旨在确保兼容性和可靠性,支持多种消息中间件产品之间的互操作性。
AMQP 定义了消息传递的各种方面,包括消息的排队、路由(包括点对点和发布/订阅模式)、安全性、事务和其他消息服务。RabbitMQ 作为一个遵循 AMQP 标准的消息队列系统,充分利用了这些特性,提供了一个高效、可靠、可扩展的消息中间件解决方案。

除了 AMQP,RabbitMQ 还支持其他的协议,例如:
MQTT(消息队列遥测传输):一个轻量级的消息协议,适用于小型设备和移动应用,常用于物联网(IoT)场景。
STOMP(简单文本导向的消息协议):一种简单的互操作协议,设计用于异步消息传递,支持多种语言和环境。
HTTP和WebSockets:通过插件支持,RabbitMQ 可以接受HTTP和WebSockets协议的消息,使其能够更容易地集成到Web应用中。
这种多协议支持使得 RabbitMQ 能够在多种不同的应用场景中使用,从企业级应用到物联网和实时Web应用,都能提供稳定可靠的消息传递服务。

4.5消息队列持久化

简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。
常见的持久化方式:文件存储和数据库存储
在这里插入图片描述

4.6分发策略

  1. 轮询分发(Round-Robin Dispatching) 描述:这是最基本的分发策略,消息按顺序轮流分发给每个消费者。当有多个消费者从同一个队列中获取消息时,RabbitMQ 会按照消费者的顺序依次将消息分发给它们,每个消费者轮流接收一个消息。 应用场景:适用于处理相似工作量的消息时,可以保证工作负载在不同消费者之间大致平均分配。
  2. 公平分发(Fair Dispatching) 描述:为了避免某些消费者因处理较慢的任务而积压大量消息,RabbitMQ 允许通过设置预取值(prefetch count)来控制在消费者发送 ack(确认)之前,队列最多发送给消费者的消息数量。 应用场景:适用于处理工作量不均匀的任务,确保没有单个消费者被过度负载。
  3. 优先级队列 描述:RabbitMQ 支持优先级队列,允许为消息设置优先级。队列会根据消息的优先级决定分发顺序,高优先级的消息会先于低优先级的消息被消费。 应用场景:适用于那些需要紧急处理的消息,确保关键任务能够得到优先执行。
  4. 基于发布/订阅的分发 描述:在发布/订阅模型中,消息被发送到交换器(exchange),然后根据绑定(binding)规则分发到一个或多个队列。消费者从各自订阅的队列中接收消息。 应用场景:适用于需要将消息广播到多个消费者的场景,如日志收集、消息通知等。
  5. 路由分发 描述:使用具有选择性路由能力的交换器(如直连交换器direct exchange,主题交换器topic exchange),可以基于消息的路由键(routing key)将消息精确地分发到一个或多个队列。 应用场景:适用于复杂的条件下的消息分发,如基于特定规则或属性将消息路由到不同的消费者。 这些分发策略提供了灵活的方式来控制消息如何在生产者和消费者之间流动,使得RabbitMQ能够适应各种不同的消息传递需求和应用场景。选择合适的分发策略可以显著提高应用的性能和响应能力。

4.7消息队列高可用和高可靠

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。
所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。
如何保证中间件消息的可靠性呢?可以从两个方面考虑:
1:消息的传输:通过协议来保证系统间数据解析的正确性。
2:消息的存储可靠:通过持久化来保证消息的可靠性。

4.8集群模式

  1. Master-Slave主从共享数据的部署方式 解释:在这种模式下,主节点(Master)和从节点(Slave)共享相同的数据存储。主节点处理写操作,而从节点可以处理读操作。如果主节点出现故障,可以手动或自动切换到一个从节点作为新的主节点,以此来保证服务的可用性。 例子:传统的数据库系统,如MySQL的复制功能,可以配置成主从共享同一个物理存储,或者通过网络文件系统(NFS)共享数据。
  2. Master-Slave主从同步部署方式 解释:在主从同步模式中,数据从主节点同步到一个或多个从节点。主节点负责处理所有写操作,同时,写操作的数据会被同步到从节点,确保数据的一致性。如果主节点失败,从节点可以被提升为新的主节点。 例子:PostgreSQL的流复制(Streaming Replication)允许从节点实时地接收主节点的更新,实现数据的同步。
  3. 多主集群同步部署模式 解释:这种模式允许多个主节点同时接收和处理写操作。所有的主节点之间会相互同步数据,以保持数据的一致性。这种模式可以提高系统的写入吞吐量和可用性。 例子:Galera Cluster为MySQL提供了多主节点的集群支持,其中每个节点都可以处理写操作,并且写操作会被同步到其他节点。
  4. 多主集群转发部署模式 解释:在这个模式下,多个主节点可以接收写操作,但写操作会被转发到一个指定的节点进行实际的数据修改。这个指定的节点随后会将数据更改同步到其他节点。这种方式可以简化数据同步的逻辑,但增加了写操作的延迟。 例子:某些自定义的数据库集群可能采用这种模式,以便集中管理数据的更新,然后通过自定义的同步机制将更新推送到其他节点。
  5. Master-Slave与Broker-Cluster组合的方案 解释:这种方案结合了Master-Slave模式的数据同步特性和Broker-Cluster的消息队列功能。数据的写操作首先在Master上进行,然后通过消息队列(Broker-Cluster)分发到Slave节点进行数据同步。这种方式既保证了数据的实时同步,又提高了系统的可扩展性和容错能力。 例子:可以将RabbitMQ用作Broker-Cluster,结合数据库的Master-Slave复制,实现高效的数据同步和分发。例如,一个Web应用的数据库更新操作在主数据库(Master)上执行,然后通过RabbitMQ将更新事件异步分发给其他从数据库(Slave),实现数据的快速同步。 举例:传统的数据库系统和某些消息队列系统经常使用主从模式来提高数据的可用性和读取性能。
  6. 对等(Peer-to-Peer)模式 描述:在这个模式下,每个节点都具有相同的角色和责任,节点之间相互协作,没有明确的主从关系。这种模式可以提高系统的可扩展性和容错能力,因为每个节点都可以处理消息,并且节点之间可以共享状态信息。 举例:某些分布式文件系统和P2P网络使用对等模式来提高系统的可扩展性和数据的可用性。
  7. 分片(Sharding)模式 描述:通过将数据分散存储到多个节点上,每个节点只负责处理一部分数据或消息。这种方式可以提高系统的吞吐量和可扩展性,因为它允许并行处理大量的消息。 举例:RabbitMQ和其他一些高性能消息队列系统支持分片插件或配置,以此来分散负载并提高处理能力。
  8. 镜像队列(Mirrored Queues)模式 描述:在RabbitMQ中,镜像队列是一种特殊的集群模式,用于在多个节点上复制队列的完整状态,以实现高可用性。如果一个节点失败,其他节点上的镜像队列可以继续处理消息,无需手动干预。 举例:RabbitMQ的镜像队列功能允许在不同服务器上创建队列的精确副本,这样即使某个服务器发生故障,消息也不会丢失,系统可以继续运行。
  9. 自动容错(Autonomous Failover)模式 描述:这种模式下,集群能够自动检测节点故障,并将故障节点上的工作负载自动迁移到健康节点上,以此来保证服务的连续性和数据的完整性。 举例:Apache Kafka通过使用Zookeeper来管理集群状态,实现了自动的故障转移和领导者选举机制,从而提高了整个系统的可靠性和可用性。

5.RabbitMQ使用

5.1一些基本配置

1.默认情况下,rabbitmq是没有安装web端的客户端插件,需要安装才可以生效

rabbitmq-plugins enable rabbitmq_management

安装完毕以后,重启服务即可

systemctl restart rabbitmq-server

在浏览器访问
http://localhost:15672
账号密码都是guest

新增用户

rabbitmqctl add_user admin admin

设置用户分配操作权限

rabbitmqctl set_user_tags admin administrator

用户级别:

1、administrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
2、monitoring 监控者 登录控制台,查看所有信息
3、policymaker 策略制定者 登录控制台,指定策略
4、managment 普通管理员 登录控制台

其他配置

rabbitmqctl add_user 账号 密码
rabbitmqctl set_user_tags 账号 administrator
rabbitmqctl change_password UsernameNewpassword 修改密码
rabbitmqctl delete_user Username 删除用户
rabbitmqctl list_users 查看用户清单
rabbitmqctl set_permissions -p / 用户名 ".*"".*"".*" 为用户设置administrator角色
rabbitmqctl set_permissions -p / root ".*"".*"".*"

6.RabbitMQ角色分类

1.none:
不能访问management plugin,也就是不能显示如下界面
在这里插入图片描述
2.management:查看自己相关节点信息
列出自己可以通过AMQP登入的虚拟机
查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
查看和关闭自己的channels和connections
查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。
3.Policymaker
包含management所有权限
查看和创建和删除自己的virtual hosts所属的policies和parameters信息。
4.Monitoring
包含management所有权限
罗列出所有的virtual hosts,包括不能登录的virtual hosts。
查看其他用户的connections和channels信息
查看节点级别的数据如clustering和memory使用情况
查看所有的virtual hosts的全局统计信息。
5.Administrator
最高权限
可以创建和删除virtual hosts
可以查看,创建和删除users
查看创建permisssions
关闭所有用户的connections

7.什么是AMQP

AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计。
AMQP生产者流转过程
在这里插入图片描述
AMQP消费者流转过程
在这里插入图片描述

8.RabbitMQ的核心组成部分

在这里插入图片描述
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

9.RabbitMQ的运行流程

在这里插入图片描述

11.RabbitMQ支持消息的模式

RabbitMQ 是一种消息代理,它支持不同的消息传递模式,这些模式定义了消息如何从发送者传递到接收者。下面是一些通俗易懂的解释和例子:

  1. 简单模式(Simple):- 描述:一对一的消息发送。一个生产者发送消息到队列,一个消费者从队列接收消息。- 例子:一个用户界面应用(生产者)发送用户请求到后端服务器(消费者)处理。
  2. 工作队列模式(Work Queues):- 描述:将任务分配给多个工作者(消费者)。一位生产者发送任务,多位消费者平分这些任务并独立工作。- 例子:一个应用发送大量的图像处理请求到一个队列,多个图像处理服务(消费者)从队列获取并处理这些图像。
  3. 发布/订阅模式(也叫Fanout模式)(Publish/Subscribe):- 描述:消息一对多分发。一个生产者发送消息到交换机,然后被转发到多个队列,每个队列有一个消费者。- 例子:一个新闻发布系统将新闻推送到不同的频道,订阅了相应频道的用户都能接收到新闻。
  4. 路由模式(也叫direct模式)(Routing):- 描述:根据一定的规则(路由键)将消息路由到一个或多个队列。生产者将消息发送到交换机,并指定一个路由键,交换机根据路由键决定消息的去向。- 例子:日志系统根据日志级别(如“error”、“info”)发送消息到不同的处理队列。
  5. 主题模式(Topics):- 描述:消息根据模式匹配(而不是完全相等,也就是模糊匹配)路由到一个或多个队列。主题交换机可以根据通配符进行模式匹配,路由到不同队列。- 例子:股市应用,将股票更新发送到交换机,交换机根据股票代码模式(如stock.us.*)转发到对应的队列。
  6. 参数模式(Headers):- 描述:而不是根据路由键,消费者从交换机接收消息是基于发送消息的头信息(参数)。- 例子:一个系统根据消息头信息中的“x-device-type”和“x-user-language”参数,决定将推送的广告发送到哪个用户的队列。

RabbitMQ 通过这些模式提供了灵活的消息路由选项,以适应不同的消息传递场景和需求。

10.简单代码案例

10.1simple

生产者

importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            channel.queueDeclare("queue1",false,false,false,null);// 6: 准备发送消息的内容String message ="你好,啊啊啊!!!";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routing// @params3: 属性配置// @params4: 发送消息的内容
            channel.basicPublish("","queue1",null, message.getBytes());System.out.println("消息发送成功!");}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

消费者

importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者");// 4: 从连接中获取通道channel
            channel=connection.createChannel();
            channel.basicConsume("queue1",true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{System.out.println("收到消息是"+newString(delivery.getBody(),"utf-8"));}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{System.out.println("消息接收失败");}});System.out.println("开始接收消息");System.in.read();}catch(Exception ex){
            ex.printStackTrace();}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

10.2Fanout模式

生产者

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost7");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 6: 准备发送消息的内容String message ="你好!!!";String  exchangeName ="fanout-exchange";String routingKey ="";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容
            channel.basicPublish(exchangeName, routingKey,null, message.getBytes());System.out.println("消息发送成功!");}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

消费者

importjava.io.IOException;publicclassConsumer{privatestaticRunnable runnable =()->{// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");//获取队列的名称finalString queueName =Thread.currentThread().getName();Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;
            finalChannel.basicConsume(queueName,true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{System.out.println(queueName +":收到消息是:"+newString(delivery.getBody(),"UTF-8"));}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println(queueName +":开始接受消息");System.in.read();}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}};publicstaticvoidmain(String[] args){// 启动三个线程去执行newThread(runnable,"queue-1").start();newThread(runnable,"queue-2").start();newThread(runnable,"queue-3").start();}}

10.3direct模式

生产者

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 6: 准备发送消息的内容String message ="你好!!!";String  exchangeName ="direct-exchange";String routingKey1 ="testkey";String routingKey2 ="testkey2";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容
            channel.basicPublish(exchangeName, routingKey1,null, message.getBytes());
            channel.basicPublish(exchangeName, routingKey2,null, message.getBytes());System.out.println("消息发送成功!");}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

消费者

importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumer{privatestaticRunnable runnable =()->{// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");//获取队列的名称finalString queueName =Thread.currentThread().getName();Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;
            finalChannel.basicConsume(queueName,true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{System.out.println(queueName +":收到消息是:"+newString(delivery.getBody(),"UTF-8"));}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println(queueName +":开始接受消息");System.in.read();}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}};publicstaticvoidmain(String[] args){// 启动三个线程去执行newThread(runnable,"queue-1").start();newThread(runnable,"queue-2").start();newThread(runnable,"queue-3").start();}}
10.4 topic模式

生产者

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 6: 准备发送消息的内容String message ="你好!!!";String  exchangeName ="topic-exchange";String routingKey1 ="com.course.order";//都可以收到 queue-1 queue-2String routingKey2 ="com.order.user";//都可以收到 queue-1 queue-3// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容
            channel.basicPublish(exchangeName, routingKey1,null, message.getBytes());System.out.println("消息发送成功!");}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

消费者

importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumer{privatestaticRunnable runnable =()->{// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");//获取队列的名称finalString queueName =Thread.currentThread().getName();Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;
            finalChannel.basicConsume(queueName,true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{System.out.println(queueName +":收到消息是:"+newString(delivery.getBody(),"UTF-8"));}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println(queueName +":开始接受消息");System.in.read();}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}};publicstaticvoidmain(String[] args){// 启动三个线程去执行newThread(runnable,"queue-1").start();newThread(runnable,"queue-2").start();newThread(runnable,"queue-3").start();}}

以上代码都没有绑定关系,因为我在可视化界面完成了绑定关系,所以我偷了个懒

10.5 work模式

10.5.1轮询模式(平分:99个消息,三个消费者,每人33个)

生产者

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 6: 准备发送消息的内容//===============================end topic模式==================================for(int i =1; i <=20; i++){//消息的内容String msg ="消息:"+ i;// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容
                channel.basicPublish("","queue1",null, msg.getBytes());}System.out.println("消息发送成功!");}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

消费者1

importcom.rabbitmq.client.*;importjava.io.IOException;publicclassWork1{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work1");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */// 这里如果queue已经被创建过一次了,可以不需要定义//            channel.queueDeclare("queue1", false, false, false, null);// 同一时刻,服务器只会推送一条消息给消费者// 6: 定义接受消息的回调Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1",true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{try{System.out.println("Work1-收到消息是:"+newString(delivery.getBody(),"UTF-8"));Thread.sleep(2000);}catch(Exception ex){
                        ex.printStackTrace();}}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println("Work1-开始接受消息");System.in.read();}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

消费者2

importcom.rabbitmq.client.*;importjava.io.IOException;publicclassWork2{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work2");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, true, false, null);// 同一时刻,服务器只会推送一条消息给消费者//channel.basicQos(1);// 6: 定义接受消息的回调Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1",true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{try{System.out.println("Work2-收到消息是:"+newString(delivery.getBody(),"UTF-8"));Thread.sleep(200);}catch(Exception ex){
                        ex.printStackTrace();}}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println("Work2-开始接受消息");System.in.read();}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}
10.5.2公平分发模式(能者多劳:在一定时间里你处理的多就多给你点)

生产者

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 6: 准备发送消息的内容//===============================end topic模式==================================for(int i =1; i <=20; i++){//消息的内容String msg ="消息:"+ i;// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容
                channel.basicPublish("","queue1",null, msg.getBytes());}System.out.println("消息发送成功!");}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

消费者1

importcom.rabbitmq.client.*;importjava.io.IOException;publicclassWork1{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work1");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */// 这里如果queue已经被创建过一次了,可以不需要定义//            channel.queueDeclare("queue1", false, false, false, null);// 同一时刻,服务器只会推送一条消息给消费者// 6: 定义接受消息的回调Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1",false,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{try{System.out.println("Work1-收到消息是:"+newString(delivery.getBody(),"UTF-8"));Thread.sleep(2000);
                        finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}catch(Exception ex){
                        ex.printStackTrace();}}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println("Work1-开始接受消息");System.in.read();}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

消费者2

importcom.rabbitmq.client.*;importjava.io.IOException;publicclassWork2{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work2");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, true, false, null);// 同一时刻,服务器只会推送一条消息给消费者//channel.basicQos(1);// 6: 定义接受消息的回调Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1",false,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{try{System.out.println("Work2-收到消息是:"+newString(delivery.getBody(),"UTF-8"));Thread.sleep(200);
                        finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}catch(Exception ex){
                        ex.printStackTrace();}}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println("Work2-开始接受消息");System.in.read();}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

11.SpringBoot案例代码

11.1fanout模式

代码思路如下图
在这里插入图片描述

生产者
在pom.xml引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

在application.yml进行配置

# 服务端口
server:
  port:8080
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host:/
    host: localhost
    port:5672

定义订单的生产者

importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.util.UUID;@ComponentpublicclassOrderService{@AutowiredprivateRabbitTemplate rabbitTemplate;// 1: 定义交换机privateString exchangeName ="fanout_order_exchange";// 2: 路由keyprivateString routeKey ="";publicvoidmakeOrder(Long userId,Long productId,int num){// 1: 模拟用户下单String orderNumer =UUID.randomUUID().toString();// 2: 根据商品id productId 去查询商品的库存// int numstore = productSerivce.getProductNum(productId);// 3:判断库存是否充足// if(num >  numstore ){ return  "商品库存不足..."; }// 4: 下单逻辑// orderService.saveOrder(order);// 5: 下单成功要扣减库存// 6: 下单完成以后System.out.println("用户 "+ userId +",订单编号是:"+ orderNumer);// 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}}

绑定关系

importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectRabbitConfig{//队列 起名:TestDirectQueue@BeanpublicQueueemailQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturnnewQueue("email.fanout.queue",true);}@BeanpublicQueuesmsQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturnnewQueue("sms.fanout.queue",true);}@BeanpublicQueueweixinQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturnnewQueue("weixin.fanout.queue",true);}//Direct交换机 起名:TestDirectExchange@BeanpublicDirectExchangefanoutOrderExchange(){//  return new DirectExchange("TestDirectExchange",true,true);returnnewDirectExchange("fanout_order_exchange",true,false);}//绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting@BeanpublicBindingbindingDirect1(){returnBindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");}@BeanpublicBindingbindingDirect2(){returnBindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");}@BeanpublicBindingbindingDirect3(){returnBindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");}}

消费者
引入的依赖和配置同上
邮件服务

importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;// bindings其实就是用来确定队列和交换机绑定关系@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value =@Queue(value ="email.fanout.queue",autoDelete ="false"),// order.fanout 交换机的名字 必须和生产者保持一致
        exchange =@Exchange(value ="fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type =ExchangeTypes.FANOUT)))@ComponentpublicclassEmailService{// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublicvoidmessagerevice(String message){// 此处省略发邮件的逻辑System.out.println("email-------------->"+ message);}}

短信服务

importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;// bindings其实就是用来确定队列和交换机绑定关系@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value =@Queue(value ="sms.fanout.queue",autoDelete ="false"),// order.fanout 交换机的名字 必须和生产者保持一致
        exchange =@Exchange(value ="fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type =ExchangeTypes.FANOUT)))@ComponentpublicclassSMSService{// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublicvoidmessagerevice(String message){// 此处省略发邮件的逻辑System.out.println("sms-------------->"+ message);}}

微信服务

importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;// bindings其实就是用来确定队列和交换机绑定关系@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value =@Queue(value ="weixin.fanout.queue",autoDelete ="false"),// order.fanout 交换机的名字 必须和生产者保持一致
        exchange =@Exchange(value ="fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type =ExchangeTypes.FANOUT)))@ComponentpublicclassWeixinService{// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublicvoidmessagerevice(String message){// 此处省略发邮件的逻辑System.out.println("weixin-------------->"+ message);}}

进行测试

packagecom.rabbitmq.springbootrabbitmqfanoutproducer;importcom.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service.OrderService;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassSpringbootRabbitmqFanoutProducerApplicationTests{@AutowiredOrderService orderService;@TestpublicvoidcontextLoads()throwsException{for(int i =0; i <10; i++){Thread.sleep(1000);Long userId =100L+ i;Long productId =10001L+ i;int num =10;
            orderService.makeOrder(userId, productId, num);}}}

11.2direct模式

依赖和配置跟上面一样

生产者

importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.util.UUID;@ComponentpublicclassOrderService{@AutowiredprivateRabbitTemplate rabbitTemplate;// 1: 定义交换机privateString exchangeName ="direct_order_exchange";// 2: 路由keyprivateString routeKey ="";publicvoidmakeOrder(Long userId,Long productId,int num){// 1: 模拟用户下单String orderNumer =UUID.randomUUID().toString();// 2: 根据商品id productId 去查询商品的库存// int numstore = productSerivce.getProductNum(productId);// 3:判断库存是否充足// if(num >  numstore ){ return  "商品库存不足..."; }// 4: 下单逻辑// orderService.saveOrder(order);// 5: 下单成功要扣减库存// 6: 下单完成以后System.out.println("用户 "+ userId +",订单编号是:"+ orderNumer);// 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}}

绑定关系

importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectRabbitConfig{//队列 起名:TestDirectQueue@BeanpublicQueueemailQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturnnewQueue("email.fanout.queue",true);}@BeanpublicQueuesmsQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturnnewQueue("sms.fanout.queue",true);}@BeanpublicQueueweixinQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturnnewQueue("weixin.fanout.queue",true);}//Direct交换机 起名:TestDirectExchange@BeanpublicDirectExchangedirectOrderExchange(){//  return new DirectExchange("TestDirectExchange",true,true);returnnewDirectExchange("direct_order_exchange",true,false);}//绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting@BeanpublicBindingbindingDirect1(){returnBindingBuilder.bind(weixinQueue()).to(directOrderExchange()).with("");}@BeanpublicBindingbindingDirect2(){returnBindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("");}@BeanpublicBindingbindingDirect3(){returnBindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("");}}

消费者
邮件服务

packagecom.rabbitmq.springbootrabbitmqfanoutconsumer.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;// bindings其实就是用来确定队列和交换机绑定关系@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value =@Queue(value ="email.fanout.queue",autoDelete ="false"),// order.fanout 交换机的名字 必须和生产者保持一致
        exchange =@Exchange(value ="fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type =ExchangeTypes.FANOUT)))@ComponentpublicclassEmailService{// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublicvoidmessagerevice(String message){// 此处省略发邮件的逻辑System.out.println("email-------------->"+ message);}}

短信服务

packagecom.rabbitmq.springbootrabbitmqfanoutconsumer.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;// bindings其实就是用来确定队列和交换机绑定关系@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value =@Queue(value ="sms.fanout.queue",autoDelete ="false"),// order.fanout 交换机的名字 必须和生产者保持一致
        exchange =@Exchange(value ="fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type =ExchangeTypes.FANOUT)))@ComponentpublicclassSMSService{// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublicvoidmessagerevice(String message){// 此处省略发邮件的逻辑System.out.println("sms-------------->"+ message);}}

微信服务

importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;// bindings其实就是用来确定队列和交换机绑定关系@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value =@Queue(value ="weixin.fanout.queue",autoDelete ="false"),// order.fanout 交换机的名字 必须和生产者保持一致
        exchange =@Exchange(value ="fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type =ExchangeTypes.FANOUT)))@ComponentpublicclassWeixinService{// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublicvoidmessagerevice(String message){// 此处省略发邮件的逻辑System.out.println("weixin-------------->"+ message);}}

11.3topic模式

生产者

importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.util.UUID;@ComponentpublicclassOrderService{@AutowiredprivateRabbitTemplate rabbitTemplate;// 1: 定义交换机privateString exchangeName ="direct_order_exchange";// 2: 路由keyprivateString routeKey ="";publicvoidmakeOrder(Long userId,Long productId,int num){// 1: 模拟用户下单String orderNumer =UUID.randomUUID().toString();// 2: 根据商品id productId 去查询商品的库存// int numstore = productSerivce.getProductNum(productId);// 3:判断库存是否充足// if(num >  numstore ){ return  "商品库存不足..."; }// 4: 下单逻辑// orderService.saveOrder(order);// 5: 下单成功要扣减库存// 6: 下单完成以后System.out.println("用户 "+ userId +",订单编号是:"+ orderNumer);// 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}}

绑定

importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectRabbitConfig{//队列 起名:TestDirectQueue@BeanpublicQueueemailQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturnnewQueue("email.fanout.queue",true);}@BeanpublicQueuesmsQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturnnewQueue("sms.fanout.queue",true);}@BeanpublicQueueweixinQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//   return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturnnewQueue("weixin.fanout.queue",true);}//Direct交换机 起名:TestDirectExchange@BeanpublicDirectExchangedirectOrderExchange(){//  return new DirectExchange("TestDirectExchange",true,true);returnnewDirectExchange("direct_order_exchange",true,false);}//绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting@BeanpublicBindingbindingDirect1(){returnBindingBuilder.bind(weixinQueue()).to(directOrderExchange()).with("");}@BeanpublicBindingbindingDirect2(){returnBindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("");}@BeanpublicBindingbindingDirect3(){returnBindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("");}}

消费者
邮件服务

packagecom.rabbitmq.springbootrabbitmqfanoutconsumer.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;// bindings其实就是用来确定队列和交换机绑定关系@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value =@Queue(value ="email.fanout.queue",autoDelete ="false"),// order.fanout 交换机的名字 必须和生产者保持一致
        exchange =@Exchange(value ="fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type =ExchangeTypes.FANOUT)))@ComponentpublicclassEmailService{// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublicvoidmessagerevice(String message){// 此处省略发邮件的逻辑System.out.println("email-------------->"+ message);}}

短信服务

importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;// bindings其实就是用来确定队列和交换机绑定关系@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value =@Queue(value ="sms.fanout.queue",autoDelete ="false"),// order.fanout 交换机的名字 必须和生产者保持一致
        exchange =@Exchange(value ="fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type =ExchangeTypes.FANOUT)))@ComponentpublicclassSMSService{// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublicvoidmessagerevice(String message){// 此处省略发邮件的逻辑System.out.println("sms-------------->"+ message);}}

微信服务

packagecom.rabbitmq.springbootrabbitmqfanoutconsumer.consumer;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.*;importorg.springframework.stereotype.Component;// bindings其实就是用来确定队列和交换机绑定关系@RabbitListener(bindings =@QueueBinding(// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value =@Queue(value ="weixin.fanout.queue",autoDelete ="false"),// order.fanout 交换机的名字 必须和生产者保持一致
        exchange =@Exchange(value ="fanout_order_exchange",// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type =ExchangeTypes.FANOUT)))@ComponentpublicclassWeixinService{// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublicvoidmessagerevice(String message){// 此处省略发邮件的逻辑System.out.println("weixin-------------->"+ message);}}

测试

importcom.rabbitmq.springbootrabbitmqfanoutproducer.service.OrderService;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassSpringbootRabbitmqFanoutProducerApplicationTests{@AutowiredOrderService orderService;@TestpublicvoidcontextLoads()throwsException{for(int i =0; i <10; i++){Thread.sleep(1000);Long userId =100L+ i;Long productId =10001L+ i;int num =10;
            orderService.makeOrder(userId, productId, num);}}}

12.过期时间TTL

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

12.1设置队列TTL

importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageProperties;importjava.util.HashMap;importjava.util.Map;/**
 * @author: 学相伴-飞哥
 * @description: Producer 简单队列生产者
 * @Date : 2021/3/2
 */publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */Map<String,Object> args2 =newHashMap<>();
            args2.put("x-message-ttl",5000);
            channel.queueDeclare("ttl.queue",true,false,false, args2);// 6: 准备发送消息的内容String message ="你好!!!";Map<String,Object> headers =newHashMap<String,Object>();
            headers.put("x","1");
            headers.put("y","1");AMQP.BasicProperties basicProperties =newAMQP.BasicProperties().builder().deliveryMode(2)// 传送方式.priority(1).contentEncoding("UTF-8")// 编码方式.expiration("3000")// 过期时间.headers(headers).build();//自定义属性// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routing// @params3: 属性配置// @params4: 发送消息的内容for(int i =0; i <100; i++){
                channel.basicPublish("","ttl.queue", basicProperties, message.getBytes());System.out.println("消息发送成功!");Thread.sleep(1000);}}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

测试

importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;
            finalChannel.basicConsume("ttl.queue",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println(properties);System.out.println("获取的消息是:"+newString(body,"UTF-8"));}});System.out.println("开始接受消息");System.in.read();}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

12.2消息的过期时间

消息的过期时间;只需要在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。在测试类中编写如下方法发送消息并设置过期时间到队列:

importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.util.HashMap;importjava.util.Map;publicclassMessageTTLProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
            channel = connection.createChannel();// 5: 申明队列queue存储消息/*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            channel.queueDeclare("ttl.queue2",true,false,false,null);// 6: 准备发送消息的内容String message ="你好!!!";Map<String,Object> headers =newHashMap<String,Object>();
            headers.put("x","1");
            headers.put("y","1");AMQP.BasicProperties basicProperties =newAMQP.BasicProperties().builder().deliveryMode(2)// 传送方式.priority(1).contentEncoding("UTF-8")// 编码方式.expiration("5000")// 过期时间.headers(headers).build();//自定义属性// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routing// @params3: 属性配置// @params4: 发送消息的内容for(int i =0; i <10; i++){
                channel.basicPublish("","ttl.queue2", basicProperties, message.getBytes());System.out.println("消息发送成功!");}}catch(Exception ex){
            ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
                    channel.close();}catch(Exception ex){
                    ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
                    connection.close();}catch(Exception ex){
                    ex.printStackTrace();}}}}}

13.RabbitMQ使用场景

  1. 异步处理:- 场景:当一个操作需要一段时间完成,而你不想让用户等待。- 例子:用户上传视频后,系统需要时间进行处理(比如转码)。用户上传视频后立即得到响应,而处理工作在后台异步进行。
  2. 应用解耦:- 场景:在复杂系统中,减少各个组件之间的依赖性。- 例子:电商系统中,订单系统和库存系统是独立的。当用户下单时,订单系统只需向消息队列发送一条消息,库存系统订阅这条消息后独立处理库存更新,两个系统互不影响。
  3. 负载均衡:- 场景:分散处理压力到多个工作实例,以提高处理能力和效率。- 例子:一个网站有大量的图像需要压缩,这些任务被发送到 RabbitMQ,多个工作者并行从队列中取任务进行图像压缩,加快处理速度。
  4. 消息缓冲:- 场景:处理突发的大量请求,通过缓冲来平滑系统负载。- 例子:在促销活动期间,电商网站可能会收到大量订单,RabbitMQ 可以作为缓冲区,暂存这些订单消息,然后逐渐处理,避免系统过载。
  5. 跨语言通信:- 场景:不同编程语言编写的系统需要交换数据。- 例子:前端系统使用 JavaScript 发送用户行为数据,后端系统用 Python 接收并处理这些数据。RabbitMQ 作为中间件,帮助不同语言编写的系统之间通信。
  6. 日志收集:- 场景:集中处理来自系统各个部分的日志信息。- 例子:多个服务分布在不同服务器上,它们将日志发送到 RabbitMQ,然后一个或多个日志处理服务从队列中读取并处理这些日志,比如存储到数据库或搜索引擎中。
  7. 分布式事务:- 场景:需要在多个服务间同步进行的操作,保证数据一致性。- 例子:在银行转账操作中,需要从一个账户扣款同时另一个账户增款。通过 RabbitMQ 确保两个操作要么都成功,要么都不执行。

RabbitMQ 的这些使用场景展示了它在现代软件架构中的多样性和强大功能,使得它成为系统间通信和集成的重要工具。

14.死信队列

死信队列(DLQ,Dead-Letter Queue)在消息队列系统中是一个重要的概念,它用来收集无法被正常消费的消息。在 RabbitMQ 中,死信队列的设置需要通过几个步骤配置,包括指定死信交换机(DLX,Dead-Letter Exchange)和死信路由键。消息可能因为以下几种情况进入死信队列:

  1. 消息被拒绝(Nack):消费者显式地通过 basic.rejectbasic.nack 拒绝消息,并且设置 requeue 参数为 false,表示不重新排队。
  2. 消息过期:消息在队列中存活的时间超过了它的 TTL(Time-To-Live)。
  3. 队列达到最大长度:消息被推送到一个已经达到最大长度的队列中。

14.1 为什么使用死信队列?

死信队列提供了一种机制,允许开发者处理那些无法正常处理的消息,而不是简单地丢弃这些消息,从而增加了系统的健壮性和可调试性。利用死信队列,开发者可以:

  • 分析为什么消息无法被处理。
  • 手动处理这些消息,可能是修复后重新发送,或者进行其他补救措施。

14.2举例

假设你有一个电子商务系统,其中包含一个处理订单的消息队列。当订单服务从队列接收消息并试图处理订单时,可能会遇到以下问题导致无法处理某些订单消息:

  • 订单信息不完整或格式错误。
  • 处理服务因为临时故障无法处理订单。
  • 订单处理超时。

在这种情况下,你可以配置一个死信队列,所有无法处理的订单消息都会被路由到这个队列。然后,你可以定期检查死信队列,分析问题原因,并对这些失败的订单进行手动处理或重试。

14.3配置示例

在 RabbitMQ 中配置死信队列涉及以下步骤:

  1. 创建一个死信交换机。
  2. 创建一个死信队列,并将其绑定到死信交换机。
  3. 配置你的主队列(比如订单处理队列),设置其死信交换机为步骤1中创建的交换机。这样,从主队列中被拒绝的消息就会自动路由到死信队列。

通过这种方式,死信队列为系统提供了一种安全网,确保所有消息都可以被追踪和处理,即使它们在第一次处理时失败了。

15 思考题

15.1Rabbitmq 为什么需要信道,为什么不是TCP直接通信

RabbitMQ 使用信道(Channels)而非直接通过 TCP 连接进行通信的主要原因是效率和资源管理。

  1. TCP连接的成本:建立 TCP 连接涉及三次握手过程,这在频繁创建和销毁连接的场景中是非常昂贵的。而且,操作系统对同时打开的 TCP 连接数量有限制,这可能成为系统的瓶颈。
  2. 多路复用:信道允许在一个 TCP 连接上多路复用,即多个信道共享一个 TCP 连接。这样,多个信道可以并行运行在一个连接上,每个信道可以执行不同的任务(如发送消息、接收消息等),而无需为每个信道建立新的 TCP 连接。这大大减少了网络开销和资源消耗。
  3. 隔离和安全:信道在逻辑上隔离了不同的消息流。即使多个信道共享同一个 TCP 连接,它们也可以独立工作,互不干扰。这提供了一种有效的方式来隔离不同类型的消息传递活动,并确保它们的独立性。
  4. 灵活的资源管理:通过使用信道,RabbitMQ 能够更灵活地管理资源和控制消息流。例如,可以对不同的信道设置不同的消息流控制策略,或者根据需要动态地添加或删除信道,而不影响到现有的 TCP 连接。
  5. 简化客户端连接:对于客户端来说,维护一个稳定的 TCP 连接比频繁地打开和关闭连接要简单得多。通过信道,客户端可以在不重新建立 TCP 连接的情况下,动态地增加消息发布或消费的并行度。

总之,信道提供了一种高效、灵活且资源友好的方式来处理大量并发的消息传递活动,这是直接使用 TCP 连接所无法实现的。

15.2queue队列到底在消费者创建还是生产者创建?

在 RabbitMQ 中,队列(Queue)的创建可以由生产者(Producer)、消费者(Consumer)或者两者都进行,这取决于应用程序的设计和需求。关键点在于确保队列在被生产者发送消息或消费者尝试从中读取消息之前已经存在。

  1. 生产者创建队列:- 在某些场景下,生产者知道它将发送消息的目的地,因此它负责创建队列,并将消息发送到这个队列。这通常发生在生产者启动时,它会确保所需的队列存在,然后再开始发送消息。
  2. 消费者创建队列:- 在其他场景下,消费者创建队列更为常见。消费者知道它们需要从哪个队列读取消息,因此在开始读取之前,它们会检查队列是否存在,如果不存在,则创建它。这种方式确保了消费者总是有一个队列可以从中读取消息。
  3. 两者都可以创建队列:- 在一些系统中,为了确保队列的存在,生产者和消费者都会尝试创建队列(如果队列不存在的话)。RabbitMQ 允许多次声明同一个队列(即尝试创建已存在的队列),只要每次声明的参数都是相同的,这样就不会有错误发生。

最佳实践

  • 在实际应用中,通常推荐由消费者创建队列,并进行相关的绑定操作(例如,将队列绑定到交换机)。这样做的原因是消费者最清楚自己的需求,包括队列的名称、是否需要持久化、是否独占等属性。
  • 同时,确保队列存在的责任也可以由部署脚本或应用程序启动脚本来承担,特别是在复杂的部署环境中。

最终,谁创建队列取决于你的具体应用场景和架构设计。无论哪种方式,关键是确保在尝试使用队列之前,队列已经被正确创建并配置。


本文转载自: https://blog.csdn.net/weixin_73000974/article/details/136125930
版权归原作者 小白不想秃头 所有, 如有侵权,请联系我们删除。

“RabbitMQ”的评论:

还没有评论