0


基于RabbitMQ的消息监听器

1. 背景

机构的新增、更新、删除在微服务A中已经完成了(微服务A已经部署,不能修改代码),如果在微服务A中对机构进行新增、更新、删除操作后,需要同步到自己的微服务B中,这里采用MQ消息通知的方式实现。

微服务A中配置如下:

消息发往的交换机为:itcast-auth,交换机的类型为:topic

发送消息的规则如下:

● 消息为json字符串
  ○ 如:{"type":"ORG","content":[{"managerId":"1","parentId":"0","name":"测试组织","id":"973902113476182273","status":true}],"operation":"UPDATE"}
● type表示变更的对象,比如组织:ORG 
● content为更改对象列表
● operation类型列表
  ○ 新增-ADD
  ○ 修改-UPDATE
  ○ 删除-DEL

2. 消息监听器

/**
 * 对于微服务A消息的处理
 */
@Slf4j
@Component
public class AuthMQListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT),
            exchange = @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC),
            key = "#"
    ))
    public void listenAgencyMsg(String msg) {
        //{"type":"ORG","operation":"ADD","content":[{"id":"977263044792942657","name":"55","parentId":"0","managerId":null,"status":true}]}
        log.info("接收到消息 -> {}", msg);
        JSONObject jsonObject = JSONUtil.parseObj(msg);
        String type = jsonObject.getStr("type");
        if (!StrUtil.equalsIgnoreCase(type, "ORG")) {
            //非机构消息
            return;
        }
        String operation = jsonObject.getStr("operation");
        JSONObject content = (JSONObject) jsonObject.getJSONArray("content").getObj(0);
        String name = content.getStr("name");
        Long parentId = content.getLong("parentId");

        // 。。。消息处理。。。
    }

}

2.1 标记监听器

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT),
        exchange = @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC),
        key = "#"
))
public void listenAgencyMsg(String msg) {
  • @RabbitListener:标记该方法为RabbitMQ的消息监听器。它会监听指定的队列并处理收到的消息。
  • @QueueBinding:将队列与交换机绑定。 - @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT):指定要监听的队列的名称,这里在常量类里定义了。- @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC):指定交换机的名称和类型(Topic),这里在常量类里定义了,与微服务A中配置相同。- key = "#":路由键,这里#表示匹配所有路由键。
  • listenAgencyMsg(String msg):当消息队列接收到消息时,会调用这个方法,并将消息内容传递进来。

2.2 消息解析

log.info("接收到消息 -> {}", msg);
JSONObject jsonObject = JSONUtil.parseObj(msg);
String type = jsonObject.getStr("type");
if (!StrUtil.equalsIgnoreCase(type, "ORG")) {
    //非机构消息
    return;
}
String operation = jsonObject.getStr("operation");
JSONObject content = (JSONObject) jsonObject.getJSONArray("content").getObj(0);
String name = content.getStr("name");
Long parentId = content.getLong("parentId");
  • log.info("接收到消息 -> {}", msg):记录接收到的消息日志。

  • JSONObject jsonObject = JSONUtil.parseObj(msg):将消息字符串解析为JSON对象。

  • String type = jsonObject.getStr("type"):从消息中提取type字段。

  • if (!StrUtil.equalsIgnoreCase(type, "ORG")) { return; }:判断消息类型是否为“ORG”,如果不是,直接返回不做处理。

  • 提取operation字段:操作类型(如ADD、UPDATE、DEL)。

  • 提取content内容:content字段是一个数组,这里取第一个对象。

  • 提取name字段:表示机构的名称。

  • 提取parentId字段:表示父机构的ID。

3. RabbitMQ介绍

RabbitMQ是一种广泛使用的消息队列(Message Queue)系统,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在不同的系统或组件之间传递消息。通过消息队列,系统可以实现解耦、异步处理、负载均衡等特性,从而提高系统的可扩展性和可靠性。

3.1 RabbitMQ的核心概念

  1. 生产者(Producer)- 生产者是消息的发送方。它负责将消息发送到RabbitMQ的交换机中。
  2. 消费者(Consumer)- 消费者是消息的接收方。它从RabbitMQ的队列中获取并处理消息。
  3. 队列(Queue)- 队列是RabbitMQ内部存储消息的地方。消息从生产者发送到队列中,消费者从队列中获取消息。队列类似于一个消息的存储池。
  4. 交换机(Exchange)- 交换机负责接收生产者发送的消息,并根据一定的路由规则将消息路由到一个或多个队列。交换机有不同的类型,常见的有: - Direct Exchange:直接交换机,根据消息的路由键精确匹配队列。- Fanout Exchange:扇出交换机,不考虑路由键,直接将消息广播到所有绑定的队列中。- Topic Exchange:主题交换机,根据路由键的模式匹配(使用通配符)将消息路由到一个或多个队列。- Headers Exchange:头交换机,通过消息的头部属性来路由消息。
  5. 路由键(Routing Key)- 路由键是生产者在将消息发送到交换机时指定的一个字符串。交换机会根据这个字符串决定将消息路由到哪个队列。
  6. 绑定(Binding)- 绑定是交换机和队列之间的连接关系。通过绑定,可以将交换机和队列关联起来,并通过路由键决定消息的流向。

3.2 消息的生命周期

  1. 生产者发送消息- 生产者将消息发送到交换机,并指定一个路由键。
  2. 交换机路由消息- 交换机根据路由键和绑定规则,将消息路由到一个或多个队列中。
  3. 消费者接收消息- 消费者从队列中取出消息并进行处理。处理完成后,消费者可以向RabbitMQ发送一个确认消息(ACK),告知RabbitMQ该消息已成功处理。
  4. 消息确认与重试- 如果消费者处理消息失败,可以选择不发送确认消息,RabbitMQ会将消息重新放回队列,等待其他消费者处理,或进行重试。

3.3 RabbitMQ的常见使用场景

  1. 解耦- 在复杂系统中,各个组件之间可能有很强的依赖性。通过消息队列,生产者和消费者可以实现解耦,生产者只需将消息发送到队列,不需要关心谁会处理这些消息。
  2. 异步处理- 有些任务可能是耗时操作,例如生成报告、图片处理等。通过消息队列,系统可以将这些耗时操作异步处理,不会阻塞主流程。
  3. 负载均衡- RabbitMQ可以将消息分发给多个消费者,从而实现负载均衡。即使流量高峰期,消息处理也不会成为系统瓶颈。
  4. 消息广播- 通过Fanout Exchange,可以实现消息广播,将同一消息同时发送给多个队列,让多个系统或服务同时收到消息并处理。

4. 总结

这段代码的主要作用是通过监听RabbitMQ消息队列,处理微服务A中与机构相关的消息。在微服务B中通过解析消息内容,动态确定消息的类型和需要执行的操作,并调用相应的服务处理该消息。这种设计可以有效地处理异步消息,并将业务逻辑与消息队列解耦。


本文转载自: https://blog.csdn.net/qq_46637011/article/details/141328385
版权归原作者 cyt涛 所有, 如有侵权,请联系我们删除。

“基于RabbitMQ的消息监听器”的评论:

还没有评论