RabbitMQ 快速入门七种简单模式
起步
七种模式
这七种模型其实是 RabbitMQ 官方入门文档中给出的消息队运行模式
分别是:
- “Hello World!” (最初入门,一发一收)
- Work Queues (工作队列,一发多收)
- Publish/Subscribe (发布/订阅)
- Routing(消息路由)
- Topics(话题)
- RPC(请求/回复)
- Publisher Confirms(发布者确认)
官网地址:https://www.rabbitmq.com/getstarted.html
项目依赖
推荐创建两个 SpringBoot 项目,一个作为生产者,另一个作为消费者
也可使用 Maven 的继承聚合模式管理两个项目
项目中需要引入下面的依赖
<!-- AMQP 依赖,包含了 RabbitMQ 的相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
1、“Hello World!”
最简单的模式就是一发一收,也是最基础的模式,所以官方称之为 “Hello World!”,表示最初的入门
(1) Connection 方式
RabbitMQ 中有几种概念,分别是:虚拟主机(virtualHost),通道(channel),队列(queue),还有一个交换机(exchanges)的概念在之后会遇到
服务先与虚拟主机建立连接,然后创建通道,声明或创建队列之后发送或接收消息,消息最终会在队列中传输
下面使用 connection 的方式来实现接发消息,以便于理解 RabbitMQ 的模式(不常用到,了解即可)
生产者测试类
@SpringBootTestpublicclassPublisherTest{@TestpublicvoidtestSendMessage()throwsIOException,TimeoutException{// 1.建立连接ConnectionFactory factory =newConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.0.102");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道 ChannelChannel channel = connection.createChannel();// 3. 声明队列(不存在则创建)String queueName ="simple.queue";// 队列名
channel.queueDeclare(queueName,false,false,false,null);// 4.发送消息String message ="hello, rabbitmq!";
channel.basicPublish("", queueName,null, message.getBytes());System.out.println("发送消息成功:"+ message);// 5.关闭通道和连接
channel.close();
connection.close();}}
消费者测试类
@SpringBootTestpublicclassConsumerTest{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactory factory =newConnectionFactory();// 1.1 设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.0.102");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");// 1.2 建立连接Connection connection = factory.newConnection();// 2. 创建通道 ChannelChannel channel = connection.createChannel();// 3. 声明队列(不存在则创建)String queueName ="simple.queue";// 队列名
channel.queueDeclare(queueName,false,false,false,null);// 4. 获取消息
channel.basicConsume(queueName,true,newDefaultConsumer(channel){@SneakyThrows@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){// 5.处理消息String message =newString(body);System.out.println("接收到消息:"+ message);}});System.out.println("等待接收消息......");}}
我们可以在 RabbitMQ 的管理页面查看相关信息:
注意:
- 消费者接收消息是异步过程,而不会阻塞主线程
- queueDeclare 在不存在该队列时会创建队列,否则不创建
- 队列不会被自动删除,可以在管理页删除(点击队列名称,点击 Delete 选项)
- 消息只会被读取一次,未被读取的消息存放在队列中等待被消费
- 上例消费者没有关闭通道和连接的操作,不会只读取一条消息,而是一直等待不停读取
- RabbitMQ 重启后,队列因未持久化被删除,将 queueDeclare 第二个参数改为 true 以创建持久化队列(已存在的队列不可更改)
- RabbitMQ 重启后,虽有队列但消息没了,因为消息未持久化,发送消息时将 basicPublish 方法第三个参数改为
MessageProperties.PERSISTENT_TEXT_PLAIN
以持久化消息
持久化的队列在 Features 栏会有字母 D 标示,如图:
有持久化的消息可以看到 Properties 信息,未持久化则没有,如图:
(2) RabbitTemplate 方式
上面的例子可以看出,大多数代码是重复的,所以 SpringAMQP 中封装了 RabbitTemplate 以便于进行消息队列的操作
首先在项目 yaml 配置文件中假如 RabbitMQ 的连接相关配置
spring:rabbitmq:host: 192.168.0.102 # RabbitMQ 服务 ip 地址port:5672# 消息服务端口username: root # 用户名password:"123456"# 密码virtual-host: / #虚拟主机
然后就能自动装配 RabbitTemplate 类了
生产者测试类
@RunWith(SpringRunner.class)@SpringBootTest()publicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidhelloWorldModeTest(){String queueName ="hello.world.queue";String message ="Hello, springAMQP!";
rabbitTemplate.convertAndSend(queueName, message);}}
只需调用 convertAndSend 方法即可发送消息
注意:此操作不会创建队列,如果队列不存在则没有效果
若要创建队列,需要声明一个 Queue 类型的 bean 并受到 Spring 的管理
通常放在一个 Configuration 配置类中,示例如下:
@ConfigurationpublicclassRabbitMqConfig{@BeanpublicQueuesimpleQueue(){returnnewQueue("hello.world.queue");// 队列名与函数名无关}}
如此启动项目时,bean 被创建,就会创建一个队列(若已存在则不再创建)
消费者不再在测试类中演示,而是使用监听队列的方式
只需在一个方法上注解 @RabbitListener,并指定队列名
同时方法所在的类也要被 Spring 管理(注解 @Component)
@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="hello.world.queue")publicvoidlistenSimpleQueue(String message){System.out.printf("消费者接收到 hello.world.queue 的消息:【 %s 】\n", message);}}
启动项目即可监听队列并处理接收到的消息
**注意:如果监听的队列名不存在,则会报错
Failed to declare queue(s):[hello.world.queue]
,解决方法同前面的配置里创建队列**
注意:
- 此方式创建的队列默认持久化
- 此方式生产的消息默认持久化
2、Work Queues
Work Queues 工作队列其实就是一发多收的模式,当然也可以是多发多收
主要在于每一个消息可能就是一个要处理任务,所以多个消费者处理任务可以提高任务的执行效率
创建队列
创建队列的配置类可以只在生产者和消费者其一中配置,但有配置的一方应先启动,否则另一方会因不存在队列而失效
当然都配置也没问题,若队列已存在,也可以都不配置
@ConfigurationpublicclassRabbitMqConfig{@BeanpublicQueuesimpleQueue(){returnnewQueue("work.queue");}}
生产者
这里循环 50 次模拟多个任务的发布
@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSendMessage2WorkQueue(){String queueName ="work.queue";String message ="work message ---- ";for(int i =1; i <=50; i ++){
rabbitTemplate.convertAndSend(queueName, message + i +"th");}}}
消费者
使用两个监听方法来模拟两个消费者,用线程的 sleep() 方法模拟处理任务花费的时间
@ComponentpublicclassSpringRabbitListener{privateint count1 =0;privateint count2 =0;@RabbitListener(queues ="work.queue")publicvoidlistenSimpleQueue1(String msg)throwsInterruptedException{System.out.printf("消费者 1 第 %d 次接收消息:【 %s 】 %s",++count1, msg,LocalTime.now().toString());Thread.sleep(50);}@RabbitListener(queues ="work.queue")publicvoidlistenSimpleQueue2(String msg)throwsInterruptedException{System.err.printf("消费者 2 第 %d 次接收消息:【 %s 】 %s",++count2, msg,LocalTime.now().toString());Thread.sleep(200);}}
由于两个消费者睡眠时间不同,应该时速度快的消费者处理的任务更多些
但实际运行起来结果是两种处理了相同的任务数量,速度较快消费者 1 处理了一半任务便不在处理了
这和消息队列的预取机制有关
当消费者端存在未处理完成的消息时,队列依然会派发消息给消费者,这些消息会存放在消费者端的缓存中
解决方法就是限制消费者端的预取信息条数,在消费者的 yaml 配置文件中做如下配置即可:
spring:rabbitmq:listener:simple:prefetch:1# 预取信息条数
预取数量限制为 1 时,消费者只会在处理好当前消息后才会收到新的消息
3、Publish/Subscribe
前面两种模式中,消息只会被一个消费之接收一次,在这里则不同
Publish/Subscribe(发布 / 订阅)模式中,表示生产者的发布的消息会广播给所有的消费者,类似于博主与粉丝的关系
也称为 fanout 模式,因为是使用 fanout 交换机来实现的
关系绑定
@ConfigurationpublicclassFanoutConfig{// 创建 Fanout (广播)交换机@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("my.fanout");}// 创建队列 1@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}// 创建队列 2@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}// 创建绑定关系(Fanout交换机与队列 1)@BeanpublicBindingfanoutBinding1(FanoutExchange fanoutExchange,Queue fanoutQueue1){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 创建绑定关系(Fanout交换机与队列 2)@BeanpublicBindingfanoutBinding2(FanoutExchange fanoutExchange,Queue fanoutQueue2){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
生产者
此时生产者就不再是向队列发送消息,而是向 Fanout 交换机发送
Fanout 交换机会将消息广播给所有与之绑定的队列
@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidfanoutMode(){String exchangeName ="my.fanout";String msg ="Hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName,"", msg);}}
convertAndSend 向交换机发送消息时,第一个参数为交换机名称,第三个参数为消息内容
第二个参数为 routingKey 路由键,在之后的路由模式中会说到
消费者
消费者监听的依然是队列,没有什么特别的
@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){System.out.printf("消费者接收到 fanout.queue1 的消息:【 %s 】\n", msg);}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){System.out.printf("消费者接收到 fanout.queue2 的消息:【 %s 】\n", msg);}}
4、Routing
Routing 也就是路由模式,根据 Direct 交换机实现,也称为 Direct 模式
交换机根据收到的消息的路由键,有条件的路由给不同的队列,且可以是多个队列
消费者
在前面的 Fanout 模式中,我们知道了在配置类中创建交换机与队列绑定的方式
其实在 @RabbitListener 注解中我们就可以完成这一步骤
先看消费者的实现:
@ComponentpublicclassSpringRabbitListener{@RabbitListener(bindings =@QueueBinding(// 监听一个绑定关系
value =@Queue("direct.queue1")// 队列, exchange =@Exchange(name ="my.direct", type =ExchangeTypes.DIRECT)// 交换机(名称与类型), key ={"info","warning","err"}// 监听的路由键))publicvoidlistenDirectQueue1(String msg){System.out.printf("消费者接收 direct.queue1 的消息:【 %s 】", msg);}@RabbitListener(bindings =@QueueBinding(
value =@Queue("direct.queue2"), exchange =@Exchange(name ="my.direct")// 交换机默认类型即为 ExchangeTypes.DIRECT, key ={"err"}))publicvoidlistenDirectQueue2(String msg){System.err.printf("消费者接收 direct.queue2 的消息:【 %s 】", msg);}}
注意:当注解中的交换机及队列不存在时,要先启动消费者来创建,然后再启动生产者
顺便一提:如若在配置里中声明 Binding 的 bean 时,使用 with() 来指定路由键
如:
BindingBuilder.bind(queue).to(directExchange).with("info, err");
生产者
此时生产者向指定交换机发送消息时,指定路由键,即可将消息发送到对应的队列中
@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSendDirectExchange(){String exchangeName ="my.direct";String msg;
msg ="Info message.";
rabbitTemplate.convertAndSend(exchangeName,"info", msg);
msg ="Warning message.";
rabbitTemplate.convertAndSend(exchangeName,"warning", msg);
msg ="Error message.";
rabbitTemplate.convertAndSend(exchangeName,"err", msg);}}
输出结果:
消费者接收 direct.queue2 的消息:【 Error message. 】
消费者接收 direct.queue1 的消息:【 Info message. 】
消费者接收 direct.queue1 的消息:【 Warning message. 】
消费者接收 direct.queue1 的消息:【 Error message. 】
queue2 最先输出是由于错误流与输出流的差异
5. Topics
Topics 即话题模式,它和 Routing 模式很相似,都是根据路由键来路由给符合条件的队列,但 Topic 的路由键支持通配符
消费者
@ComponentpublicclassSpringRabbitListener{@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue1"), exchange =@Exchange(name ="my.topic", type =ExchangeTypes.TOPIC)// 交换机类型为 TOPIC, key ={"cn.*"}// * 为通配符,只能匹配到下一个点的任意字符串(如 cn.news 可匹配, cn.news.today 不可匹配)))publicvoidlistenTopicQueue1(String msg){System.out.printf("消费者接收 cn.* 的消息:【 %s 】\n", msg);}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue2"), exchange =@Exchange(name ="my.topic", type =ExchangeTypes.TOPIC), key ={"#.news"}// 也可用 # 做通配符,效果同 *))publicvoidlistenTopicQueue2(String msg){System.out.printf("消费者接收 *.news 的消息:【 %s 】\n", msg);}}
生产者
@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSendTopicExchange(){String exchangeName ="my.topic";String message ="富强民主文明和谐自由平等公正法制爱国敬业诚信友善";
rabbitTemplate.convertAndSend(exchangeName,"cn.news", message);}}
6、RPC
RPC 也就是 Request/reply 请求回复模式,类似于网络请求与响应
在这种模式下,不在用生产者与消费者的称呼了,而是用客户端与服务端
下面是一个简单的小例子
创建队列
在服务端配置创建比较好
@ConfigurationpublicclassRpcConfig{// 创建用于接收请求的队列@BeanpublicQueuerequestQueue(){returnnewQueue("request.queue");}}
服务端
RabbitListener 注解在类上,而 @RabbitHandler 注解在方法上表示处理消息的方法
类似于 web 项目中的 Controller 控制器
@Component@RabbitListener(queues ="request.queue")publicclassRpcServer{@RabbitHandlerpublicStringprocess(String request){System.out.println("收到请求内容:"+ request);return"好的已收到!";}}
客户端
使用 convertSendAndReceive() 方法向请求队列发送消息即可,会返回一个 Object 对象,即为服务端 @RabbitHandler 所注解方法的返回值
测试类如下:
@RunWith(SpringRunner.class)@SpringBootTestpublicclassRabbitRpcTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidrpcTest(){String request ="收到请回复";Object reply = rabbitTemplate.convertSendAndReceive("request.queue", request);System.out.println("响应:"+ reply);}
7、Publisher Confirms
Publisher Confirms 即发布确认,用以保证消息成功推送到队列中,或者发送失败时做出相应的措施
配置
在生产者端配置 publisher-confirm-type,如下
spring:rabbitmq:publisher-confirm-type: correlated
消费者
在 rabbitTemplate 中添加消息确认的回调函数,当消息发送之后会根据发送结果调用该函数进行处理
为了标示消息的状态,一般还会设置一个 CorrelationData 对象根据消息一起发送,其中可以包含消息的内容或者一些属性
看如下示例:
@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidconfirmTest()throwsExecutionException,InterruptedException{// 设定消息确认的回调函数
rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){System.out.println("id: "+ correlationData.getId());System.out.println("ack: "+ ack);// 是否成功System.out.println("cause: "+ cause);// 原因Message returnedMessage = correlationData.getReturnedMessage();// 获取返回信息System.out.println("returnedMessage: "+newString(returnedMessage.getBody()));}});// 消息本体String message ="Hello!";// 创建 CorrelationData 并设置返回信息CorrelationData correlationData =newCorrelationData("id-1");
correlationData.setReturnedMessage(newMessage(message.getBytes(),newMessageProperties()));// 发送消息到 confirm.queue 队列并携带 correlationData
rabbitTemplate.convertAndSend("confirm.queue",(Object) message, correlationData);// 调用 correlationData.getFuture().get() 阻塞主线程等待消息确认CorrelationData.Confirm confirm = correlationData.getFuture().get();System.out.println("confirm: "+ confirm);}}
注意:只要消息发送到了队列中,便返回正常结果(ack 为 true),而非是消费者接收之后
版权归原作者 槐阳7 所有, 如有侵权,请联系我们删除。