0


golang开源的可嵌入应用程序高性能的MQTT服务

golang开源的可嵌入应用程序高性能的MQTT服务

什么是MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、开放的消息传输协议,设计用于在低带宽、高延迟或不可靠的网络环境中进行通信。MQTT最初由IBM开发,现已成为OASIS标准。
MQTT的设计目标是提供一种简单、轻量、可扩展的协议,适用于各种设备和网络条件。它通常用于物联网(IoT)和传感器网络,其中设备需要以有效的方式进行通信,并且资源(如带宽和电池寿命)可能受到限制。
MQTT的简单设计和适用性使其成为物联网中常用的通信协议之一。它被广泛用于传感器网络、嵌入式设备、移动应用程序和其他场景中,提供了一种可靠、高效的消息传输机制。

什么是Mochi-MQTT

源代码地址:https://github.com/mochi-mqtt/server

Mochi MQTT 是一个完全兼容 MQTT v5的可嵌入的中间件/服务器,完全使用 Go 语言编写,旨在用于遥测和物联网项目的开发。它可以作为独立的二进制文件使用,也可以嵌入到你自己的应用程序中库来使用,经过提出的设计以实现问题的轻量化和快速部署,同时也非常重视代码的质量和可维护性。

用途

物联网项目开发时,常常需要使用MQTT协议对设备接入,在很多场景中,私有化部署物联网系统时资源比较少,性能要求高,一些大型的MQTT服务不满足要求,而且代码不可控。
还有在边缘场景下,需要在边缘网关,边缘控制器设备上部署物联网系统,但是边缘网关的资源很少,内存大约只有4G,所以使用java开发的物联网系统就很难部署上去;使用C/C++开发效率又很低,所以Go语言是最合适的,
Mochi-MQTT刚好又完全是Go编写的开源的,可以嵌入到自己的程序启动。

Mochi MQTT独立部署

Golang的环境配置这里不做说明,请看我前面的博文说明

Mochi MQTT 可以作为独立的中间件使用。只需拉取此仓库代码,然后在 cmd 文件夹中运行 cmd/main.go ,默认将开启下面几个服务端口, tcp (:1883)、websocket (:1882) 和服务状态监控 (:8080) 。

cd cmd
go build -o mqtt && ./mqtt

docker部署

可以从 Docker Hub 仓库中拉取并运行Mochi MQTT官方镜像:

docker pull mochimqtt/server
或者
docker run mochimqtt/server

也提供了一个简单的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听:

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest

嵌入自己项目运行和开发

下载Mochi MQTT包

go get github.com/mochi-mqtt/server/v2

将Mochi MQTT作为包导入使用, 示例代码如下

import (
  mqttServer "github.com/mochi-mqtt/server/v2"
    "github.com/mochi-mqtt/server/v2/listeners"
    "github.com/mochi-mqtt/server/v2/packets"
)

var Server *mqttServer.Server

func ServerMqttInit() {
    // 创建新的 MQTT 服务器。
    Server = mqttServer.New(&mqttServer.Options{
        InlineClient: true, // 启动内联客户端
    })
    
    // 初始化数据库实例
    edge := &edgeHook{deviceDao: deviceDao.NewDeviceRepository(),
        productDao:     productDao.NewProductRepository(),
    }
    // 添加自定义权限方法
    err := Server.AddHook(edge, nil)
    if err != nil {
        log.Fatal(err)
    }

    // 在1883端口上创建一个 TCP 服务端。
    tcp := listeners.NewTCP("t1", ":1883", nil)
    err = Server.AddListener(tcp)
    if err != nil {
        log.Fatal(err)
    }

    // 在1882端口上创建一个 Websocket 服务端。
    ws := listeners.NewWebsocket("ws1", ":1882", nil)
    err = server.AddListener(ws)
    if err != nil {
        log.Fatal(err)
    }

    go func() {
        err := Server.Serve()
        if err != nil {
            log.Fatal(err)
        }
    }()
}

type edgeHook struct {
    mqttServer.HookBase
    deviceDao      deviceDao.DeviceRepository
    productDao     productDao.ProductRepository
}

func (h *edgeHook) ID() string {
    return "mqtt-auth"
}

func (h *edgeHook) Provides(b byte) bool {
    // 实现钩子函数
    return bytes.Contains([]byte{
        //MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
        mqttServer.OnConnectAuthenticate,
        //MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
        mqttServer.OnACLCheck,
        //在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
        mqttServer.OnSessionEstablish,
        //当客户端因任何原因断开连接时调用。
        mqttServer.OnDisconnect,
        //当客户端向订阅者发布消息后调用。
        mqttServer.OnPublished,
    }, []byte{b})
}

// OnConnectAuthenticate MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
func (h *edgeHook) OnConnectAuthenticate(cl *mqttServer.Client, pk packets.Packet) bool {
    username := string(pk.Connect.Username)
    password := string(pk.Connect.Password)
    if username == "" || len(username) == 0 {
        return false
    }
    if password == "" || len(password) == 0 {
        return false
    }
    return true
}

// OnACLCheck MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
func (h *edgeHook) OnACLCheck(cl *mqttServer.Client, topic string, write bool) bool {
    username := string(cl.Properties.Username)
    if username == "" || len(username) == 0 {
        return false
    }
    if topic == "" || len(topic) == 0 {
        return false
    }
    return true
}

// OnSessionEstablish 在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
func (h *edgeHook) OnSessionEstablish(cl *mqttServer.Client, pk packets.Packet) {
    username := string(cl.Properties.Username)
    if username == "" || len(username) == 0 {
        return
    }
    //设备连接MQTT成功后保存设备在线状态
}

// OnDisconnect 当客户端因任何原因断开连接时调用。
func (h *edgeHook) OnDisconnect(cl *mqttServer.Client, err error, expire bool) {
    username := string(cl.Properties.Username)
    if username == "" || len(username) == 0 {
        return
    }
    //设备断开MQTT成功后保存设备离线状态
}

// OnPublished 当客户端向订阅者发布消息后调用。
func (h *edgeHook) OnPublished(cl *mqttServer.Client, pk packets.Packet) {
    Log.Infof("mqtt server OnPublished info topic=%s, msg=%s", pk.TopicName, string(pk.Payload))
    //收到客户端消息后做业务逻辑处理
}

// 使用内联客户端方式,向MQTT发送消息
func PublishMsg(topic string, msg []byte) bool {
    err := Server.Publish(topic, msg, false, 0)
    if err != nil {
        Log.Errorf("mqtt EdgePublish error=%v, topic=%s, msg=%s", err, topic, msg)
        return false
    }
    return true
}

// 使用内联客户端方式,订阅边缘MQTT消息topic
func SubscribeTopic(topic string, subscriptionId int, callback func(topic string, msg []byte)) {
    callbackFn := func(cl *mqttServer.Client, sub packets.Subscription, pk packets.Packet) {
        Log.Info("mqtt EdgeSubscribe received message", "client", cl.ID, "subscriptionId", sub.Identifier,
            "topic", pk.TopicName, "payload", string(pk.Payload))
        callback(pk.TopicName, pk.Payload)
    }
    _ = Server.Subscribe(topic, subscriptionId, callbackFn)
}

// 使用内联客户端方式,取消订阅边缘MQTT消息topic
func UnsubscribeTopic(topic string, subscriptionId int) {
    _ = Server.Unsubscribe(topic, subscriptionId)
}

func main() {
    // 创建信号用于等待服务端关闭信号
  sigs := make(chan os.Signal, 1)
  done := make(chan bool, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    <-sigs
    done <- true
  }()
  
  <-done
    Log.Error("caught signal, stopping...")
    Server.Close()
    Log.Error("main.go finished")
}

监控MQTT指标信息

mqttRouters := r.Group("/mqtt", func(context *gin.Context) {})
    {
        mqttRouters.GET("stats", func(c *gin.Context) {
            util.R(c, nil, mqtt.Server.Info)
        })
    }

在这里插入图片描述

详情使用指南请看:https://github.com/mochi-mqtt/server

标签: golang mqtt

本文转载自: https://blog.csdn.net/yinjl123456/article/details/135960581
版权归原作者 beyond阿亮 所有, 如有侵权,请联系我们删除。

“golang开源的可嵌入应用程序高性能的MQTT服务”的评论:

还没有评论