0


RabbitMQ基础

RabbitMQ官网:https://rabbitmq.p2hp.com/

初识MQ

  1. MQ全称为Message Queue-消息队列,是一种应用程序对应用程序的消息通信,一端只管往队列不断发布信息,另一端只管往队列中读取消息,发布者不需要关心读取消息的谁,读取消息者不需要关心发布消息的是谁,各干各的互不干扰。

同步通讯和异步通讯

  • 同步通讯- 优点: - 时效性强- 缺点:- 耦合度高,每次加入新的需求都要就该原来的代码- 性能下降,调用者需要等待服务提供者响应,如果调用链路过长则响应时间等于每次调用时间之和- 资源浪费,调用链中的每个服务在等待响应过程中,不能释放占用的资源,在高并发场景下会极度浪费系统资源- 级联失败,如果服务提供者出现问题,所有调用方都会出现问题,从而影响整个微服务
  • 异步通讯- 优点: - 耦合度低- 吞吐量提升- 故障隔离- 流量削峰- 缺点:- 依赖于Broker的可靠性、安全性、吞吐能力- 架构复杂,排查难度高

MQ技术选型

RabbitMQ

ActiveMQ

RocketMQ

Kafka

公司/社区

Rabbit

Apache

阿里

Apache

开发语言

Erlang

Java

Java

Scala&Java

协议支持

AMQP、XMPP、SMTP、STOPM

OpenWire、STOMP、REST、XMPP、AMQP

自定义协议

自定义协议

可用性

一般

单机吞吐量

一般

非常高

消息延迟

微妙

毫秒

毫秒

毫秒

消息可靠性

一般

一般

RabbitMQ基础架构图

  • Broker:接收和分发消息的应用。
  • Virtual Host:虚拟主机,多个不同的用户使用同一个RabbitMQ Server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建的exchange/queue等。
  • Connection:publisher / consumer 和 broker 之间TCP连接
  • Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
  • Exchange:交换机,消息到达Broker的第一站,根据分发规则,匹配查询表中的routing key,分到消息的queue中。常用的类型有:direct(point-to-point),topic(publish-subscribe) and fanout(multicast)
  • Queue:存放消息,最终被consumer取走
  • Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息保存到exchange中的查询表中,用于message的分发依据。
  • Producer(Publisher):消息发布者,发送消息
  • Consumer:消息消费者,处理消息

RabbitMQ 常见消息模型

  • 基本消息队列(BasicQueue)
  • 工作消息队列(WorkQueue)
  • 发布订阅模式- Fanout Exchange(广播)- Direct Exchange(路由)- Topic Exchange(主题)

SpringAMQP

  1. Spring AMQP Spring 框架提供的一个基于 AMQP 协议的消息队列框架,用于简化 Spring 应用程序对消息队列的使用。它难点在于减少了对 AMQP 协议的细节处理,提供了一个高级别的抽象,使得生产者和消费者可以用简单的方式与消息队列进行通信。
  2. 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层默认实现。
  3. 特征:
  4. 1.监听器容器,用于异步处理入站消息
  5. 2.用于发送和接收消息的RabbitTemplate
  6. 3.RabbitAdmin用于自动声明式队列,交换和绑定

SpringAMQP基础配置

  1. 1.引入依赖
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  1. 2.在**publisher**和**consumer**配置
  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1 #主机名
  4. port: 5672 #端口
  5. virtual-host: / #虚拟主机(默认)
  6. username: root #用户名
  7. password: 123 #密码

SpringAMQP实现RabbitMQ的基础消息队列(BasicQueue)

  1. ![](https://i-blog.csdnimg.cn/direct/194fd8f7d19a45ada639ddc2e9b09f82.png) 基础消息队列:一对一的消息传递,‌即一个消息只能被一个消费者消费。
  2. 1.在**Publisher**服务中使用RabbittTemplateconvertAndSend方法**发送消息**
  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class SpringAmqtTest{
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @Test
  7. public void testSendMessage2SimpleQueue(){
  8. String queueName = "simple.queue"; // 队列名称
  9. String message = "123";// 消息内容
  10. rabbitTemplate.convertAndSend(queueName, message);// 给simple.queue队列发送消息
  11. }
  12. }
  1. 2.在**Consumer**服务中**绑定simple.queue队列**,**接收消息**并编写消费逻辑
  1. @Component
  2. public class SpringRabbitListener {
  3. // 使用Rabbit监听器监听名称为simple.queue的队列。可同时监听多个队列。
  4. @RabbitListener(queues = "simple.queue")
  5. // 方法中的msg参数类型是根据消息发送者的内容来定的,发送和接收要保持一致。如果是一个对象那接收就是一个对象
  6. public void listenSimpleQueueMessage(String msg) throws Exception {
  7. System.out.println("spring 消费者接收到消息:" + msg);
  8. }
  9. }

SpringAMQP实现RabbitMQ的工作消息队列(WorkQueue)

  1. 工作消息队列:一对多的消息发布方式,‌即一个消息可以被多个消费者同时消费。‌
  2. **特点**:是一个消息只会被一个消费者消费,‌但多个消费者可以同时处理不同的消息,‌从而提高消息的处理速度。
  3. **弊端**:当多个消费者绑定一个队列时,消费者之间的消息分配是按照**轮询**的方式来消费消息,即consumer1消费者消费了1条消息后,轮到consumer2消费者消费消息,在consumer1处理消息完成前,consumer2消费者是不会处理消息的。这就导致如果其中一个消费者处理速度很慢,可能会影响其他的消费者闲置,不能有效地利用系统资源。
  4. 解决方案:
  5. 1.消息推送给能力较强的消费者(通过prefetch count
  6. RabbitMQ中,可以通过设置prefetch count来限制消费者每次从队列中取出的消息数量。这样做可以保证一个消费者不会因为处理速度慢而积累过多的消息,从而使其他消费者闲置。
  7. 2.基于权重的分发(通过消费者权重)
  8. RabbitMQ中,可以给每个消费者设置一个权重,这样RabbitMQ就会根据权重来分发消息。权重高的消费者会得到更多的消息,权重低的消费者会得到更少的消息。
  9. **实现WorkQueue模式**
  1. // publisher
  2. @Component
  3. public class publisherService {
  4. @Autowirte
  5. private RabbitTemplate rabbitTemplate;
  6. public void publisherSendMessage(){
  7. String queneName = "work.queue";
  8. String message = "123";
  9. for (int i = 0; i < 50; i++) {
  10. rabbitTemplate.convertAndSend(queueName, message);// 发送消息
  11. }
  12. }
  13. }
  1. // consumer监听队列。 可绑定多个消费者
  2. @Component
  3. public class ListenerQueue {
  4. @RabbitListener(queues = "work.queue")
  5. public void listenerMessage1(String msg) throws Exception {
  6. System.out.println("spring 消费者1111接收到消息:" + msg);
  7. }
  8. @RabbitListener(queues = "work.queue")
  9. public void listenerMessage2(String msg) throws Exception {
  10. System.out.println("spring 消费者2222222222接收到消息:" + msg);
  11. }
  12. }

SpringAMQP实现RabbitMQ的发布者订阅模式

  1. 允许将同一消息发送给多个消费者。实现方式是加入了**交换机(exchange)**,exchange负责消息路由,而不是存储,路由失败则消息丢失,queue能存储消息。
  2. **交换机作用:**
  3. 1.接收publisher发送的消息
  4. 2.将消息按照规则路由到与之绑定的队列
  5. 3.不能缓存消息,路由失败则消息丢失

** 常见的Exchange类型包括:**

** FanoutExchange(广播):会将接收到的消息路由到每个绑定的队列**

** DirectExchange(路由):会将接收到的消息根据规则路由到指定的队列,因此称为路由模式(routes)**

** TopicExchange(话题):会将接收到消息根据规则路由到指定的队列,与Direct类似,区别在于RoutingKey必须是多个单词的列表,并且以 . 分割。**

发布者订阅模式之FanoutExchange
  1. FanoutExchange会将接收到的消息路由到每一个跟其绑定的queue

  1. // 模拟FanoutExchange
  2. // 1.声明FanoutExchange交换机
  3. // 2.声明队列(多个队列就生成多个)
  4. // 3.绑定队列和交换机(多个队列可以绑定一个交换机)
  5. @Configuration
  6. public class FanoutConfig {
  7. // 声明FanoutExchange交换机
  8. @Bean
  9. public FanoutExchange fanoutExchange(){
  10. return new FanoutExchange("itcast.fanout");
  11. }
  12. // 声明队列1
  13. @Bean
  14. public Queue fanoutQueue1(){
  15. return new Queue("fanout.queue1");
  16. }
  17. // 把队列1跟交换机绑定,在绑定时,与队列方法名必须一致,如fanoutQueue1
  18. @Bean
  19. public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
  20. return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
  21. }
  22. // 声明队列2
  23. @Bean
  24. public Queue fanoutQueue2() {
  25. return new Queue("fanout.queue2");
  26. }
  27. // 把队列2跟交换机绑定
  28. @Bean
  29. public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
  30. return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
  31. }
  32. }
  33. // consumer监听队列
  34. public class ListenerQueue {
  35. @RabbitListener(queues = "fanout.queue1")
  36. public void listenerFanoutQueue1(String msg){
  37. System.out.println("spring 消费者1111接收到消息:" + msg);
  38. }
  39. @RabbitListener(queues = "fanout.queue2")
  40. public void listenerFanoutQueue2(String msg){
  41. System.out.println("spring 消费者22222222接收到消息:" + msg);
  42. }
  43. }
  44. // publisher发送消息
  45. @RunWith(SpringRunner.class)
  46. @SpringBootTest
  47. public class PublisherTest(){
  48. @Autowirt
  49. private RabbitTemplate rabbitTeplate;
  50. @Test
  51. public void fanoutSend(){
  52. String exchangeName = "itcast.fanout";// 交换机的名称,与声明交换机的名称保持一致
  53. String msg = "123";
  54. // 三个参数分别是:交换机名称、RoutingKey、消息内容
  55. rabbitTeplate.convertAndSend(exchangeName, "", msg);
  56. }
  57. }
发布者订阅模式之DirectExchange
  1. DirectExchange会将接收到的消息根据规则路由到指定的队列,因此称为路由模式(routes
  2. **特点:**

** 1.发布者发送消息时,指定消息的RoutingKey**

  1. **2.每个Queue都与Exchange设置一个BindingKey**
  2. **3.Exchange将消息路由到BindingKeyRoutingKey一致的Queue上**

  1. // consumer监听队列
  2. // 可通过@RabbitListener注解,同时声明并绑定队列、交换机、BindingKey。
  3. // bindings 可以绑定多个
  4. @component
  5. public class ListenerQueue {
  6. @RabbitListener(bindings = @QueueBinding(
  7. value = @Queue(name = "direct.queue1"),
  8. exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
  9. key = { "blue", "red"} // key可以配置一个或多个
  10. ))
  11. public void listenerDirectQueue1(String msg) {
  12. // ......
  13. }
  14. @RabbitListener(bindings = @QueueBinding(
  15. value = @Queue(name = "direct.queue2"),
  16. exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
  17. key = { "yellow", "red"}
  18. ))
  19. public void listenerDirectQueue2(String msg) {
  20. // ......
  21. }
  22. }
  23. // publisher发送消息
  24. @RunWith(SpringRunner.class)
  25. @SpringBootTest
  26. public class PublisherTest(){
  27. @Autowirt
  28. private RabbitTemplate rabbitTeplate;
  29. @Test
  30. public void directSend(){
  31. String exchangeName = "itcast.direct";// 交换机的名称,与声明交换机的名称保持一致
  32. String msg = "123";
  33. // 三个参数分别是:交换机名称、RoutingKey、消息内容
  34. rabbitTeplate.convertAndSend(exchangeName, "red", msg);
  35. }
  36. }
发布者订阅模式之TopicExchange
  1. **TopicExchange会将接收到消息根据根据规则路由到指定的队列,与Direct类似,区别在于RoutingKey必须是多个单词的列表,并且以 . 分割。**

** Queue和Exchange指定BindingKey时可以使用通配符:**

** #:代指0个或多个单词**

** :代指一个单词*

注意:只能在BindingKey中使用 # 和 * 通配符,RoutingKey必须是多个单词并用 . 分割

  1. // consumer监听队列
  2. @Component
  3. public class ListenerQueue {
  4. @RabbitListener(bindings = @QueueBinding(
  5. value = @Queue(name = "topic.queue1"),
  6. exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
  7. key = "chain.#"
  8. ))
  9. public void topicQueue1(String msg) {
  10. // .......
  11. }
  12. @RabbitListener(bindings = @QueueBinding(
  13. value = @Queue(name = "topic.queue2"),
  14. exchange = @Exchange(name = "itcast.topic" type = ExchangeTypes.TOPIC),
  15. key = "#.news"
  16. ))
  17. public void topicQueue2(String msg) {
  18. // .......
  19. }
  20. }
  21. // publisher发送消息
  22. @RunWith(SpringRunner.class)
  23. @SpringBootTest
  24. public class PublisherTest(){
  25. @Autowirt
  26. private RabbitTemplate rabbitTeplate;
  27. @Test
  28. public void topicSend(){
  29. String exchangeName = "itcast.topic";// 交换机的名称,与声明交换机的名称保持一致
  30. String msg = "123";
  31. // 三个参数分别是:交换机名称、RoutingKey、消息内容
  32. rabbitTeplate.convertAndSend(exchangeName, "chain.weather", msg);
  33. }
  34. }

本文转载自: https://blog.csdn.net/BabyOi/article/details/140508782
版权归原作者 云雨殇 所有, 如有侵权,请联系我们删除。

“RabbitMQ基础”的评论:

还没有评论