0


RabbitMQ速通入门

一.RabbitMQ快速入门

  1. 1.MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。
  2. 2.消息队列示意图

  1. 大致流程就是消费者(consumer)订阅某个队列。生产者(product)创建消息,然后发布到队列中(queue),最终将消息发送到监听的消费者。
  2. 上图中(组成):

.Broker:标识消息队列服务器实体

.Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象

.Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

.Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

.Banding:绑定,用于消息队列和交换机之间的关联。

.Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

.Connection:网络连接,比如一个TCP连接。

.Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。

.Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

二.RabbitMQ下载安装

  1. RabbitMQ分为windows版本的安装和linux版本的安装,举例windows的下载和安装,要先安装erlang并且配置环境变量再安装,环境搭建过程详情见下文

RabbitMQ的下载与安装以及Erlang环境的搭建 - 简书:简书

三.为什么要使用RabbitMQ

3.1.使用RabbitMQ的好处

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

3.2.RabbitMQ使用场景

开发中消息队列通常有如下应用场景:

任务异步处理

  1. 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

应用程序解耦合

  1. MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

削峰填谷

  1. 如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。

  1. 消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。

  1. 但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”

3.3.RabbitMQ端口号

3.4.AMQP 和 JMS

3.4.1.AMQP

  1. AMQP是一种协议,更准确来说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质区别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
  2. AMQP是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。

3.4.2.JMS

  1. JMSJava消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

3.4.3.AMQP和JMS的区别
JMSAMPQ
定义

Java api

Wire-protocol

跨语言

跨平台

支持消息类型

多种消息类型:

TextMessage

MapMessage

BytesMessage

StreamMessage

ObjectMessage

Message (只有消息头和属性)

byte[]

当实际应用时,有复杂的消息,可以将消息序列化后发送。

综合评价

JMS 定义了JAVA API层面的标准;在java体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差;

AMQP定义了wire-level层的协议标准;天然具有跨平台、跨语言特性。

3.5.消息队列产品

  1. 市面上常见的消息队列有如下:

■ActiveMQ:基于JMS

■ZeroMQ:基于C语言开发

■RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

■RocketMQ:基于JMS,阿里巴巴产品

■Kafka:类似MQ的产品;分布式消息系统,高吞吐量

  1. 各自的特点(看图自行感受,总结,分析):

特性ActiveMQRabbitMQKafkaRocketMQPRODUCER-COMSUMER支持支持支持支持PUBLISH-SUBSCRIBE支持支持支持支持REQUEST-REPLY支持支持-支持API完备性高高高低(静态配置)多语言支持支持,JAVA优先语言无关支持,JAVA优先支持单机呑吐量万级万级十万级单机万级消息延迟-微秒级毫秒级-可用性高(主从)高(主从)非常高(分布式)高消息丢失-低理论上不会丢失-消息重复-可控制理论上会有重复-文档的完备性高高高中提供快速入门有有有无首次部署难度-低中高
注: - 表示尚未查找到准确数据

3.6/RabbitMQ(是AMQP协议的Erlang的实现)

  1. RabbitMQ是由erlang语言开发,基于AMQPAdvanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
  2. RabbitMQ官方地址:http://www.rabbitmq.com/
  3. RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);
  4. 官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

四.入门案例(小白玩家撸码前记得先把服务打开)

4.1.简单模式

4.1.1.运行环境创建工程

4.1.2.添加依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.6.0</version>
  5. </dependency>

4.1.3.编写生产者

  1. public class Producer {
  2. static final String QUEUE_NAME = "simple_queue";
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. //创建连接工厂
  5. Connection connection = ConnectionUtil.getConnection();
  6. //创建信道
  7. Channel channel = connection.createChannel();
  8. // 声明(创建)队列
  9. channel.queueDeclare(QUEUE_NAME,true,false,false,null);
  10. //要发送的信息
  11. String message = "Hello World";
  12. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  13. System.out.println("已发送消息:"+message);
  14. //关闭资源
  15. channel.close();
  16. connection.close();
  17. }
  18. }
  1. 注:在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息,在这里不再演示

4.1.4.编写消费者

  1. public class Consumer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Connection connection = ConnectionUtil.getConnection();
  4. //创建频道
  5. Channel channel = connection.createChannel();
  6. // 声明(创建)队列
  7. channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
  8. //创建消费者:并设置消息处理
  9. DefaultConsumer consumer = new DefaultConsumer(channel){
  10. @Override
  11. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  12. //路由key
  13. System.out.println("路由key为:"+envelope.getRoutingKey());
  14. //交换机
  15. System.out.println("交换机为:"+envelope.getExchange());
  16. //消息id
  17. System.out.println("消息id为:"+envelope.getDeliveryTag());
  18. //收到的消息
  19. System.out.println("接收到的消息为:"+new String(body));
  20. }
  21. };
  22. //监听消息
  23. channel.basicConsume(Producer.QUEUE_NAME,true,consumer);
  24. //不关闭资源,一直监听
  25. }
  26. }

4.1.5.小结

  1. 上述的入门案例中中其实使用的是如下的简单模式:

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

4.2.RabbitQM运转流程(以4.1简单模式案例分析)

  1. 生产者发送消息
  1. 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;

  2. 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;

  3. 将路由键(空字符串)与队列绑定起来;

  4. 发送消息至RabbitMQ Broker;

  5. 关闭信道;

  6. 关闭连接;

    1. ■消费者接收信息
  7. 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker

  8. 向Broker 请求消费相应队列中的消息,设置相应的回调函数;

  9. 等待Broker回应闭关投递响应队列中的消息,消费者接收消息;

  10. 确认(ack,自动确认)接收到的消息;

  11. RabbitMQ从队列中删除相应已经被确认的消息;

  12. 关闭信道;

  13. 关闭连接;

4.3.生产者流转过程说明

  1. 客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
  2. 客户端调用connection.createChannel方法。此方法开启信道,其包装的channel.open命令发送给Broker,等待channel.basicPublish方法,对应的AMQP命令为Basic.Publish,这个命令包含了content Header 和content Body()。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。
  3. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。

4.4.消费者流转过程说明

  1. 消费者客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
  2. 消费者客户端调用connection.createChannel方法。和生产者客户端一样,协议涉及Channel . Open/Open-Ok命令。
  3. 在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker 回执Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。
  4. Broker 向消费者客户端推送(Push) 消息,即Basic.Deliver 命令,这个命令和Basic.Publish 命令一样会携带Content Header 和Content Body。
  5. 消费者接收到消息并正确消费之后,向Broker 发送确认,即Basic.Ack 命令。
  6. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。

五.RabbitMQ工作模式

5.1.简单模式见入门案例

5.2.Work queues工作队列模式

5.2.1.模式说明

  1. Work Queues

与入门程序的

  1. 简单模式

相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

5.2.2.码来

  1. ■生产者
  1. public class Producer {
  2. static final String QUEUE_NAME = "work_queue";
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. //本人将获取连接封装成工具类,你们可以把入门案例的连接代码复制过来
  5. Connection connection = ConnectionUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明(创建)队列
  8. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  9. for (int i = 1; i <= 30; i++) {
  10. // 发送信息
  11. String message = "work模式--" + i;
  12. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  13. System.out.println("已发送消息:" + message);
  14. }
  15. // 关闭资源
  16. channel.close();
  17. connection.close();
  18. }
  19. }
  1. ■消费者(这里用两个消费者演示,你们可以把消费者代码复制成两份)
  1. public class Consumer1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Connection connection = ConnectionUtil.getConnection();
  4. // 创建频道
  5. final Channel channel = connection.createChannel();
  6. // 声明(创建)队列
  7. channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
  8. //一次只能接收并处理一个消息
  9. channel.basicQos(1);
  10. //创建消费者:并设置消息处理
  11. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
  12. @Override
  13. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  14. try {
  15. //路由key
  16. System.out.println("路由key为:" + envelope.getRoutingKey());
  17. //交换机
  18. System.out.println("交换机为:" + envelope.getExchange());
  19. //消息id
  20. System.out.println("消息id为:" + envelope.getDeliveryTag());
  21. //收到的消息
  22. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
  23. Thread.sleep(1000);//单纯增强实验效果
  24. //确认信息
  25. channel.basicAck(envelope.getDeliveryTag(),false);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. };
  31. //监听消息
  32. channel.basicConsume(Producer.QUEUE_NAME,false,defaultConsumer);
  33. }
  34. }

5.2.3.测试

  1. ■消费者1

  1. ■消费者2

  1. 小结:可以看出,在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是**竞争**的关系。

5.3.Publish/Subscribe 发布订阅模式

5.3.1.模式介绍

  1. ■每个消费者监听自己的队列
  2. ■生产者将消息发给broker,由交换机将消息转发到绑定此交换机的队列,每个绑定交换机的队列都将接收到消息

相较于工作队列模式,订阅模式在原来的结构上稍加不同,多了交换机角色,而且过程也略有变化:

P:生产者,这里发送信息到交换机(X)中,而不再直接发送到队列

C1(C2):消费者,等待并接受消息

Queue(红色方块):消息队列,接收、缓存消息

X(Exchange): 交换机,接收并处理消息,不具备储存消息的功能,如果没有队列与其绑定,或者没有符合路由规则的队列,消息就会丢失(gg)...

  1. 常见的交换机有如下3种:
  2. Fanout:广播型,将消息发送给所有绑定交换机的队列
  3. Direct :定向型,把消息发送给指定路由key的队列
  4. Topic :通配符,个人理解就是在路由模式(下面会讲到)的队列

5.3.2.码来

  1. ■生产者
  1. /**
  2. * 发布与订阅使用的交换机类型为:fanout
  3. */
  4. public class Producer {
  5. //交换机名称
  6. static final String FANOUT_EXCHAGE = "fanout_exchange";
  7. //队列名称
  8. static final String FANOUT_QUEUE_1 = "fanout_queue_1";
  9. //队列名称
  10. static final String FANOUT_QUEUE_2 = "fanout_queue_2";
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. //创建连接
  13. Connection connection = ConnectionUtil.getConnection();
  14. // 创建信道
  15. Channel channel = connection.createChannel();
  16. channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
  17. //创建队列
  18. channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
  19. channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
  20. //队列绑定交换机
  21. channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHAGE,"");
  22. channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHAGE,"");
  23. for (int i = 1; i <= 10; i++) {
  24. // 发送信息
  25. String message = "岳某人 发布订阅模式--" + i;
  26. channel.basicPublish(FANOUT_EXCHAGE,"",null,message.getBytes());
  27. System.out.println("已发送信息:"+message);
  28. }
  29. // 关闭资源
  30. channel.close();
  31. connection.close();
  32. }
  33. }
  1. ■消费者(累了,自行复制一份)
  1. public class Consumer1 {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. // 创建频道
  5. Channel channel = connection.createChannel();
  6. //声明交换机
  7. channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
  8. // 声明(创建)队列
  9. channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
  10. //队列绑定交换机
  11. channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");
  12. //创建消费者;并设置消息处理
  13. DefaultConsumer consumer = new DefaultConsumer(channel){
  14. @Override
  15. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  16. //路由key
  17. System.out.println("路由key为:" + envelope.getRoutingKey());
  18. //交换机
  19. System.out.println("交换机为:" + envelope.getExchange());
  20. //消息id
  21. System.out.println("消息id为:" + envelope.getDeliveryTag());
  22. //收到的消息
  23. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
  24. }
  25. };
  26. //监听消息
  27. channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
  28. }
  29. }

5.3.3.测试

  1. ■消费者1

  1. ■消费者2

小结:与交换机绑定的队列,发布的消息队列都能收到

5.4.Routing路由模式

5.4.1.模式介绍

  • 发送方向交换机发送消息时,必须指定消息的RoutingKey
  • 交换机不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey判断筛选,只有队列的RoutingKey与消息的RoutingKey一致,才会接收到消息

  1. 由上图也可看出,基本机构与发布订阅模式基本一致,只是多了种规则(个人理解)。

5.4.2.码来

  1. ■生产者
  1. /**
  2. * 路由模式的交换机类型为:direct
  3. */
  4. public class Producer {
  5. //交换机名称
  6. static final String DIRECT_EXCHAGE = "direct_exchange";
  7. //队列名称
  8. static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
  9. //队列名称
  10. static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. //创建连接
  13. Connection connection = ConnectionUtil.getConnection();
  14. // 创建频道
  15. Channel channel = connection.createChannel();
  16. //声明交换机
  17. channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
  18. //声明队列
  19. channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
  20. channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
  21. //与队列绑定交换机
  22. channel.queueBind(DIRECT_QUEUE_INSERT,DIRECT_EXCHAGE,"insert");
  23. channel.queueBind(DIRECT_QUEUE_UPDATE,DIRECT_EXCHAGE,"update");
  24. //发送信息
  25. String message = "cxk 路由模式;routing key 为 insert";
  26. channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
  27. System.out.println("已发送信息:" + message);
  28. //发送信息
  29. message = "Chicken you are too beautiful 路由模式;routing key 为 update";
  30. channel.basicPublish(DIRECT_EXCHAGE,"update",null,message.getBytes());
  31. System.out.println("已发送信息:" + message);
  32. // 关闭资源
  33. channel.close();
  34. connection.close();
  35. }
  36. }
  1. ■消费者
  1. public class Consumer1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Connection connection = ConnectionUtil.getConnection();
  4. // 创建频道
  5. Channel channel = connection.createChannel();
  6. //声明交换机
  7. channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
  8. //声明队列
  9. channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);
  10. //队列绑定交换机
  11. channel.queueBind(Producer.DIRECT_QUEUE_INSERT,Producer.DIRECT_EXCHAGE,"insert");
  12. //创建消费者,并设置消息处理
  13. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
  14. @Override
  15. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  16. //路由key
  17. System.out.println("路由key为:" + envelope.getRoutingKey());
  18. //交换机
  19. System.out.println("交换机为:" + envelope.getExchange());
  20. //消息id
  21. System.out.println("消息id为:" + envelope.getDeliveryTag());
  22. //收到的消息
  23. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
  24. }
  25. };
  26. //监听消息
  27. channel.basicConsume(Producer.DIRECT_QUEUE_INSERT,true,defaultConsumer);
  28. }
  29. }

5.4.3.测试

  1. ■消费者1

  1. ■消费者2

  1. 小结:Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

5.5. Topics通配符模式

5.5.1.模式介绍

  1. 个人理解:和Routing模式很类似,只不过Routing模式的路由键是固定的,Topic类型的交换机可以模糊匹配路由键(非固定)
  2. 开发中路由键一般都是多个单词拼凑的,中间用.连接,例如:ys.add
  3. 通配符规则:

'#':匹配一个或多个词

'*':只能匹配一个词

5.5.2.码来

  1. ■生产者
  1. public class Producer {
  2. //交换机名称
  3. static final String TOPIC_EXCHAGE = "topic_exchange";
  4. //队列名称
  5. static final String TOPIC_QUEUE_1 = "topic_queue_1";
  6. //队列名称
  7. static final String TOPIC_QUEUE_2 = "topic_queue_2";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //创建连接
  10. Connection connection = ConnectionUtil.getConnection();
  11. // 创建频道
  12. Channel channel = connection.createChannel();
  13. //声明交换机
  14. channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
  15. //发送消息
  16. String message = "新增了商品。Topic模式;routing key 为 item.insert " ;
  17. channel.basicPublish(TOPIC_EXCHAGE,"item.insert",null,message.getBytes());
  18. // 发送信息
  19. message = "修改了商品。Topic模式;routing key 为 item.update" ;
  20. channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
  21. System.out.println("已发送消息:" + message);
  22. // 发送信息
  23. message = "删除了商品。Topic模式;routing key 为 item.delete" ;
  24. channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
  25. System.out.println("已发送消息:" + message);
  26. // 关闭资源
  27. channel.close();
  28. connection.close();
  29. }
  30. }
  1. ■消费者
  1. public class Consumer1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Connection connection = ConnectionUtil.getConnection();
  4. // 创建频道
  5. Channel channel = connection.createChannel();
  6. //声明交换机
  7. channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
  8. //声明队列
  9. channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,false,null);
  10. //队列绑定交换机
  11. channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHAGE,"item.update");
  12. channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHAGE,"item.delete");
  13. //创建消费者
  14. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
  15. @Override
  16. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  17. //路由key
  18. System.out.println("路由key为:" + envelope.getRoutingKey());
  19. //交换机
  20. System.out.println("交换机为:" + envelope.getExchange());
  21. //消息id
  22. System.out.println("消息id为:" + envelope.getDeliveryTag());
  23. //收到的消息
  24. System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
  25. }
  26. };
  27. //监听消息
  28. channel.basicConsume(Producer.TOPIC_QUEUE_1,true,defaultConsumer);
  29. }
  30. }

5.5.3. 测试

  1. 小结:topic 通配符模式又叫主题模式,可以实现发布订阅模式和路由模式的功能,只是topic在配置routingkey时更加灵活

六.模式总结

RabbitMQ工作模式:

1、简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

2、工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

3、发布订阅模式 Publish/subscribe 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

4、路由模式 Routing 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

5、通配符模式 Topic 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

**6.RPC模式 **RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现

希望对你们有所帮助

标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/yueyoucai/article/details/122247235
版权归原作者 岳有才 所有, 如有侵权,请联系我们删除。

“RabbitMQ速通入门”的评论:

还没有评论