0


在Go中迅速使用RabbitMQ

文章目录

  • 为什么要使用消息队列

image-20240903160417835

1 认识

1.1 MQ分类

  • 有Broker- 重Topic —— 在整个broker中,依据topic来进行消息中转。在重topic的MQ中必然需要topic —— kafka- 轻Topic —— topic只是一种中转模式 —— rabbitMQ
  • 无Broker

1.2 安装

  1. # latest RabbitMQ 3.13docker run \-eRABBITMQ_DEFAULT_USER=dusong \#默认账号和密码均为:guest-eRABBITMQ_DEFAULT_PASS=123123\-d\#detached mode-v mq-plugins:/plugins \#插件挂载--rm\--name rabbitmq \-p5672:5672 \#消息通信端口-p15672:15672 \#管理界面端口
  2. rabbitmq:3.13-management

1.3 基本流程

image-20240904110326213

  • exchange只能转发消息,不能存储消息
  • 通过bind将queue绑定到exchange

2 Work模型

  • 多个消费者绑定到一个队列
  • 同一个消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量(不设置默认平均平均分配)image-20240904144344585err = ch.Qos(1,// prefetch count0,// prefetch sizefalse,// global)

3 交换机

3.1 fanout

fanout类型的交换机会将消息转发给所有绑定到改交换机的队列

3.2 direct

image-20240904151234823

  1. err = ch.ExchangeDeclare("logs_direct",// name"direct",// typetrue,// durablefalse,// auto-deletedfalse,// internalfalse,// no-waitnil,// arguments)failOnError(err,"Failed to declare an exchange")
  2. ctx, cancel := context.WithTimeout(context.Background(),5*time.Second)defercancel()
  3. body :=bodyFrom(os.Args)
  4. err = ch.PublishWithContext(ctx,"logs_direct",// exchange"log",// routing keyfalse,// mandatoryfalse,// immediate
  5. amqp.Publishing{
  6. ContentType:"text/plain",
  7. Body:[]byte(body),})

3.3 topic

image-20240904151944421

4 Golang创建交换机/队列/Publish/Consume/Bind

  • 创建交换机err = ch.ExchangeDeclare("logs_direct",// name"direct",// typetrue,// durablefalse,// auto-deletedfalse,// internalfalse,// no-waitnil,// arguments)
  • 创建队列q, err := ch.QueueDeclare("hello",// namefalse,// durable(是否持久化)false,// delete when unusedfalse,// exclusivefalse,// no-waitnil,// arguments)
  • 绑定err = ch.QueueBind( q.Name,// queue name"log",// routing key"logs_direct",// exchangefalse,nil)
  • 发送body :="this is log"err = ch.PublishWithContext(ctx,"logs_direct",// exchange"log",// routing keyfalse,// mandatoryfalse,// immediate amqp.Publishing{ ContentType:"text/plain", Body:[]byte(body),})
  • 接收msgs, err := ch.Consume( q.Name,// queue"",// consumertrue,// auto ackfalse,// exclusivefalse,// no localfalse,// no waitnil,// args)

5 可靠性

5.1 生产者可靠性

  • 生产者重连
  • 生产者确认(ack)

5.2 MQ可靠性

  • 交换机/队列持久化
  • 消息持久化

5.2.1 Lazy Queue

image-20240904172117264

image-20240904163818387

5.3 消费者可靠性

  • 消费者确认机制image-20240904172521990

5.4 业务幂等性

  • 消费者因为保证可靠性可能消费业务多次,因此需要保证业务幂等性
  1. 给消息加上uuid
  2. 在业务逻辑上做修改

5.4 Golang实现可靠性

在使用 RabbitMQ 的 Go 应用程序中,要确保消息的可靠性,通常需要从以下几个方面入手:

1. 确保消息生产者的可靠性

  • 消息确认(Publisher Confirms): 开启 RabbitMQ 的发布确认模式。通过调用 Channel.Confirm() 方法,让 RabbitMQ 服务器在成功接收并持久化消息后向生产者发送确认。这样可以确保生产者知道消息已被可靠接收。ch.Confirm(false)// 启用发布确认模式confirm := ch.NotifyPublish(make(chan amqp.Confirmation,1))// 发布消息err = ch.Publish(exchange, routingKey, mandatory, immediate, msg)if err !=nil{// 处理发布失败的情况}select{case confirmed :=<-confirm:if confirmed.Ack { fmt.Println("消息已确认")}else{ fmt.Println("消息未确认")}case<-time.After(time.Second *5): fmt.Println("消息确认超时")}
  • 消息持久化(Message Durability): 将消息标记为持久化,以确保即使 RabbitMQ 服务器重启,消息也不会丢失。通过设置 DeliveryModeamqp.Persistent 来实现:msg := amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType:"text/plain", Body:[]byte("Hello, RabbitMQ!"),}

2. 确保消息队列的可靠性

  • 队列持久化(Queue Durability): 创建队列时,将其声明为持久化队列。这样即使 RabbitMQ 服务器重启,队列依然存在。_, err = ch.QueueDeclare("my_queue",// 队列名true,// 是否持久化false,// 是否自动删除false,// 是否排他false,// 是否阻塞nil,// 其他参数)if err !=nil{ log.Fatalf("Failed to declare a queue: %s", err)}

3. 确保消息消费者的可靠性

  • 手动确认(Manual Acknowledgment): 消费者手动确认接收到的消息。这样只有在消息成功处理后,RabbitMQ 才会将其从队列中移除。如果消费者没有确认消息且发生故障,RabbitMQ 会将消息重新投递。msgs, err := ch.Consume("my_queue",// 队列名"",// 消费者标识false,// 自动确认false,// 是否排他false,// 是否阻塞false,// 是否在同一个连接上消费nil,// 其他参数)if err !=nil{ log.Fatalf("Failed to register a consumer: %s", err)}for d :=range msgs {// 处理消息 fmt.Printf("Received a message: %s", d.Body)// 手动确认 d.Ack(false)}
  • QoS(Quality of Service): 设置消费者的 QoS 参数,例如 prefetch_count,确保消费者不会一次处理太多消息,从而导致过载。err = ch.Qos(1,// 每次处理一条消息0,// 消息大小限制(不限制)false,// 是否应用于整个通道)if err !=nil{ log.Fatalf("Failed to set QoS: %s", err)}

4. 容错处理

  • 重试机制: 在生产者和消费者中实现重试机制,例如使用带有指数回退的重试逻辑,以应对 RabbitMQ 不可用或网络波动的情况。
  • 死信队列(DLX): 配置死信队列,将处理失败的消息路由到指定的死信队列,方便后续分析和处理。

通过这些措施,可以有效提高使用 RabbitMQ 时的消息可靠性。

6 延迟消息

6.1 死信交换机

image-20240905145924200

6.2 延迟消息插件

6.2.1 安装

  1. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
  2. 将插件放在该目录image-20240905153455222
  3. docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq-delayed-message-exchange

6.2.2 使用

  1. // 3. 声明延迟交换机
  2. err = ch.ExchangeDeclare("delay_exchange",// 交换机名称"x-delayed-message",// 交换机类型true,// 是否持久化false,// 是否自动删除false,// 是否内部使用false,// 是否等待
  3. amqp.Table{"x-delayed-type":"direct"},// 交换机类型的设置)failOnError(err,"Failed to declare an exchange")// 4. 发送消息
  4. body :="Hello World with delay"
  5. err = ch.Publish("delay_exchange",// 交换机名称"routing_key",// 路由键false,// 是否强制发送false,// 是否立即发送
  6. amqp.Publishing{
  7. ContentType:"text/plain",
  8. Body:[]byte(body),
  9. Headers: amqp.Table{"x-delay":int32(5000),// 延迟时间,单位为毫秒 (5秒延迟)},})

6.2.3 应用场景

  • 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景

image-20240905155732697
false, // 是否立即发送
amqp.Publishing{
ContentType: “text/plain”,
Body: []byte(body),
Headers: amqp.Table{
“x-delay”: int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)
},
})

  1. ### 6.2.3 应用场景
  2. - 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景
  3. [外链图片转存中...(img-eA0QMPnx-1725527666228)]
标签: golang rabbitmq

本文转载自: https://blog.csdn.net/Dusong_/article/details/141936206
版权归原作者 Dusong_ 所有, 如有侵权,请联系我们删除。

“在Go中迅速使用RabbitMQ”的评论:

还没有评论