0


RabbitMQ笔记

RabbitMQ

消息队列是实现应用程序与应用程序进行通信的中间件产品,可以实现各个微服务之间的异步操作。
好处:降低系统的耦合度、(用户服务的)快速响应、削峰限流、减少并发压力、便于系统功能的拓展。

RabbitMQ的体系结构介绍

1.Producer为生产者(发送消息),通过Channel与消息中间件Broker连接。
2.Broker中包含多个Virtual Host,每一个Virtual Host就是一个虚拟分组,用户在自己的Virtual Host中使用RabbitMQ组件。在实际开发中,通过Virtual Host区分不同项目、不同功能。
3.Virtual Host包含多个Exchange交换机,Exchange交换机是消息达到Broker的第一站。
4.Queue,消息队列,是消息的容器。消息放在这里等待被消费端取走。Exchange绑定一个或多个队列进行消息的发送。
5.Consumer消费者,通过Channel与Broker连接,监听一个或多个队列,若队列中有消息则会取走消费,(若成功消费发送确认信息)同时在队列中删除消息。

安装

# 拉取镜像
docker pull rabbitmq:3.13-management
# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management

后台管理

http://192.168.200.100:15672为后台管理界面,可以创建队列,监控消息等。

1.RabbitMQ工作模式

RabbitMQ官网列举了7种RabbitMQ用法,其中Publish/Subscribe、Routing和Topic最为常见。

1.1 Publish/Subscribe 发布订阅模式

生产者不是把消息直接发送到队列,而是发送到交换机,交换机接收消息,而如何处理消息取决于交换机的类型。

  • 交换机有如下3种常见类型 1.Fanout:广播,将消息发送给所有绑定到交换机的队列 2.Direct:定向,把消息交给符合指定routing key的队列 3.Topic:通配符,把消息交给符合routing pattern(路由模式)的队列。 消息发送到Fanout交换机上,就会以广播的形式发送给所有已绑定队列,而监听对应队列的消费者则可以从队列中取走消息。
  • 注意
  1. 前两种简单的工作模式不是没有交换机,而是采用了默认的交换机。
  2. 若多个服务监听同一个队列,则会对消息产生争抢,谁抢到谁消费。

1.2 Routing

  • 基本概念 1.通过路由绑定的方式,把交换机和队列关联起来,交换机队列通过**路由键(string)**进行绑定。 2.生产者发送消息时不仅要指定交换机,还要指定路由键。 3.交换机接收到消息会发送到路由键绑定的队列 4…交换机的类型为:Direct 5.队列绑定交换机的时候需要指定routing key

1.3 Topics

  • 基本概念 Topic类型与Direct相比,都是可以根据RoutingKey把消息通过路由发送到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用 通配符。
  • 注意事项 1.Routingkey一般都是由一个或多个单词组成,多个单词之间以.分割,例如:item.insert。 2.通配符规则:#:匹配零个或多个词; *:匹配一个词

1.4 其他

RPC远程过程调用

本质上是同步调用,不是经典的消息队列的工作方式,和我们使用OpenFeign调用远程接口一样。

  • 补充 RPC远程过程调用,对于Java而言简单来说就是远程方法调用,A服务想要调用B服务(在不同的线程甚至不同的服务器上)的方法,就要使用RPC。 A通过socket、websocket或者消息中间件等方法传递给B中方法所需的参数,b在本地调用后得到结果再返回,就完成了远程过程调用。

Publisher Confirms

发送端消息确认,再后续消息可靠性详细介绍。

2.RabbitMQ整合SpringBoot

2.1 大致流程

  1. RabbitMQ服务器启动RabbitMQ
  2. 建module、改pom、配yml、主程序 相关依赖
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

配置项

spring:
  rabbitmq:
    host: xxx
    port: xxx
    username:xxx 
    password: xxx
    virtual-host: /
logging:
  level:
    com.atguigu.mq.listener.MyMessageListener: info
  1. 配置类 用作RabbitTemplate的增强
  2. 业务代码开发 发送消息:使用RabbitTemplate 接收消息:使用@RabbitListener注解(processMessage方法的注解) @QueueBinding绑定基本信息 @Queue给value赋值,QUEUE_NAME队列名,durable 为是否持久化 @Exchange给exchange 赋值,EXCHANGE_DIRECT为交换机名 key = {ROUTING_KEY} 设置路由键,此处用{}是因为可能有多个路由键,要以数组的方式赋值

3.消息可靠性

3.1 生产端确认

问题:生产端消息未发送成功,导致信息丢失
解决1:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送。
解决2:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机,再由备份交换机传输到绑定的队列上

3.2 持久化

问题:消息正常发送到队列,还未消费,MQ服务器宕机,导致信息丢失。
解决:消息持久化到硬盘上,服务器重启后直接加载,不会导致消息丢失。
创建交换机和队列时,默认为持久化队列,会自动对其中的消息进行持久化,无需专门设置。
若对消息的重要性要求低,可使用临时的交换机和队列(transient),此时消息不会被持久化,而且重启服务后临时的交换机与队列都会消失。

3.3 消费端确认

问题:生产者取走消息但消费失败,导致消息失效
解决:消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息。消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)

补充

1. 交付标签机制deliveryTag

每一个消息进入队列时,broker都会生成一个唯一标识deliveryTag,是一个64位整数,消息向消费者端进行投递时会携带该信息。
消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作,例如删除消息、重新排队或标记为死信等等那么Broker就必须知道它现在要操作的消息具体是哪一条。而deliveryTag作为消息的唯一标识就很好的满足了这个需求。
对于不同队列中的同一条消息,会产生不同的deliveryTag!

2. 消息确认相关方法参数说明:multiple

在返回消息是参数multile若为true,则会处理当前消息及其在队列中之前的消息,而为了避免不必要的差错,我们一般都设置为false。
此时,NACK、Reject类似,都是返回否定消息,而reject不允许设置multile。

4.消费端限流

问题:若消费者端因为某些原因如 执行效率较慢、生产者生产消息的速度相对快、消费者端重启遇到了大量的积压消息等,都会一次性取走过多的消息导致消费者端的压力过大。
解决:配置prefetch 进行削峰限流
限制每次去走的消息数量,减缓消费者端的服务压力。

spring:
  rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 设置消费者端的回复确认为手动回复(默认为自动)!!
        prefetch: 10 # 设置每次最多从消息队列服务器取回多少消息

5.消息超时

1.队列层面设置:该队列中所有的消息都会设置有效时间
2.消息层面设置

importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;@TestpublicvoidtestSendMessageTTL(){// 1、创建消息后置处理器对象  MessagePostProcessor messagePostProcessor =(Message message)->{// 设定 TTL 时间,以毫秒为单位
        message.getMessageProperties().setExpiration("5000");return message;};// 2、发送消息  
    rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello atguigu", messagePostProcessor);}

若消息与队列都设置了有效时间,按照短的计算。

6.死信

6.1 产生死信的三个原因

  • 1.消息超时未消费
  • 2.消费端取走后消费失败basicNack()/basicReject()且拒绝重新放入队列(requeue=false)
  • 3.溢出:消息数量超过队列容纳极限(会将队头消息挤入死信队列)

6.2 死信交换机与死信队列

死信交换机与死信队列其实与常规的无异,若想要将某个队列的死信传入死信队列,需要以下步骤:
1.创建死信交换机与死信队列并绑定
2.创建常规交换机与交换队列并绑定
3.设置正常队列的dead-letter-exchange和dead-letter-routing-key参数,与死信交换机绑定
4.消费端监听死信队列,对死信做特殊处理

7.延迟队列

7.1 基于死信的延迟队列

如果想要做延时任务,可利用常规队列设置超时时间,再绑定死信队列,消费端只监听死信队列,从而实现目的。

7.2 延迟插件 rabbitmq_delayed_message_exchange

插件网址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下载启用后,在managementUI中创建交换机时,延时交换机类型设置为x-delayed-message,并额外设置x-delayed-type为direct、fanout、topic其中的一种。
生产者端正常发送带有ttl的消息给延时交换机
效果:使用rabbitmq_delayed_message_exchange插件后,即使该延时消息成功发送到队列上,也会导致returnedMessage()方法执行

8.事务

  • 总结: 1.在生产者端使用事务消息和消费端没有关系 2.在生产者端使用事务消息仅仅是控制事务内的消息是否发送 3.提交事务就把事务内所有消息都发送到交换机 4.回滚事务则事务内任何消息都不会被发送 5.MQ的事务对消费者端无效!

9.惰性队列

9.1 惰性队列的特征

  • 1.接收到消息后直接存入磁盘而非内存
  • 2.消费者要消费消息时才会从磁盘中读取并加载到内存
  • 3.支持数百万条的消息存储

9.2 如何创建惰性队列?

9.2.1 基于策略方式设定

# 登录Docker容器
docker exec -it rabbitmq /bin/bash

# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量
  • set_policy是子命令,表示设置策略
  • Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的
  • "^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置
  • '{“queue-mode”:“lazy”}'是一个JSON格式的参数设置指定了队列的模式为"lazy"
  • –-apply-to参数指定该策略将应用于队列(queues)级别
  • 命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列

9.2.2 在声明队列时使用参数设定

应用场景

由于各种原因,如消费者离线/崩溃/停机进行维护、突然出现消息进入高峰,生产者的速度超过了消费者、消费者比正常情况慢等原因导致的消息堆积。

10.优先级队列

设置优先级之后:优先级高的消息更大几率先投递而不是完全的先进先出。

10.1 如何创建优先级队列?

在创建队列时,设置x-max-priority(最大优先级数)即可,默认是0。
当x-max-priority为0时,给消息设置优先级也不会生效。

10.2 如何给消息设置优先级?

  @Test
  public void testSendMessage() {
      rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{
          message.getMessageProperties().setPriority(1); //设置优先级为1
          return message;
      });
  }

11.集群

采用MQ集群可以避免单点故障 大流量场景分摊负载 数据同步等功能。

创建集群

详见 Operation016-Cluster.md 文档。

异地容灾

1.联邦交换机

通过网络将两个关联的交换机消息进行传输同步(两个都有)。

2.Shovel插件

通过网络将一部分消息全部挖到另一台关联的服务器上,要配置shovel的消息源和接收者。

对比

  • Shovel和Federation的主要区别:
  1. Shovel更简洁一些
  2. Federation更倾向于跨集群使用,而Shovel是否跨集群都可以
  3. Shovel源队列中的消息经过数据转移后相当于被消费了

12 java代码sample

1.配置类,用于增强RabbitTemplate,使得消息确认的消息可以自己发送,(同时要注意更新配置项acknowledge-mode: manual )

@Configuration@Slf4j//ConfirmCallback、ReturnsCallback 是 RabbitTemplate 的内部类(接口)publicclassRabbitConfigimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{//自动注入RabbitTemplate@AutowiredprivateRabbitTemplate rabbitTemplate;//增强RabbitTemplate@PostConstruct//在组件创建完成后立刻执行该方法,要求方法没有返回值,非私有,无参数publicvoidinitRabbitTemplate(){
        rabbitTemplate.setConfirmCallback(this);//参数要求传入ConfirmCallback实现类,this就是
        rabbitTemplate.setReturnsCallback(this);//参数要求传入ReturnsCallback实现类,this就是}@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){// 消息发送到交换机成功或失败时调用这个方法
        log.info("confirm() 回调函数打印 CorrelationData:"+ correlationData);
        log.info("confirm() 回调函数打印 ack:"+ ack);
        log.info("confirm() 回调函数打印 cause:"+ cause);}@OverridepublicvoidreturnedMessage(ReturnedMessage returned){// 发送到队列失败时才调用这个方法
        log.info("returnedMessage() 回调函数 消息主体: "+newString(returned.getMessage().getBody()));
        log.info("returnedMessage() 回调函数 应答码: "+ returned.getReplyCode());
        log.info("returnedMessage() 回调函数 描述:"+ returned.getReplyText());
        log.info("returnedMessage() 回调函数 消息使用的交换器 exchange : "+ returned.getExchange());
        log.info("returnedMessage() 回调函数 消息使用的路由键 routing : "+ returned.getRoutingKey());}}

2.生产者模块

@SpringBootTest
public class RabbitMQTest {
    
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String EXCHANGE_TIMEOUT = "exchange.test.timeout";
    public static final String ROUTING_KEY = "order";
    public static final String ROUTING_KEY_TIMEOUT = "routing.key.test.timeout";
    public static final String EXCHANGE_NORMAL = "exchange.normal.video";
    public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
    public static final String EXCHANGE_DELAY = "exchange.test.delay";
    public static final String ROUTING_KEY_DELAY = "routing.key.test.delay";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test01SendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY + "~", "Message Test Confirm~~~ ~~~");
    }

    @Test
    public void test02SendMessage() {
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "Test Prefetch " + i);
        }
    }

    @Test
    public void test03SendMessage() {
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout " + i);
        }
    }

    @Test
    public void test04SendMessage() {

        // 创建消息后置处理器对象
        MessagePostProcessor postProcessor = message -> {

            // 设置消息的过期时间,单位是毫秒
            message.getMessageProperties().setExpiration("7000");

            return message;
        };

        rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout", postProcessor);
    }

    @Test
    public void testSendMultiMessage() {
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend(
                    EXCHANGE_NORMAL,
                    ROUTING_KEY_NORMAL,
                    "测试死信情况2:消息数量超过队列的最大容量" + i);
        }
    }

    @Test
    public void test05SendMessageDelay() {

        // 创建消息后置处理器对象
        MessagePostProcessor postProcessor = message -> {

            // 设置消息过期时间(以毫秒为单位)
            // x-delay 参数必须基于 x-delayed-message-exchange 插件才能生效
            message.getMessageProperties().setHeader("x-delay", "10000");

            return message;
        };

        // 发送消息
        rabbitTemplate.convertAndSend(
                EXCHANGE_DELAY,
                ROUTING_KEY_DELAY,
                "Test delay message by plugin " + new SimpleDateFormat("HH:mm:ss").format(new Date()),
                postProcessor);
    }

    public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
    public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";

    @Test
    public void test06SendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "message test proirity 3", message -> {

            // 消息本身的优先级数值
            // 切记:不能超过 x-max-priority:    10
            message.getMessageProperties().setPriority(3);

            return message;
        });
    }
}

3.消费者模块

package com.atguigu.mq.listener;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class MyMessageListener {
    public static final String QUEUE_NAME  = "queue.order";
    public static final String QUEUE_NORMAL = "queue.normal.video";
    public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
    public static final String QUEUE_DELAY = "queue.test.delay";

    // @RabbitListener(queues = {QUEUE_NAME})
    public void processMessage(String dataString, Message message, Channel channel) throws IOException {

        // 获取当前消息的 deliveryTag
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            // 核心操作
            log.info("消费端 消息内容:" + dataString);

            System.out.println(10 / 0);

            // 核心操作成功:返回 ACK 信息
            channel.basicAck(deliveryTag, false);

        } catch (Exception e) {

            // 获取当前消息是否是重复投递的
            //      redelivered 为 true:说明当前消息已经重复投递过一次了
            //      redelivered 为 false:说明当前消息是第一次投递
            Boolean redelivered = message.getMessageProperties().getRedelivered();

            // 核心操作失败:返回 NACK 信息
            // requeue 参数:控制消息是否重新放回队列
            //      取值为 true:重新放回队列,broker 会重新投递这个消息
            //      取值为 false:不重新放回队列,broker 会丢弃这个消息

            if (redelivered) {
                // 如果当前消息已经是重复投递的,说明此前已经重试过一次啦,所以 requeue 设置为 false,表示不重新放回队列
                channel.basicNack(deliveryTag, false, false);
            } else {
                // 如果当前消息是第一次投递,说明当前代码是第一次抛异常,尚未重试,所以 requeue 设置为 true,表示重新放回队列在投递一次
                channel.basicNack(deliveryTag, false, true);
            }

            // reject 表示拒绝
            // 辨析:basicNack() 和 basicReject() 方法区别
            // basicNack()能控制是否批量操作
            // basicReject()不能控制是否批量操作
            // channel.basicReject(deliveryTag, true);
        }
    }

    @RabbitListener(queues = {QUEUE_NAME})
    public void processMessageTestPrefetch(String dataString, Message message, Channel channel) throws IOException, InterruptedException {
        log.info("消费端 消息内容:" + dataString);

        TimeUnit.SECONDS.sleep(1);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = {QUEUE_NORMAL})
    public void processMessageNormal(Message message, Channel channel) throws IOException {
        // 监听正常队列,但是拒绝消息
        log.info("★[normal]消息接收到,但我拒绝。");
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = {QUEUE_DEAD_LETTER})
    public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
        // 监听死信队列
        log.info("★[dead letter]dataString = " + dataString);
        log.info("★[dead letter]我是死信监听方法,我接收到了死信消息");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = {QUEUE_DELAY})
    public void processMessageDelay(String dataString, Message message, Channel channel) throws IOException {
        log.info("[delay message][消息本身]" + dataString);
        log.info("[delay message][当前时间]" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    public static final String QUEUE_PRIORITY = "queue.test.priority";

    @RabbitListener(queues = {QUEUE_PRIORITY})
    public void processMessagePriority(String dataString, Message message, Channel channel) throws IOException {
        log.info("[priority]" + dataString);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
标签: rabbitmq 笔记

本文转载自: https://blog.csdn.net/weixin_47114534/article/details/142065773
版权归原作者 weixin_47114534 所有, 如有侵权,请联系我们删除。

“RabbitMQ笔记”的评论:

还没有评论