SpringAMQP
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
1.Basic Queue 简单队列模型
1.1引入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
1.2消息发送
在publisher服务的application.yml中添加配置
spring:rabbitmq:host: 192.168.150.101 # 主机名port:5672# 端口virtual-host: / # 虚拟主机username: haojiale # 用户名password:123321# 密码
在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送
packagecom.example.mq.spring;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){// 队列名称String queueName ="simple.queue";// 消息String message ="hello, spring amqp!";// 发送消息
rabbitTemplate.convertAndSend(queueName, message);}}
1.3消息接收
在consumer服务的application.yml中添加的配置内容同消息发送一样
在consumer服务中编写监听器,消费消息
packagecom.example.mq.listener;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueueMessage(String msg)throwsInterruptedException{System.out.println("spring 消费者接收到消息:【"+ msg +"】");}}
2.WorkQueue
WorkQueue也被称为任务模型。就是让多个消费者绑定到一个队列,共同消费队列中的消息
当消息处理比较耗时的时候,可能生产消息的速度远远大于消息的消费速度,消息就会堆积越来越多,无法及时处理,使用work模型,多个消费者共同处理消息提升消费速度
2.1消息发送
循环发送消息,模拟大量消息堆积现象
在publisher服务中添加一个测试方法:
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/@TestpublicvoidtestWorkQueue()throwsInterruptedException{// 队列名称String queueName ="simple.queue";// 消息String message ="hello, message_";for(int i =0; i <50; i++){// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}
2.2消息接收
模拟多个消费者绑定同一个队列,在consumer服务中添加2个新的方法:
@RabbitListener(queues ="simple.queue")publicvoidlistenWorkQueue1(String msg)throwsInterruptedException{System.out.println("消费者1接收到消息:【"+ msg +"】"+LocalTime.now());Thread.sleep(20);}@RabbitListener(queues ="simple.queue")publicvoidlistenWorkQueue2(String msg)throwsInterruptedException{System.err.println("消费者2........接收到消息:【"+ msg +"】"+LocalTime.now());Thread.sleep(200);}
执行测试方法结果是消费者1很快处理完了自己的25条消息,消费者2却在缓慢处理自己的25条消息,也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力是存在问题的。
解决
修改consumer服务的application.yml文件配置,通过设置prefetch来控制消费者预取的消息数量
spring:rabbitmq:listener:simple:prefetch:1# 每次只能获取一条消息,处理完成才能获取下一个消息
3.发布/订阅
发布订阅的模型图如图:
Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.1 Fanout
3.1.1声明队列和交换机
在consumer服务中声明队列和交换机
packagecom.example.mq.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfig{/**
* 声明交换机
* @return Fanout类型交换机
*/@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("example.fanout");}/**
* 第1个队列
*/@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/**
* 第2个队列
*/@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
3.1.2消息发送
在publisher服务中编写测试方法发送消息
@TestpublicvoidtestFanoutExchange(){// 队列名称String exchangeName ="example.fanout";// 消息String message ="hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName,"", message);}
3.1.3消息接收
在consumer服务中添加两个方法,作为消费者:
@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){System.out.println("消费者1接收到Fanout消息:【"+ msg +"】");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){System.out.println("消费者2接收到Fanout消息:【"+ msg +"】");}
3.1.4总结
交换机的作用:
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败情况下消息丢失
- FanoutExchange会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
- Queue
- FanoutExchange
- Binding
3.2 Direct
在Direct模型下:
- 队列与交换机的绑定不能是任意绑定,而是要指定一个RoutingKey(路由key)
- 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
- Exchange不会再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RountingKey与消息的RoutingKey完全一致,才会收到消息
3.2.1消息发送
在publisher服务中添加测试方法:
@TestpublicvoidtestSendDirectExchange(){// 交换机名称String exchangeName ="itcast.direct";// 消息String message ="红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"red", message);}
3.2.2消息接收——基于注解声明交换机和队列
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue1"),
exchange =@Exchange(name ="itcast.direct", type =ExchangeTypes.DIRECT),
key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue2"),
exchange =@Exchange(name ="itcast.direct", type =ExchangeTypes.DIRECT),
key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【"+ msg +"】");}
3.2.3总结
Direct交换机与Fanout交换机的区别?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
3.3 Topic
Topic类型的Exchang可以让队列在绑定Routing key的时候使用通配符
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:
item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配
item.spu.insert
或者
item.spu
item.*
:只能匹配
item.spu
图示:
3.3.1消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@TestpublicvoidtestSendTopicExchange(){// 交换机名称String exchangeName ="itcast.topic";// 消息String message ="喜报!孙悟空大战哥斯拉,胜!";// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"china.news", message);}
3.3.2消息接收
在consumer服务中添加方法:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue1"),
exchange =@Exchange(name ="itcast.topic", type =ExchangeTypes.TOPIC),
key ="china.#"))publicvoidlistenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue2"),
exchange =@Exchange(name ="itcast.topic", type =ExchangeTypes.TOPIC),
key ="#.news"))publicvoidlistenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【"+ msg +"】");}
版权归原作者 ~越努力越幸运~ 所有, 如有侵权,请联系我们删除。