0


rabbitMQ大致讲解

1.基本概况

    MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不 用依赖其他服务。

    RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。

2.使用场景

    **学习或者使用一项技术,首先要先知道他能够解决那些痛点,而不要过分关注如何使用它,先理解后使用**
  • 异步通信:- 将耗时的任务异步执行,以提高系统的响应性。生产者将任务放入消息队列,而消费者从队列中获取任务并执行。
  • 系统解耦:- 在分布式系统中,通过消息队列实现各个组件之间的解耦,使得系统更加灵活、可扩展,并降低各个组件之间的依赖性。
  • 削峰填谷:- 处理流量峰值,通过消息队列缓冲和平滑大量的请求,以防止系统过载。生产者将请求发送到队列,而消费者按照其处理能力逐渐处理这些请求。
  • 日志处理:- 在分布式系统中,将日志异步发送到消息队列,然后由日志处理服务消费,以便进行日志聚合、监控和分析。
  • 事件驱动架构:- 构建事件驱动的体系结构,通过消息队列进行组件之间的通信。当一个组件触发事件时,它将消息发布到队列,而其他组件则订阅并处理这些事件。
  • 任务分发和调度:- 将任务分发到多个工作者节点,以实现任务的并行处理。生产者将任务放入队列,而多个消费者从队列中获取任务并执行。
  • 订单处理:- 在电子商务系统中,通过消息队列处理订单和支付流程。当用户下订单时,将订单信息发送到队列中,而订单处理服务异步处理这些订单。
  • 微服务通信:- 在微服务架构中,通过消息队列实现微服务之间的通信。微服务可以通过发布和订阅消息进行协作,而不直接调用彼此的 API。
  • 实时数据处理:- 将实时生成的数据通过消息队列传递给分析引擎或仪表板,以便实时监控和分析系统的状态。
  • 发布/订阅模式:- 实现发布/订阅模式,使得多个消费者可以订阅并处理同一类事件。这在通知系统和广播消息时非常有用。

3.大体介绍

3.1四大核心概念

            生产者(publisher)->交换机(exchange)->队列(queues)->消费者(consumer)

3.2核心部分

            取自官网![](https://img-blog.csdnimg.cn/direct/ac704795537d4ece8308a62e6a14c868.png)
3.2.1 生产者
    RabbitMQ 中的生产者负责将消息发送到队列。

创建一个简单的生产者通常需要以下步骤:

  1. 创建连接工厂: 使用 ConnectionFactory 创建连接到 RabbitMQ 服务器的连接。
  2. 创建连接: 调用 newConnection 方法创建一个连接对象。
  3. 创建通道: 调用 createChannel 方法创建一个通道,所有的操作都在通道上进行。
  4. 声明交换器和队列: 使用 exchangeDeclare 方法声明交换器,使用 queueDeclare 方法声明队列。
  5. 发布消息: 使用 basicPublish 方法发布消息到交换器。
import com.rabbitmq.client.*;

public class SimpleProducer {
    private static final String EXCHANGE_NAME = "my_direct_exchange";
    private static final String ROUTING_KEY = "my_routing_key";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 创建连接
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明直连交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

            // 发布消息
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
            
            System.out.println("Message sent: " + MESSAGE);
        }
    }
}

消息的基本属性

使用

basicPublish

方法发布消息时,可以设置消息的基本属性,例如:

  • 交换器名称(Exchange Name): 指定消息发送的交换器。
  • Routing Key(路由键): 对于 Direct、Topic 类型的交换器,消息的路由键与绑定键进行匹配,以决定将消息路由到哪个队列。
  • BasicProperties(基本属性): 可以设置消息的基本属性,如消息的 ID、内容类型、优先级等。

发布确认机制

RabbitMQ 提供了发布确认机制,用于确保消息成功发送到交换器。可以通过调用

confirmSelect

方法开启确认模式,并通过

waitForConfirms

waitForConfirmsOrDie

方法等待确认。

// 开启发布确认模式
channel.confirmSelect();

// 发布消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());

// 等待确认
if (channel.waitForConfirms()) {
    System.out.println("Message sent successfully!");
} else {
    System.err.println("Failed to send message!");
}
3.2.2 exchange(交换器)
//声明交换器的方法
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException {
        return this.exchangeDeclare(exchange, type, durable, autoDelete, false, arguments);
    }
  1. exchange(交换器名称):- 类型:String- 描述:要声明的交换器的名称。

  2. type(交换器类型,下方有对交换器类型的详解):- 类型:String- 描述:交换器的类型,有四种类型可以选择: - direct: 将消息路由到与消息的 routing key 完全匹配的队列。- fanout: 将消息广播到所有与交换器绑定的队列。- topic: 将消息路由到与消息的 routing key 匹配的队列,支持通配符。- headers: 使用消息的 headers 属性进行匹配。

  3. durable(持久性):- 类型:boolean- 描述:如果设置为 true,交换器将在服务器重启后仍然存在(持久性交换器)。如果设置为 false,交换器将在服务器重启后被删除(非持久性交换器)。

  4. autoDelete(自动删除):- 类型:boolean- 描述:如果设置为 true,交换器将在至少有一个队列与之绑定时存在。当所有与交换器绑定的队列都解绑时,交换器将被删除。

  5. arguments(其他参数):- 类型:Map<String, Object>- 描述:其他的一些交换器参数,以键值对的形式传递。例如,在使用 Headers Exchange 类型时,可以通过设置 x-match 参数来定义匹配规则。

      生产者推送到具体交换器,交换器推送到零个或多个队列中,具体推送规则取决于交换器类型以及相应的路由规则
    

Exchange type(交换器类型)Default pre-declared namesDirect exchange(直接)(Empty string) and amq.directFanout exchange(扇出)amq.fanoutTopic exchange(主题)amq.topicHeaders exchange(标头)amq.match (and amq.headers in RabbitMQ)
交换器还有许多其他属性,例如名字,持久性(rabbitMQ重启之后是否仍然存在该交换器),自动删除(最后一个队列与交换器解除绑定之后删除),参数(可选,由插件或其他特定功能使用)

  1. 默认交换器

           他是一个没有名称的直接交换器(名字为空字符串),它会使默认没有绑定的队列自动绑定到该交换器上,**默认交换器不支持绑定/解绑操作,如果进行该操作会报错**
    
    1. 直接交换器

          他会根据路由key直接将消息推送到响应名称队列中
      

     如上图,a队列绑定了aa,b队列绑定了bb,cc这两个键值,exchange就会根据消息所包含的路由来进行消息的推送选择

      3.fanout扇出交换器

            该交换器会将消息发送到所有与他绑定的队列中,从而忽视路由键的关联

     4.topic主题交换器

            该交换器可以更加方便的进行自定义队列推送,适用于比较复杂的业务场景

    ![](https://img-blog.csdnimg.cn/direct/32ccdd1ffd2145588ba613163e84764c.png)![](https://img-blog.csdnimg.cn/direct/e8f5635778744882ba15a8bf91a4ac9a.png)

           ** 注意:1.当一个队列绑定键是#,那么他将接收所有交换器推送的消息**

** 2.如果队列中没有#与好出现,那么该队列就如同direct一样的效果*

** 5.标头交换器**

** ** 他与直接交换器功能个差不多一致只是传参不一样,从下面源码可以看出,标头交换器是根据arguments进行路由选择的

             声明标头交换器的时候可以设置是全部匹配还是部分匹配(下述为示例代码)
// 声明 Header Exchange,并设置匹配模式为 "any"(部分匹配即可),"all"(需要全部匹配才可)
            Map<String, Object> exchangeArgs = new HashMap<>();
            exchangeArgs.put("x-match", "any");
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, false, false, exchangeArgs);
3.2.3 queue(队列)
     相关属性name(队列名称),Durable(队列在服务端重启之后是否仍然存在),Exclusive(独占,只由一个队列连接使用,当该链接关闭时,删除连接),Auto-delete(自动删除,如果没有消费者连接到这个队列,那么队列将被自动删除),Arguments(可选;由插件和代理特定功能(如消息 TTL、队列长度限制等)使用)
//该方法为声明队列的方法,关于其中的参数下述进行详解
public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
        validateQueueNameLength(queue);
        return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
    }
  1. queue(队列名称):- 类型:String- 描述:要声明的队列的名称。如果不指定队列名称,RabbitMQ 将会为队列生成一个唯一的名称。
  2. durable(持久性):- 类型:boolean- 描述:如果设置为 true,队列将在服务器重启后仍然存在(持久性队列)。如果设置为 false,队列将在服务器重启后被删除(非持久性队列)。
  3. exclusive(排他性):- 类型:boolean- 描述:如果设置为 true,队列将只允许当前连接创建的消费者使用。一旦连接关闭,队列将被删除。
  4. autoDelete(自动删除):- 类型:boolean- 描述:如果设置为 true,队列将在至少有一个消费者连接到它时存在。当所有消费者断开连接时,队列将被删除。
  5. arguments(其他参数):- 类型:Map<String, Object>- 描述:其他的一些队列参数,以键值对的形式传递。例如,可以通过设置 x-message-ttl 参数来定义消息的过期时间。

返回值为

AMQP.Queue.DeclareOk

,其中包含有关已声明队列的一些信息。

3.2.4Bindings (绑定)
            用来绑定交换机与队列的路由关系,也可以进行交换器与交换器关系绑定(一般用不上)
//队列绑定
public com.rabbitmq.client.impl.AMQImpl.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException {
        validateQueueNameLength(queue);
        return (com.rabbitmq.client.impl.AMQImpl.Queue.BindOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Bind.Builder()).queue(queue).exchange(exchange).routingKey(routingKey).arguments(arguments).build()).getMethod();
    }
//交换器绑定
public BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException {
        return (BindOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Exchange.Bind.Builder()).destination(destination).source(source).routingKey(routingKey).arguments(arguments).build()).getMethod();
    }
3.2.5 消费者
            rabbitMQ是进行消息转发,不涉及业务处理,需要创建消费者来处理队列中的数据

             主要使用步骤为:
  1. 创建连接工厂: 使用 ConnectionFactory 创建连接到 RabbitMQ 服务器的连接。
  2. 创建连接: 调用 newConnection 方法创建一个连接对象。
  3. 创建通道: 调用 createChannel 方法创建一个通道,所有的操作都在通道上进行。
  4. 声明队列: 使用 queueDeclare 方法声明要消费的队列。
  5. 创建消费者: 创建 Consumer 对象,实现 DefaultConsumer 抽象类的 handleDelivery 方法,该方法在接收到消息时被调用。
  6. 订阅队列: 调用 basicConsume 方法订阅队列,传入队列名称和消费者对象。
import com.rabbitmq.client.*;

public class SimpleConsumer {
    private static final String QUEUE_NAME = "my_queue";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 创建连接
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 处理接收到的消息
                    String message = new String(body, "UTF-8");
                    System.out.println("Received message: " + message);
                }
            };

            // 订阅队列
            channel.basicConsume(QUEUE_NAME, true, consumer);

            // 等待接收消息,此处可以添加逻辑来保持程序运行
            Thread.sleep(10000);
        }
    }
}

消费者的基本属性

  • Consumer Tag(消费者标签): 在调用 basicConsume 时,可以传入一个消费者标签。消费者标签用于标识消费者,可以在取消订阅时使用。如果不提供标签,RabbitMQ 将生成一个唯一的标签。
  • Auto Acknowledgment(自动应答): 在调用 basicConsume 时,可以传入一个布尔值表示是否自动应答。如果设置为 true,一旦消息被接收,就会自动向 RabbitMQ 发送应答。如果设置为 false,需要在处理完消息后手动调用 basicAck 方法发送应答。
  • Delivery(传送信息):handleDelivery 方法的参数 Delivery 包含有关消息的信息,如交付标签、路由键、交换器等。
  • BasicProperties(基本属性):handleDelivery 方法的参数 BasicProperties 包含有关消息的基本属性,如消息的 ID、内容类型、优先级等。

消费者的应答机制

RabbitMQ 提供了两种消费者应答机制:

  • 自动应答: 设置 autoAck 参数为 true,一旦消息被接收,就会自动向 RabbitMQ 发送应答。
  • 手动应答: 设置 autoAck 参数为 false,在处理完消息后,需要手动调用 basicAck 方法发送应答。

自动应答简化了应用程序的逻辑,但在某些情况下可能导致消息的丢失。手动应答允许应用程序更细粒度地控制消息的确认。选择使用哪种应答机制取决于应用程序的需求和消息处理的语义。

消费者的错误处理

在实际应用中,消费者应该具备良好的错误处理机制。可以捕获异常并根据需要执行适当的操作,例如记录日志、重试、拒绝消息等。

@Override
public void handleDelivery(String consumerTag, Envelope envelope,
                           AMQP.BasicProperties properties, byte[] body) throws IOException {
    try {
        // 处理接收到的消息
        String message = new String(body, "UTF-8");
        System.out.println("Received message: " + message);

        // 模拟处理中的异常
        if (message.contains("error")) {
            throw new RuntimeException("Simulated error during message processing");
        }

        // 手动应答
        channel.basicAck(envelope.getDeliveryTag(), false);
    } catch (Exception e) {
        // 异常处理逻辑
        System.err.println("Error processing message: " + e.getMessage());

        // 可以选择拒绝消息并重新入队,或者进行其他错误处理
        channel.basicNack(envelope.getDeliveryTag(), false, true);
    }
}

本文转载自: https://blog.csdn.net/m0_67920613/article/details/135537048
版权归原作者 自律_平庸 所有, 如有侵权,请联系我们删除。

“rabbitMQ大致讲解”的评论:

还没有评论