本文纯属rabbitmq在集中模式下的操作,对于rabbitmq的八股文介绍就不做过多解释了。需要连接更多去看我在github上写的文章:RabbitMQ入门学习教程 - 青叶水间 - 一个IT技术文章分享博客 (leellun.github.io)
首先,使用
go get
安装amqp
go get github.com/streadway/amqp
代码公共部分,所有需要使用到rabbitmq都需要导入amqp
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
//....
}
}
连接到RabbitMQ服务器,并且创建通道
创建一个通道,这是大多数用于完成任务的API所在的位置:
// 1. 尝试连接RabbitMQ,建立连接
// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
//关闭连接
defer conn.Close()
// 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
1. 直连模式
1.1 发送端
声明要发送到的队列,然后将消息发布到队列:send.go部分代码
// 3. 声明消息要发送到的队列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := "Hello World!"
// 4.将消息发布到声明的队列
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
声明队列是幂等的——仅当队列不存在时才创建。消息内容是一个字节数组,因此你可以在此处编码任何内容。
点击查看完整的send.go文件
1.2 接收端
由于它将异步地向我们发送消息,因此我们将在goroutine中从通道(由
amqp::Consume
返回)中读取消息。
声明队列:
// 声明队列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
由于它将异步地向我们发送消息,因此我们将在goroutine中从通道(由
amqp::Consume
返回)中读取消息。
// 获取接收消息的Delivery通道
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
//创建一个通道读取数据
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
点击完整的receive.go脚本
1.3 完整示例
现在我们可以运行两个脚本。在一个终端窗口,运行发布者:
go run send.go
然后,运行使用者:
go run receive.go
消费者将打印通过RabbitMQ从发布者那里得到的消息。使用者将持续运行,等待消息(使用Ctrl-C停止它),因此请尝试从另一个终端运行发布者。
如果要检查队列,请尝试使用
rabbitmqctl list_queues
命令。
2 Work模式
该模式和直连模式一样是去消费队列中的消息,区别是将用于在多个消费者之间分配耗时的任务。
比如:一个活你一个人干,现在给你加一个伙伴,你们一起干,现在你的任务就有一部分分配到他的身上。
2.1 消息操作
将一些包含
.
的字符串封装为消息发送到队列中,其中每有一个
.
就表示需要耗费1秒钟的工作,例如,
hello...
表示一个将花费三秒钟的假任务。
2.1.1 发送端
创建一个new_task.go,发送消息
body := bodyFrom(os.Args) // 从参数中获取要发送的消息正文
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
消息封装:
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
2.1.2 接收端
创建一个receive.go 消息正文中出现的每个
.
伪造一秒钟的工作。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte(".")) // 数一下有几个.
t := time.Duration(dot_count)
time.Sleep(t * time.Second) // 模拟耗时的任务
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
2.1.3 消息测试
然后,我们就可以打开两个终端,分别执行
new_task.go
和
worker.go
了。
# shell 1
go run worker.go
# shell 2
go run new_task.go
2.2 循环调度
使用任务队列的优点之一是能够轻松并行化工作。如果我们的工作正在积压,我们可以增加更多的工人,这样就可以轻松扩展。
(1)准备两个控制端执行worker.go,两个消费者——C1和C2。
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
(2)准备发送消息的发送端任务执行
# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....
**(3)然后我们在
shell1
和
shell2
两个窗口看到如下输出结果了:**
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg1.
# => [x] Received a message: msg3...
# => [x] Received a message: msg5.....
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg2..
# => [x] Received a message: msg4....
默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。
2.3 消息确认
为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(acknowledgement),以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。
将使用手动消息确认,方法是为“auto-ack”参数传递一个
false
,然后在完成任务后,使用
d.Ack(false)
从
worker
发送一个正确的确认(这将确认一次传递)。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // 注意这里传false,关闭自动消息确认
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
fmt.Printf("ch.Consume failed, err:%v\n", err)
return
}
// 开启循环不断地消费消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false) // 手动传递消息确认
}
}()
** 忘记确认**
忘记确认是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。
为了调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows平台,去掉sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
2.4 消息持久化
即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止运行,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。
q, err := ch.QueueDeclare(
"hello", // name
true, // 声明为持久队列
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
如果我们已经声明了hello这个队列,那么上面设置就会不起作用。RabbitMQ不允许你使用不同的参数重新定义现有队列,并将向任何尝试重新定义的程序返回错误。
解决方式:可以将hello队列中的消息消费完成后删除队列,或者重新声明一个新的队列。
上面这种持久的选项更改需要同时应用于生产者代码和消费者代码。
在这一点上,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。现在我们需要将消息标记为持久的——通过使用
amqp.Publishing
中的持久性选项
amqp.Persistent
。
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // 立即
false, // 强制
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬态/持久)
ContentType: "text/plain",
Body: []byte(body),
})
将消息标记为持久性并不能完全保证消息不会丢失。
2.4 公平分发
你可能已经注意到调度仍然不能完全按照我们的要求工作。例如,在一个有两个
worker
的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个
worker
将持续忙碌,而另一个
worker
几乎不做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地发送消息。
这是因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。
为了避免这种情况,我们可以将预取计数设置为
1
。这告诉RabbitMQ不要一次向一个
worker
发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向
worker
发送新消息。相反,它将把它发送给下一个不忙的
worker
。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
2.5 完整代码
new_task.go
的最终代码代入如下:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func main() {
// 1. 尝试连接RabbitMQ,建立连接
// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
return
}
defer conn.Close()
// 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。
ch, err := conn.Channel()
if err != nil {
fmt.Printf("open a channel failed, err:%v\n", err)
return
}
defer ch.Close()
// 3. 要发送,我们必须声明要发送到的队列。
q, err := ch.QueueDeclare(
"task_queue", // name
true, // 持久的
false, // delete when unused
false, // 独有的
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Printf("declare a queue failed, err:%v\n", err)
return
}
// 4. 然后我们可以将消息发布到声明的队列
body := bodyFrom(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // 立即
false, // 强制
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
fmt.Printf("publish a message failed, err:%v\n", err)
return
}
log.Printf(" [x] Sent %s", body)
}
// bodyFrom 从命令行获取将要发送的消息内容
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
work.go
内容如下:
package main
import (
"bytes"
"fmt"
"log"
"time"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
fmt.Printf("open a channel failed, err:%v\n", err)
return
}
defer ch.Close()
// 声明一个queue
q, err := ch.QueueDeclare(
"task_queue", // name
true, // 声明为持久队列
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
fmt.Printf("ch.Qos() failed, err:%v\n", err)
return
}
// 立即返回一个Delivery的通道
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // 注意这里传false,关闭自动消息确认
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
fmt.Printf("ch.Consume failed, err:%v\n", err)
return
}
// 开启循环不断地消费消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false) // 手动传递消息确认
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
版权归原作者 流光影下 所有, 如有侵权,请联系我们删除。