本文首发在这里
基于Example封装便于使用的SDK,支持发布、消费,连接恢复,死信队列,以及官方入门中的多种使用场景
参数解释(测试代码在下面)
- 直接使用
amq.topic,仅是出于逻辑简单代码少,若想数据隔离,可声明自定义主题类型交换(ExchangeDeclare)并使用 - 建议生产消费者尽量对称提供
queueName和keys,确保发布消费前完成声明绑定逻辑。queueName用来声明名称是queueName、queueName_dead_letter(死信)的队列,绑定queueName到amq.topic,并使用所有的keys充当路由键 - 若队列已声明且路由键已绑定,未来消费者启动仅需提供
queueName表明从此队列消费,未来生产者启动仅需提供key用做发布路由键,但queueName会因为空而使用默认值default_queue_name来触发总是执行的声明队列(QueueDeclare)逻辑,避免意外声明RabbitMQ随机命名的队列 - 使用手动消息确认,队列和消息标记为持久,并不使用临时独占队列
- 消费者可合理调大
Qos.prefetchCount来提高吞吐率
启动RabbitMQ
docker run -d-p15672:15672 -p5672:5672 --hostname rabbitmq --name rabbitmq -eRABBITMQ_DEFAULT_USER=guest -eRABBITMQ_DEFAULT_PASS=guest rabbitmq:3-management
消费者
package main
import("context""flag""log/slog""os""os/signal""strings""sync""time""github.com/panshiqu/golang/rabbitmq"
amqp "github.com/rabbitmq/amqp091-go")var addr = flag.String("addr","amqp://guest:guest@localhost:5672/","address")var queueName = flag.String("queueName","","queue name")var keys = flag.String("keys","","routing keys")funconDelivery(delivery *amqp.Delivery)error{
slog.Info("onDelivery", slog.String("body",string(delivery.Body)))// if string(delivery.Body) == "hi2" {// return errors.New("dead letter")// }
time.Sleep(time.Second)
slog.Info("onDelivery done")returnnil}funcmain(){
flag.Parse()
client := rabbitmq.New(*queueName,*addr, strings.Split(*keys,","))
ctx, cancel := context.WithCancel(context.Background())
wg :=&sync.WaitGroup{}go client.ConsumeFunc(ctx, wg, onDelivery)
c :=make(chan os.Signal,1)
signal.Notify(c, os.Interrupt)
sig :=<-c
slog.Info("notify", slog.Any("signal", sig))cancel()
wg.Wait()}
生产者
package main
import("flag""fmt""log/slog""os""os/signal""strings""sync""time""github.com/panshiqu/golang/rabbitmq")var addr = flag.String("addr","amqp://guest:guest@localhost:5672/","address")var queueName = flag.String("queueName","","queue name")var keys = flag.String("keys","","routing keys")var key = flag.String("key","","routing key")funcpublish(client *rabbitmq.Client, wg *sync.WaitGroup){
wg.Add(1)defer wg.Done()for i :=1;; i++{
time.Sleep(time.Second)
data := fmt.Sprintf("hi%d", i)if err := client.Push(*key,[]byte(data)); err !=nil{
slog.Error("push", slog.Any("err", err))return}
slog.Info("push", slog.String("data", data))}}funcmain(){
flag.Parse()
client := rabbitmq.New(*queueName,*addr, strings.Split(*keys,","))
wg :=&sync.WaitGroup{}gopublish(client, wg)
c :=make(chan os.Signal,1)
signal.Notify(c, os.Interrupt)
sig :=<-c
slog.Info("notify", slog.Any("signal", sig))if err := client.Close(); err !=nil{
slog.Error("close", slog.Any("err", err))}
wg.Wait()}
多种使用场景
Hello World
go run consume/main.go -queueName one_queue -keys one_key
go run publish/main.go -queueName one_queue -keys one_key -key one_key
go run consume/main.go -queueName one_queue -keys one_key_new
注:后续启动修改
keys
,并不会解绑原路由键,而是纯粹绑定新路由键,如此以来发布
key
不管是
one_key
还是
one_key_new
都将路由到
one_queue
# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name source_kind destination_name destination_kind routing_key arguments
amq.topic exchange one_queue queue one_key []
amq.topic exchange one_queue queue one_key_new []
Work Queues
go run consume/main.go -queueName two_queue -keys two_key
go run consume/main.go -queueName two_queue -keys two_key
go run publish/main.go -queueName two_queue -keys two_key -key two_key
Publish/Subscribe
go run consume/main.go -queueName three_queue_new -keys three_key
go run publish/main.go -queueName three_queue -keys three_key -key three_key
go run consume/main.go -queueName three_queue -keys three_key
注:上面的生产者仅会声明
queueName
为
three_queue
的队列,并不会声明
three_queue_new
,所以声明
three_queue_new
的消费者必须在生产者发布消息前启动,但
three_queue
没有这个限制
Routing
go run consume/main.go -queueName four_queue -keys error
go run consume/main.go -queueName four_queue_new -keys error,info
go run publish/main.go -queueName four_queue -keys error -key error
go run publish/main.go -queueName four_queue -keys error -key info
Topics
go run consume/main.go -queueName five_queue -keys"*.error"
go run consume/main.go -queueName five_queue_new -keys"kern.#"
go run publish/main.go -queueName five_queue -keys"*.error"-key cron.error
go run publish/main.go -queueName five_queue -keys"*.error"-key kern.info
go run publish/main.go -queueName five_queue -keys"*.error"-key kern.error
死信队列
业务逻辑如下返回错误,重新投递仍失败后将进入死信队列,保证消息不丢失,还可反复消费来排查问题
ifstring(delivery.Body)=="hi2"{return errors.New("dead letter")}
go run consume/main.go -queueName six_queue -keys six_key
go run publish/main.go -queueName six_queue -keys six_key -key six_key
go run consume/main.go -queueName six_queue_dead_letter
特殊情况说明
- 生产者正常退出若是等待发布接口返回消息将不丢失
- 消费者正常退出是会等待业务逻辑处理且发送确认完成的
- 消费者崩溃,RabbitMQ将会重新投递没有收到确认的消息,那怕仅是消费者确认没发出去
- 消费者处理消息成功,RabbitMQ关闭,消费者发送确认失败,服务开启后仍会重新投递此消息
- RabbitMQ关闭,发布接口将阻塞至服务启动,消息基本不会丢失
- 理论上消费者和生产者均可先于RabbitMQ启动
建议业务逻辑对消息的消费支持幂等
关于发布确认
目前并未支持一对一的确认,意思就是发布三条消息,某次的发布因未收到确认而阻塞,但是无从知晓是三条中的哪条消息?当前是可以换用
PublishWithDeferredConfirm
发布并记录本端递增生成的
DeliveryTag
,虽然相同通道的消息是加锁顺序发布的,但是想要和对端RabbitMQ递增生成的
DeliveryTag
对应起来,在依赖可靠网络传输的基础上,收到消息时应立即递增生成,如此以来对应关系才能得到保证,目前还没有看RabbitMQ实现源码来明确这点
heartbeat
两端协调心跳时间会取较小值,但想禁用心跳双端须同时置0,如此便于调试避免触发断线重连
amqp://guest:guest@localhost:5672/?heartbeat=1800
rabbitmqctl eval 'application:get_env(rabbit, heartbeat).'
rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1800).'
有用的命令
rabbitmqctl list_queues
rabbitmqctl list_queues name messages_ready messages_unacknowledged
rabbitmqctl list_exchanges
rabbitmqctl list_bindings
版权归原作者 磐石区 所有, 如有侵权,请联系我们删除。