0


go消息队列RabbitMQ - 直连模式与work模式

本文纯属rabbitmq在集中模式下的操作,对于rabbitmq的八股文介绍就不做过多解释了。需要连接更多去看我在github上写的文章:RabbitMQ入门学习教程 - 青叶水间 - 一个IT技术文章分享博客 (leellun.github.io)

首先,使用

go get

安装amqp

go get github.com/streadway/amqp

代码公共部分,所有需要使用到rabbitmq都需要导入amqp

package main

import (
  "log"
  "github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
    if err != nil {
      //....
    }
}

连接到RabbitMQ服务器,并且创建通道

创建一个通道,这是大多数用于完成任务的API所在的位置:

// 1. 尝试连接RabbitMQ,建立连接
// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
//关闭连接
defer conn.Close()
// 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

1. 直连模式

1.1 发送端

声明要发送到的队列,然后将消息发布到队列:send.go部分代码

// 3. 声明消息要发送到的队列
q, err := ch.QueueDeclare(
  "hello", // name
  false,   // durable
  false,   // delete when unused
  false,   // exclusive
  false,   // no-wait
  nil,     // arguments
)
failOnError(err, "Failed to declare a queue")

body := "Hello World!"
// 4.将消息发布到声明的队列
err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),
  })
failOnError(err, "Failed to publish a message")

声明队列是幂等的——仅当队列不存在时才创建。消息内容是一个字节数组,因此你可以在此处编码任何内容。

点击查看完整的send.go文件

1.2 接收端

由于它将异步地向我们发送消息,因此我们将在goroutine中从通道(由

amqp::Consume

返回)中读取消息。

声明队列:

// 声明队列
q, err := ch.QueueDeclare(
  "hello", // name
  false,   // durable
  false,   // delete when unused
  false,   // exclusive
  false,   // no-wait
  nil,     // arguments
)
failOnError(err, "Failed to declare a queue")

由于它将异步地向我们发送消息,因此我们将在goroutine中从通道(由

amqp::Consume

返回)中读取消息。

// 获取接收消息的Delivery通道
msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")
//创建一个通道读取数据
forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

点击完整的receive.go脚本

1.3 完整示例

现在我们可以运行两个脚本。在一个终端窗口,运行发布者:

go run send.go

然后,运行使用者:

go run receive.go

消费者将打印通过RabbitMQ从发布者那里得到的消息。使用者将持续运行,等待消息(使用Ctrl-C停止它),因此请尝试从另一个终端运行发布者。

如果要检查队列,请尝试使用

rabbitmqctl list_queues

命令。

2 Work模式

该模式和直连模式一样是去消费队列中的消息,区别是将用于在多个消费者之间分配耗时的任务。

比如:一个活你一个人干,现在给你加一个伙伴,你们一起干,现在你的任务就有一部分分配到他的身上。

2.1 消息操作

将一些包含

.

的字符串封装为消息发送到队列中,其中每有一个

.

就表示需要耗费1秒钟的工作,例如,

hello...

表示一个将花费三秒钟的假任务。

2.1.1 发送端

创建一个new_task.go,发送消息

body := bodyFrom(os.Args)  // 从参数中获取要发送的消息正文
err = ch.Publish(
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
  })
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

消息封装:

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

2.1.2 接收端

创建一个receive.go 消息正文中出现的每个

.

伪造一秒钟的工作。

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))  // 数一下有几个.
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)  // 模拟耗时的任务
    log.Printf("Done")
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

2.1.3 消息测试

然后,我们就可以打开两个终端,分别执行

new_task.go

worker.go

了。

# shell 1
go run worker.go
# shell 2
go run new_task.go

2.2 循环调度

使用任务队列的优点之一是能够轻松并行化工作。如果我们的工作正在积压,我们可以增加更多的工人,这样就可以轻松扩展。

(1)准备两个控制端执行worker.go,两个消费者——C1和C2。

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C

(2)准备发送消息的发送端任务执行

# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....

**(3)然后我们在

shell1

shell2

两个窗口看到如下输出结果了:**

# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg1.
# => [x] Received a message: msg3...
# => [x] Received a message: msg5.....
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg2..
# => [x] Received a message: msg4....

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。

2.3 消息确认

为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(acknowledgement),以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。

将使用手动消息确认,方法是为“auto-ack”参数传递一个

false

,然后在完成任务后,使用

d.Ack(false)

worker

发送一个正确的确认(这将确认一次传递)。

msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer
    false,  // 注意这里传false,关闭自动消息确认
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
)
if err != nil {
    fmt.Printf("ch.Consume failed, err:%v\n", err)
    return
}

// 开启循环不断地消费消息
forever := make(chan bool)
go func() {
    for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        dotCount := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dotCount)
        time.Sleep(t * time.Second)
        log.Printf("Done")
        d.Ack(false) // 手动传递消息确认
    }
}()

** 忘记确认**

忘记确认是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。

为了调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows平台,去掉sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

2.4 消息持久化

即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止运行,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。

q, err := ch.QueueDeclare(
    "hello", // name
    true,    // 声明为持久队列
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
)

如果我们已经声明了hello这个队列,那么上面设置就会不起作用。RabbitMQ不允许你使用不同的参数重新定义现有队列,并将向任何尝试重新定义的程序返回错误。

解决方式:可以将hello队列中的消息消费完成后删除队列,或者重新声明一个新的队列。

上面这种持久的选项更改需要同时应用于生产者代码和消费者代码。

在这一点上,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。现在我们需要将消息标记为持久的——通过使用

amqp.Publishing

中的持久性选项

amqp.Persistent

err = ch.Publish(
    "",     // exchange
    q.Name, // routing key
    false,  // 立即
    false,  // 强制
    amqp.Publishing{
        DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬态/持久)
        ContentType:  "text/plain",
        Body:         []byte(body),
    })

将消息标记为持久性并不能完全保证消息不会丢失。

2.4 公平分发

你可能已经注意到调度仍然不能完全按照我们的要求工作。例如,在一个有两个

worker

的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个

worker

将持续忙碌,而另一个

worker

几乎不做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地发送消息。

这是因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。

img

为了避免这种情况,我们可以将预取计数设置为

1

。这告诉RabbitMQ不要一次向一个

worker

发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向

worker

发送新消息。相反,它将把它发送给下一个不忙的

worker

err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)

2.5 完整代码

new_task.go

的最终代码代入如下:

package main

import (
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
)

func main() {
    // 1. 尝试连接RabbitMQ,建立连接
    // 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
        return
    }
    defer conn.Close()

    // 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。
    ch, err := conn.Channel()
    if err != nil {
        fmt.Printf("open a channel failed, err:%v\n", err)
        return
    }
    defer ch.Close()

    // 3. 要发送,我们必须声明要发送到的队列。
    q, err := ch.QueueDeclare(
        "task_queue", // name
        true,         // 持久的
        false,        // delete when unused
        false,        // 独有的
        false,        // no-wait
        nil,          // arguments
    )
    if err != nil {
        fmt.Printf("declare a queue failed, err:%v\n", err)
        return
    }

    // 4. 然后我们可以将消息发布到声明的队列
    body := bodyFrom(os.Args)
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // 立即
        false,  // 强制
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, // 持久
            ContentType:  "text/plain",
            Body:         []byte(body),
        })
    if err != nil {
        fmt.Printf("publish a message failed, err:%v\n", err)
        return
    }
    log.Printf(" [x] Sent %s", body)
}

// bodyFrom 从命令行获取将要发送的消息内容
func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}
work.go

内容如下:

package main

import (
    "bytes"
    "fmt"
    "log"
    "time"

    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
        return
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        fmt.Printf("open a channel failed, err:%v\n", err)
        return
    }
    defer ch.Close()

    // 声明一个queue
    q, err := ch.QueueDeclare(
        "task_queue", // name
        true,         // 声明为持久队列
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    if err != nil {
        fmt.Printf("ch.Qos() failed, err:%v\n", err)
        return
    }

    // 立即返回一个Delivery的通道
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // 注意这里传false,关闭自动消息确认
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        fmt.Printf("ch.Consume failed, err:%v\n", err)
        return
    }

    // 开启循环不断地消费消息
    forever := make(chan bool)
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            dotCount := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dotCount)
            time.Sleep(t * time.Second)
            log.Printf("Done")
            d.Ack(false) // 手动传递消息确认
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}
标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/liu289747235/article/details/135958370
版权归原作者 流光影下 所有, 如有侵权,请联系我们删除。

“go消息队列RabbitMQ - 直连模式与work模式”的评论:

还没有评论