文章目录
RabbitMQ消息模型
RabbitMQ 是一个基于 AMQP 协议的开源消息代理软件,用于在分布式系统中进行异步消息传递。
- 基本消息模型。生产者–>1个queue–>1个消费者
- 工作队列模型。生产者–>1个queue–>N个消费者
- 广播-Fanout订阅模型。生产者–>交换机exchange–>N个queue–>N个消费者
- 定向-Direct订阅模型。生产者–>交换机exchange(routing key)–>N个queue–>N个消费者
- 通配符-Topic订阅模型。生产者–>交换机exchange(.routing key.)–>N个queue–>N个消费者
- RPC(远程过程调用)模型。
点对点:基本消息模型、工作队列消息模型。
发布订阅: fanout direct topic。
一 、基本消息模型
P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
总之:
生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。
1.生产者
packagecom.usian.util.simple;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.usian.util.ConnectionUtil;publicclassSend{privatefinalstaticStringQUEUE_NAME="simple_queue";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接以及mq通道Connection connection =ConnectionUtil.getConnection();// 从连接中创建通道,这是完成大部分API的地方。Channel channel = connection.createChannel();// 声明(创建)队列,必须声明队列才能够发送消息,我们可以把消息发送到队列中。// 声明一个队列是幂等的 - 只有当它不存在时才会被创建
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 消息内容String message ="Hello World!";
channel.basicPublish("",QUEUE_NAME,null, message.getBytes());System.out.println(" [x] Sent '"+ message +"'");//关闭通道和连接
channel.close();
connection.close();}}
2.消费者
packagecom.usian.util.simple;importjava.io.IOException;importcom.rabbitmq.client.AMQP.BasicProperties;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;importcom.usian.util.ConnectionUtil;publicclassRecv{privatefinalstaticStringQUEUE_NAME="simple_queue";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 创建通道Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 定义队列的消费者DefaultConsumer consumer =newDefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body)throwsIOException{// body 即消息体String msg =newString(body);System.out.println(" [x] received : "+ msg +"!");}};// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME,true, consumer);}}
3.消息确认机制(ACK)
publicclassRecv2{privatefinalstaticStringQUEUE_NAME="simple_queue";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 创建通道finalChannel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 定义队列的消费者DefaultConsumer consumer =newDefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body)throwsIOException{// body 即消息体String msg =newString(body);System.out.println(" [x] received : "+ msg +"!");/**
* 手动进行ACK
* deliveryTag:该消息的index
* multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息。
*/
channel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列,如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。
channel.basicConsume(QUEUE_NAME,false, consumer);}}
二、work工作队列消息模型
工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们可以稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。
这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。C1C2不可重复消费一个消息。
1.生产者
publicclassSend{privatefinalstaticStringQUEUE_NAME="test_work_queue";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 循环发布任务for(int i =0; i <50; i++){// 消息内容String message ="task .. "+ i;
channel.basicPublish("",QUEUE_NAME,null, message.getBytes());System.out.println(" [x] Sent '"+ message +"'");Thread.sleep(i *2);}// 关闭通道和连接
channel.close();
connection.close();}}
2.消费者一
packagecom.usian.util.simple;importjava.io.IOException;importcom.rabbitmq.client.AMQP.BasicProperties;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;importcom.usian.util.ConnectionUtil;publicclassRecv{privatefinalstaticStringQUEUE_NAME="test_work_queue";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 创建通道Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 定义队列的消费者DefaultConsumer consumer =newDefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body)throwsIOException{// body 即消息体String msg =newString(body);System.out.println(" [消费者1] received : "+ msg +"!");}};// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME,true, consumer);}}
3.消费者二
packagecom.usian.util.simple;importjava.io.IOException;importcom.rabbitmq.client.AMQP.BasicProperties;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.DefaultConsumer;importcom.rabbitmq.client.Envelope;importcom.usian.util.ConnectionUtil;publicclassRecv{privatefinalstaticStringQUEUE_NAME="test_work_queue";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 创建通道Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 定义队列的消费者DefaultConsumer consumer =newDefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body)throwsIOException{// body 即消息体String msg =newString(body);System.out.println(" [消费者2] received : "+ msg +"!");}};// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME,true, consumer);}}
三、广播-Fanout订阅模型
在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
1.生产者
publicclassSend{privatefinalstaticStringEXCHANGE_NAME="fanout_exchange_test";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 消息内容String message ="Hello everyone";// 发布消息到Exchange
channel.basicPublish(EXCHANGE_NAME,"",null, message.getBytes());System.out.println(" [生产者] Sent '"+ message +"'");
channel.close();
connection.close();}}
2.消费者一
publicclassRecv{privatefinalstaticStringQUEUE_NAME="fanout_exchange_queue_1";privatefinalstaticStringEXCHANGE_NAME="fanout_exchange_test";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机-----重要
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");// 定义队列的消费者DefaultConsumer consumer =newDefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body)throwsIOException{// body 即消息体String msg =newString(body);System.out.println(" [消费者1] received : "+ msg +"!");}};// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME,true, consumer);}}
3.消费者二
publicclassRecv2{privatefinalstaticStringQUEUE_NAME="fanout_exchange_queue_2";privatefinalstaticStringEXCHANGE_NAME="fanout_exchange_test";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");// 定义队列的消费者DefaultConsumer consumer =newDefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body)throwsIOException{// body 即消息体String msg =newString(body);System.out.println(" [消费者2] received : "+ msg +"!");}};// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME,true, consumer);}}
四、定向-Direct订阅模型
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息。
有选择性的接收消息
在订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。
在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。
1.生产者
publicclassSend{privatefinalstaticStringEXCHANGE_NAME="direct_exchange_test";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为direct
channel.exchangeDeclare(EXCHANGE_NAME,"direct");// 消息内容String message ="商品新增了, id = 1001";// 发送消息,并且指定routing key 为:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME,"insert",null, message.getBytes());System.out.println(" [商品服务:] Sent '"+ message +"'");
channel.close();
connection.close();}}
2.消费者一
publicclassRecv{privatefinalstaticStringQUEUE_NAME="direct_exchange_queue_1";privatefinalstaticStringEXCHANGE_NAME="direct_exchange_test";publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");// 定义队列的消费者DefaultConsumer consumer =newDefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body)throwsIOException{// body 即消息体String msg =newString(body);System.out.println(" [消费者1] received : "+ msg +"!");}};// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME,true, consumer);}}
3.消费者二
public class Recv2 {
private final static String QUEUE_NAME = "direct_exchange_queue_2";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
五、通配符-Topic订阅模型
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词*:匹配不多不少恰好1个词
举例:
audit.#:能够匹配audit.irs.corporate 或者 audit.irsaudit.*:只能匹配audit.irs
1.生产者
publicstaticvoidmain(String[] argv)throwsException{// 获取到连接Connection connection =ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME,"topic");// 消息内容String message ="新增商品 : id = 1001";// 发送消息,并且指定routing key 为:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME,"item.insert",null, message.getBytes());System.out.println(" [商品服务:] Sent '"+ message +"'");
channel.close();
connection.close();}
2.消费者一
public class Recv {
private final static String QUEUE_NAME = "topic_exchange_queue_1";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3.消费者二
/**
* 消费者2
*/
public class Recv2 {
private final static String QUEUE_NAME = "topic_exchange_queue_2";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
六、使用场景
RabbitMQ 是一种广泛使用的开源消息代理软件,它提供了一种在分布式系统中进行异步通信的方法。以下将详细描述 RabbitMQ 的几种典型应用场景:
- 异步处理 - 场景说明:用户注册后,需要发送注册邮件和注册短信。传统的做法有串行和并行两种方式。- 串行方式:将注册信息写入数据库后,先发送注册邮件,再发送注册短信,所有任务完成后才返回给客户端。这种方法的问题在于邮件和短信并非必须即时完成,却让客户端长时间等待。- 并行方式:将注册信息写入数据库后,同时发送邮件和短信,所有任务完成后返回给客户端。这种方式虽然提高了处理时间,但客户端仍然需要等待这些非关键任务的完成。- 引入消息队列:把发送邮件和短信等非关键业务逻辑异步处理。这样客户端的响应时间仅等于写入数据库的时间,大大提升了用户体验。
- 系统解耦 - 场景说明:例如双11购物节,用户下单后,订单系统需要通知库存系统。- 传统做法:订单系统直接调用库存系统的接口。这种做法在库存系统出现故障时会导致订单失败,从而影响整个购物体验。- 引入消息队列:订单系统在用户下单后,完成持久化处理,将消息写入消息队列并返回成功。库存系统订阅下单消息,获取消息后进行库存操作。这样即使库存系统出现故障,消息队列也能保证消息的可靠投递,不会丢失消息。
- 流量削峰 - 场景说明:例如秒杀活动,因为流量过大可能导致应用挂掉。- 作用 1. 控制活动人数,超过一定阈值的订单直接丢弃。2. 缓解短时间的高流量压垮应用,按最大处理能力获取订单。- 实现方法:用户的请求先写入消息队列,如果消息队列长度超过最大值,则直接抛弃或跳转到错误页面。秒杀业务根据消息队列中的请求信息进行后续处理。
- 任务分发 - Round-robin dispatching:RabbitMQ 的分发机制适合并发程序的扩展。如果负载加重,可以创建更多的 Consumer 来进行任务处理。- Message acknowledgment:为了保证数据不被丢失,RabbitMQ 支持消息确认机制。Consumer 在处理完数据之后发送 ack,告诉 RabbitMQ 数据已经被正确处理,可以安全删除。
- 消息持久化 - 必要性:确保在系统重启后消息不会丢失。- 实现方法包括 exchange、queue 和 message 本身的持久化 1. 声明时指定
durable => true
。2. 投递时指定delivery_mode => 2
(持久化消息)。 - 公平分发 - 问题:默认状态下,RabbitMQ 将第 n 个 Message 分发给第 n 个 Consumer,不考虑 Consumer 的当前负载。这可能导致某些 Consumer 空闲,而另一些 Consumer 过载。- 解决方法:通过
basic.qos
方法设置prefetch_count=1
,确保每个 Consumer 在任何时刻只处理一个消息。 - RPC模式 - 场景说明:适用于需要远程计算并等待结果返回的场景。- 实现步骤 1. 客户端创建一个匿名独享的回调队列。2. 在 RPC 请求中携带
reply_to
属性设置回调队列,和一个唯一值的correlation_id
属性。3. 将请求发送到一个特定的rpc_queue
。4. RPC 工作者(服务器)等待请求到达队列并执行相应的处理,处理完成后将结果发送到reply_to
指定的队列。5. 客户端等待回调队列中的数据并根据correlation_id
匹配请求并返回应用结果。
综上所述,RabbitMQ 提供了多种应用场景的解决方案,包括异步处理、系统解耦、流量削峰、任务分发、消息持久化、公平分发和 RPC 模式。这些场景基本涵盖了分布式系统中的常见需求,使得 RabbitMQ 成为构建可靠、高效、可扩展系统的有力工具。
版权归原作者 猿来如此啊 所有, 如有侵权,请联系我们删除。