0


【消息队列】RabbitMQ实现消费者组机制

1. RabbitMQ 的 发布订阅模式

https://www.rabbitmq.com/tutorials/tutorial-three-go

  • P 生产者:发送消息到一个特定的交换机(交换机类型是fanout),不需指定具体的目标队列
  • X 交换机:将消息分发给所有绑定到它的队列
  • C 消费者:订阅主题,通过绑定到交换机的队列接收消息

2. GRPC 服务间的实体同步

考虑以下业务需求——

  • 模拟消费者组机制:- 同一消费者组下的消费者(即一个服务的多个实例)监听同一个队列,是竞争关系- 不同消费者组(即不同服务)监听不同队列,这些队列绑定到同一个交换机,不同消费者组可以独立消费相同的数据
  • 消费历史数据:当生产者先启动,生产了一部分数据,消费者后启动时,也能消费到历史数据

服务之间的实体数据同步方案:

2.1 生产者服务

(1) 初始化

生产者初始化时需要负责绑定 RabbitMQ 的交换机和队列关系,即显式声明自己的实体有哪些消费者在消费。比如:

  • 声明交换机 exchange_user、exchange_group
  • 声明消费者 consumer_user_rpc、consumer_org_rpc
  • 创建队列 exchange_user_consumer_user_rpc、exchange_user_consumer_org_rpc、exchange_group_consumer_user_rpc、exchange_group_consumer_org_rpc,也就是对每个 topic-consumer 组合,创建一个相应的队列
  • 将交换机和队列绑定

(2) 实体变更时发送消息

发送消息到交换机,交换机会自动分发给所有绑定到它的队列,也就是发送一条消息至 exchange_user 交换机,那么消息会被投递给队列 exchange_user_consumer_user_rpc 和 队列 exchange_user_consumer_org_rpc。

2.2 消费者服务

消费者订阅一个 topic,处理 rabbitMQ 队列发来的消息。

  • 若消息处理成功(业务流程成功),发送 Ack 给 rabbitMQ 确认消费
  • 若消息处理失败(业务流程失败),发送 Nack 通知 rabbitMQ 处理失败,消息将放回队列等待下次消费

Ack 时 rabbitMQ 会记录消费者消费的 offset,下次会基于 offset 继续消费~

3. 可靠性

3.1 生产者丢失消息

(1) 生产者绑定交换机和队列

在生产者初始化时,需要先将交换机和队列的关系绑定好,以避免此场景发生:生产者先启动,未绑定交换机和队列,发送了消息到交换机,此时无法投递到具体队列。消费者后启动,即便做了交换机和队列绑定,也无法消费到历史消息。

func NewMQ(rabbitMQCfg *Config, option Option) (*RabbitMQ, error) {
    // ...

    // 初始化交换机和队列
    for topic, consumerGroups := range option.TopicConsumerGroupsBinding {
        err = initExchange(topic, consumerGroups, mq)
        if err != nil {
            return nil, err
        }
    }
    return mq, nil
}

func initExchange(exchange, consumerGroups string, mq *RabbitMQ) error {
    // 1. 创建发送通道
    pch, err := mq.conn.Channel()
    if err != nil {
        return err
    }
    mq.produceChannels[exchange] = pch

    // 2. 开启消息确认机制
    if err := pch.Confirm(false); err != nil {
        return err
    }

    // 3. 创建交换机
    // 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
    err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
    if err != nil {
        return err
    }
    slog.Info("rabbitmq declared exchange", "exchange_name", exchange)

    // 4. 创建队列并绑定到交换机
    for _, consumerGroup := range strings.Split(consumerGroups, ",") {
        consumerGroup = strings.TrimSpace(consumerGroup)
        if consumerGroup == "" {
            continue
        }
        queue := queueName(exchange, consumerGroup)

        // 创建队列
        // 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
        _, err = pch.QueueDeclare(queue, true, false, false, false, nil)
        if err != nil {
            return err
        }

        // 将队列绑定到交换机
        // 参数 queue:队列名称, key:路由键, exchange:交换机名称, noWait:是否等待服务器确认, args:额外参数
        err = pch.QueueBind(queue, "", exchange, false, nil)
        if err != nil {
            return err
        }
        slog.Info("rabbitmq declared and bind queue", "queue", queue, "bind_exchange", exchange)

        // 创建接收通道
        cch, err := mq.conn.Channel()
        if err != nil {
            return err
        }
        mq.consumeChannels[queue] = cch
    }

    // 5. 开启消息确认事件监听、消息投递事件监听
    mq.publishWatcher[exchange] = &watcher{
        returnCh:  pch.NotifyReturn(make(chan amqp.Return)),
        confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),
    }
    // 监听未被交换机投递的消息
    go func() {
        for ret := range mq.publishWatcher[exchange].returnCh {
            // 尝试重新投递
            ctx, _ := context.WithTimeout(context.Background(), mq.config.Timeout)
            if err := mq.publish(ctx, ret.Exchange, ret.MessageId, ret.Body, ret.Timestamp); err != nil {
                slog.Error("rabbitmq republish failed.", "exchange", ret.Exchange, "msgID", ret.MessageId, "err", err)
            } else {
                slog.Warn("rabbitmq got exchange undelivered msg, republished.", "exchange", ret.Exchange, "msgID", ret.MessageId)
            }
            time.Sleep(time.Second * 3)
        }
    }()
    return nil
}

(2) 发送重试

发送消息时增加重试机制。若超过重试上限,需记录日志或报警。

func (r *RabbitMQ) Produce(ctx context.Context, topic string, data map[string]any) error {
    body, _ := json.Marshal(data)
    msgID := uuid.New()

    var retried int
    for {
        err := r.publish(ctx, topic, msgID, body, time.Now())
        if err == nil {
            return nil
        }

        retried++
        if retried > r.option.RetryNum {
            return err
        }
        time.Sleep(r.option.RetryInterval)
    }
}

(3) confirm 消息确认机制

生产端投递消息到 RabbitMQ 后,RabbitMQ 将发送一个确认事件,让生产端知晓消息已发送成功。监听 confirm 事件以确认消息的发送状态:

func initExchange(exchange string, mq *RabbitMQ) error {
    // ...
    // 开启消息确认机制
    if err := pch.Confirm(false); err != nil {
        return err
    }
    
    // 创建监听器
    mq.publishWatcher[exchange] = &watcher{
        confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),
    }
    // ...
}

func (r *RabbitMQ) publish(ctx context.Context, ...) error {
    // publish发送消息
    // ...
    
    // 等待rabbitmq返回消息确认
    select {
    case confirm := <-r.publishWatcher[exchange].confirmCh:
        if !confirm.Ack {
            return errors.New("publish failed, got nack from rabbitmq")
        }
    case <-ctx.Done():
        return errors.New("context deadline, publish to rabbitmq timeout")
    case <-time.After(r.config.Timeout):
        return errors.New("publish to rabbitmq timeout")
    }
    return nil
}

3.2 消费者丢失消息

消费者消费完成后,必须手动 Ack 通知 MQ,表示已经消费成功:

func (r *RabbitMQ) Ack(topic, consumerGroup, msgID string) error {
    // ...

    return consumeChannel.Ack(deliveryTag, false)
}

如果消费失败,需要手动 Nack,那此条消息会重新入队,等待下次消费:

func (r *RabbitMQ) Nack(topic, consumerGroup, msgID string) error {
    // ...

    return consumeChannel.Nack(deliveryTag, false, true)
}

3.3 RabbitMQ 中间件丢失消息

(1) 数据持久化到磁盘

交换机持久化(durable=true):

// 创建交换机
// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)

队列持久化(durable=true):

// 创建队列
// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
_, err = pch.QueueDeclare(queue, true, false, false, false, nil)

消息持久化(DeliveryMode=Persistent):

err := ch.Publish(
    exchange, // 交换机名称
    "",       // 路由键
    true,     // 如果消息不能路由到任何队列,是否返回未处理的消息. true将返回未处理通知, false将丢弃消息
    false,    // 是否立即交付给消费者. 若为true, 当队列中没有等待中的消费者时,消息会被丢弃
    amqp.Publishing{
        MessageId:    msgID,              // 消息ID
        ContentType:  "application/json", // 消息内容类型
        Body:         body,               // 消息内容
        DeliveryMode: amqp.Persistent,    // 消息需要持久化
        Timestamp:    t,                  // 消息时间
    },
)

(2) RabbitMQ 本身的数据一致性保证

RabbitMQ 使用 raft 共识算法保证数据一致性:

https://www.rabbitmq.com/docs/clustering#replica-placement


本文转载自: https://blog.csdn.net/lixiaonan0318/article/details/143078309
版权归原作者 喝醉的小喵 所有, 如有侵权,请联系我们删除。

“【消息队列】RabbitMQ实现消费者组机制”的评论:

还没有评论