0


Golang使用消息队列(RabbitMQ)

最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除,在之前的版本中,可以使用定时任务来检查数据库记录中删除时间来判断是否删除,但是这不是最佳的,因此考虑如何基于RabbitMQ来实现这个功能。

使用RabbitMQ的架构

在这里插入图片描述

代码

因为前端有点麻烦,这里全部使用Golang后端来模拟实现整个架构,包括生产端和消费端。这里有一些细节

  • 注意交换机和队列的绑定,一定要细心
  • 交换机一旦声明了就不能更改,如果要发生一些属性的更改,就要删除原来的内容,重新生成
  • 下列的内容不包含RabbitMQ持久化的内容
package main

import("fmt""github.com/streadway/amqp""log""strings")funcInitRabbitMQ()*amqp.Connection {
    mq :="amqp"
    host :="127.0.0.1"
    port :="5672"
    user :="root"
    pwd :="root"
    dns := strings.Join([]string{mq,"://", user,":", pwd,"@", host,":", port,"/"},"")
    conn, err := amqp.Dial(dns)if err !=nil{
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)}return conn
}funcInitMainExchangeAndQueue(ch *amqp.Channel, userID string)*amqp.Channel {// 队列信息
    exchangeName :="main_exchange"
    queueName := fmt.Sprintf("user_queue_%s", userID)
    messageTTL :=int32(300000)// 声明主交换机
    err := ch.ExchangeDeclare(
        exchangeName,// 交换机名"direct",// Exchange typefalse,// Durablefalse,// Auto-deletedfalse,// Internalfalse,// No-waitnil,// Arguments)if err !=nil{
        log.Fatalf("Failed to declare an main exchange: %v", err)}// 声明用户队列_, err = ch.QueueDeclare(
        queueName,// 队列名false,// Durablefalse,// Delete when unusedfalse,// Exclusivefalse,// No-wait
        amqp.Table{"x-dead-letter-routing-key":"dead",// routing-key"x-dead-letter-exchange":"dead_exchange",// 死信交换机"x-message-ttl":             messageTTL,// TTL},)if err !=nil{
        log.Fatalf("Failed to declare a queue: %v", err)}// 绑定
    err = ch.QueueBind(queueName, userID,"main_exchange",false,nil)if err !=nil{
        log.Fatalf("Failed to bind queue to exchange: %v", err)}return ch
}funcInitDeadExchangeAndQueue(ch *amqp.Channel){// 声明死信交换机
    err := ch.ExchangeDeclare("dead_exchange",
        amqp.ExchangeDirect,true,false,false,false,nil,)if err !=nil{
        log.Fatalf("Failed to declare an dead exchange: %v", err)}// 声明一个死信队列_, err = ch.QueueDeclare("dead_queue",true,false,false,false,nil)if err !=nil{
        log.Fatalf("Failed to declare a queue: %v", err)}// 绑定
    err = ch.QueueBind("dead_queue","dead","dead_exchange",false,nil)if err !=nil{
        log.Fatalf("Failed to bind queue to exchange: %v", err)}}funcPublishMessage(ch *amqp.Channel, userID, fileID string){// 用户信息
    message := fmt.Sprintf("%s|%s", userID, fileID)
    exchangeName :="main_exchange"// 发布用户消息
    err := ch.Publish(
        exchangeName,// Exchange
        userID,// Routing keyfalse,// Mandatoryfalse,// Immediate
        amqp.Publishing{
            ContentType:"text/plain",
            Body:[]byte(message),})if err !=nil{
        log.Fatalf("Failed to publish a message: %v", err)}
    log.Printf("Message sent to user %s: %s", userID, message)}funcConsumeTTL(ch *amqp.Channel){// 声明死信交换机
    err := ch.ExchangeDeclare("dead_exchange",// 交换机名"direct",// Exchange typetrue,// Durablefalse,// Auto-deletedfalse,// Internalfalse,// No-waitnil,// Arguments)if err !=nil{
        log.Fatalf("Failed to declare a dead letter exchange: %v", err)}// 创建消费者并阻塞等待消费死信队列中的消息
    megs, err := ch.Consume("dead_queue",// Queue"",// Consumerfalse,// Auto-acknowledgefalse,// Exclusivefalse,// No-localfalse,// No-waitnil,// Args)if err !=nil{
        log.Fatalf("Failed to register a consumer for dead letter queue: %v", err)}// 使用无限循环一直监听
    fmt.Println("Waiting for message from dead_queue......")for d :=range megs {// 实际中,处理消息的逻辑,例如删除文件或其他操作
        fmt.Println(string(d.Body))// 消费完成后手动确认消息
        err = d.Ack(false)if err !=nil{
            log.Fatalf("Failed to ack message: %v", err)}}}funcConsume(ch *amqp.Channel, userID string){// 下面的信息可以通过前后端进行传递
    queueName := fmt.Sprintf("user_queue_%s", userID)// 消费消息
    megs, err := ch.Consume(
        queueName,// Queue"",// Consumertrue,// Auto-acknowledgefalse,// Exclusivefalse,// No-localfalse,// No-waitnil,// Args)if err !=nil{
        log.Fatalf("Failed to register a consumer: %v", err)}// 这里直接是由前端发送过来的API进行触发,所以不用一直阻塞监听
    d, ok :=<-megs
    if!ok {
        log.Fatalf("Failed to get message: %v", err)}
    fmt.Println(string(d.Body))// 消息完成后确认消息
    err = d.Ack(true)if err !=nil{
        log.Fatalf("Failed to ack message: %v", err)}}funcmain(){// 获取客户端
    client :=InitRabbitMQ()defer client.Close()

    ch, err := client.Channel()if err !=nil{
        log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()//ConsumeTTL(ch)// 构造dead_exchange及dead_queue// InitDeadExchangeAndQueue(ch)// 假设这是web请求信息//var userID1 = "test-id1"//var fileID1 = "file1"// 构造main_exchange及user_queue//ch = InitMainExchangeAndQueue(ch, userID1)// 针对用户1:假设还消息没有过期时候就被recovery,即在user_queue中就被消费,实际中发布消息的这部分逻辑应当放在前端中//PublishMessage(ch, userID1, fileID1)//time.Sleep(20 * time.Second)// 模拟后端消费消息//Consume(ch, userID1)// 针对用户2:模拟其不被后端消费,过期到死信队列中var userID2 ="test-id2"var fileID2 ="file2"
    ch =InitMainExchangeAndQueue(ch, userID2)PublishMessage(ch, userID2, fileID2)// 注意这个消息没有被消费,理论上应当被死信队列消费}

从dead_exchange中消费:
在这里插入图片描述


本文转载自: https://blog.csdn.net/weixin_43495948/article/details/132382511
版权归原作者 HumbleSwage 所有, 如有侵权,请联系我们删除。

“Golang使用消息队列(RabbitMQ)”的评论:

还没有评论