前言
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),信息传输的对象我分为客户端消息发布者(pub)、服务器(server)、客户端消息订阅者(sub),一个客户端可以同时为发布者和订阅者。服务器,服务器为信息传输的枢纽有传递数据、管理客户端、数据保真等作用。
主题
MQTT的通信方式为pub向server发布一个主题并且向这个主题中发布数据,sub如果订阅了这个主题就可以接收到这个主题下的消息。
打个比喻,我们用户观看互联网视频平台上的视频,如果我们想观看一名UP的视频,那么我们用户就一定要关注这名UP的频道,而这名UP会不定时得向他自己的频道发布视频供我们用户观看。而平台就负责了视频、用户、UP的管理,并提供了视频传输的服务。
连接MQTT服务器
客户端连接服务器分为两步:1、客户端向服务器发送连接请求。2、服务器向客户端发送连接确认。
客户端向服务器发送连接请求
客户端向服务器发送一个CONNECT报文
报文内容包括:
clientId——客户端的ID,不能出现重名的客户端
cleanSession——清除会话,在消息的重发中用到
username——连接客户端时输入的用户名,没有可以不输入
password——连接客户端时输入的密码,没有可以不输入
lastWillQos——遗言的Qos
lastWillMessage——遗言的消息内容
lastWillRetain——遗言是否保留
keepAlive——心跳时间间隔,用于服务器判断此客户端是否意外断线
服务器向客户端发送连接确认
服务器向客户端发送一个CONNACK报文
报文内容包括:
sessionPresent——当前会话,配合cleanSession清除会话使用,用于查看是否有保留消息
returnCode——连接返回码,用0到5来表示连接的状态
发布、订阅和取消订阅
发布操作包括1、pub向server发布消息。2、server向订阅了主题的sub发布消息。每次pub更新了主题内的数据,server都会向sub发布消息。
订阅则简单得多,当sub订阅了一个主题后,关于这个主题的操作就只有等着接收数据和取消订阅了。
当每次发布与订阅时就会创建相对应的主题,不需要什么专门操作。
这里有个问题,如果pub不更新数据,那么sub是不能接收到数据的,为了解决这个问题pub发布的消息可以选择为保留状态,这样sub一订阅这个主题就会收到这个保留消息。
这些操作同样是通过报文交互完成。
发布
pub向server或者server向订阅了该主题的sub发送一个PUBLISH报文
报文内容包括:
packetId——报文标识符,用于标识数据包,只有当qos为非0时才为非0的数字序列
topicName——主题名
qos——服务质量等级,分为0,1,2三档次
retainFlag——保留标志,标识消息是否需要设置为保留消息
payload——有效载荷,消息的实际内容
dupFlag——重发标志,标识消息是否可以重发,只有当qos为非0才能为true
订阅
sub向server发送一个SUBSCRIBE报文
报文包括qos与多个订阅主题名,即一个SUBSCRIBE报文可以用于订阅一个或者多个主题。
qos——服务质量等级,在发布与订阅中都要设置。
topic——主题名,可为多个
在sever接收到SUBSCRIBE报文后,server向sub发送一个SUBACK报文
报文包括“订阅返回码”和“报文标识符”这两个信息。
packetId——报文标识符
returnCode —— 订阅返回码,服务端会针对SUBSCRIBE报文中的所有订阅主题来逐一回复给客户端一个返回码。
取消订阅
sub向server发送一个UNSUBSCRIBE报文
报文内容:
packetId——报文标识符
topic——主题名,可为多个
当server接收到UNSUBSCRIBE报文后,会向sub发送取消订阅确认报文 – UNSUBACK报文
该报文含有客户端所发送的“取消订阅报文标识符”。
MQTT主题
主题基本形式
主题的最基本形式就是一个字符串。
主题是区分大小写的。
主题可以使用空格,但不建议。
大部分MQTT服务端不支持中文主题。
主题分级
MQTT主题各个级别之间可以使用”/”来分隔
列如 username/myTopic/1
分别是第1级 username、第2级 myTopic、第3级 1
主题通配符
当客户端订阅主题时,可以使用通配符同时订阅多个主题。通配符只能在订阅主题时使用,下面我们将介绍两种通配符:单级通配符和多级通配符。
单级通配符: +
单级通配符可以代替一个主题级别。
多级通配符 : #
单级通配符仅可代替一个主题级别,而多级通配符”#”可以涵盖任意数量的主题级别。
主题应用注意事项
以$开始的主题是MQTT服务端系统保留的特殊主题,我们不能随意订阅或者向其发布信息。
不要用 “/” 作为主题开头,这没有意义。
主题中不要使用空格,难以阅读且容易忽略。
保持主题简洁明了。
主题中尽量使用ASCII字符。
在主题中嵌入客户端ID。
Qos 服务质量等级
什么是服务质量
MQTT服务质量(Quality of Service 缩写 QoS)正是用于告知物联网系统,哪些信息是重要信息需要准确无误的传输,而哪些信息不那么重要,即使丢失也没有问题。
MQTT协议有三种服务质量级别:
QoS = 0 – 最多发一次
QoS = 1 – 最少发一次
QoS = 2 – 保证收一次
设置QoS
发布消息
客户端发布信息时,PUBLISH数据包中专有一个信息为qos。该信息正是用于设置客户端发布MQTT消息的QoS等级。
订阅消息
同样的,在客户端订阅MQTT主题时,SUBSCRIBE数据包中也同样有一个信息用于设置订阅主题的QoS级别。客户端正是通过该主题来设置订阅主题的QoS级别的。
接收端连接服务端
另外,要想实现QoS>0的MQTT通讯,客户端在连接服务端时必须要将cleanSession设置为false。如果这一步没有实现,那么客户端是无法实现QoS>0的MQTT通讯。
服务质量降级
假如客户端A发布到主题1的消息是采用QoS = 2,然而客户端B订阅主题1采用QoS = 1。那么服务端该如何来应对这一情况呢?
答案是服务端会选择Qos等级低的一方为准。
保留消息
保留消息的作用
默认情况下只有当pub发布了一条新的数据时,sub才会接收到数据。
那么一种情况:
pub要隔很久才会更新一次消息。
sub在pub更新消息的间隙订阅了主题。
那么在pub更新前sub是接收不到消息的,这情况下pub将会有很长的时间是没有信息的状态。
为了避免这种情况,我们把一条pub发布的消息设置为保留消息,当sub订阅了主题时能马上接收到保留消息,而不需要白白等待pub更新。
发布保留消息的方法
MQTT设备发布的保留消息的流程与发布普通消息的流程十分类似。唯一区别是,在发布保留消息时,MQTT设备需要将PUBLISH报文中retainFlag设置为true。
修改保留消息的方法
每一个主题只能有一个“保留消息”,如果客户端想要更新“保留消息”,就需要向该主题发送一条新的“保留消息”,这样服务端会将新的“保留消息”覆盖旧的“保留消息”。当有客户端订阅该主题时,服务端就会将最新的“保留消息”发送给订阅客户端了。
删除保留消息的方法
如果要删除主题的“保留消息”,可以通过向该主题发布一条空的“保留消息”,也就是发送一条0字节payload的“保留消息”。
心跳机制
有些客户端并不经常发送消息给服务端。对于这种客户端,服务端可以使用类似心跳检测的方法,来判断客户端是否在线。
让客户端在没有向服务端发送信息时,可以定时向服务端发送一条消息。这条用于心跳机制的消息也被称作心跳请求(PINGREQ)。心跳请求的作用正是用于告知服务端,当前客户端依然在线。服务端在收到客户端的心跳请求后,会回复一条消息。这条回复消息被称作心跳响应(PINGRESP)。
由于心跳请求是客户端定时发送的,一旦服务端发现客户端停止发送请求信息,那么服务端就会知道,这台客户端已经断开了连接。
在客户端连接服务端时,会将心跳时间间隔信息放入CONNECT报文。
keepAlive正是用于告知服务端心跳时间间隔的。
假如客户端的心跳间隔时间是60秒,那么服务端是不是每隔60秒就检查一次客户端是否发来心跳请求呢?
如果客户端在心跳时间间隔内发布了消息给服务端,那么服务端不需要客户端发送心跳请求也可以确定该客户端肯定在线。
但是当客户端在心跳间隔内没有发布消息给服务端,这时客户端会主动发送一个心跳请求消息给服务端。以表明自己仍让在线。
在实际运行中,如果服务端没有在1.5倍心跳时间间隔内收到客户端发布消息(PUBLISH)或发来心跳请求(PINGREQ),那么服务端就会认为这个客户端已经掉线。
另外,心跳机制不仅仅用于服务端判断客户端是否在线。客户端也可以利用这一机制来判断自己是否与服务端仍保持连接。如果客户端发送了心跳请求(PINGREQ)给服务端一段时间后,仍然没有收到服务端回复的心跳确认。那么客户端也会认为自己已经断开了与服务端的连接。
MQTT遗嘱
当客户端正常断开连接时,会向服务端发送DISCONNECT报文,服务端接收到该报文后,就知道,客户端是正常断开连接,而并非意外断开连接。
然而,当服务端在没有收到DISCONNECT报文的情况下,发现客户端“心跳”停止了,这时服务端就知道客户端是意外断线了。
当客户端意外断线时server会向订阅了此客户端遗嘱的sub发布遗嘱。
客户端如何将遗嘱消息发送给服务端
MQTT客户端要想连接服务端,首先要向服务端发送CONNECT报文,遗嘱相关的内容就在CONNECT报文中。
lastWillTopic —— 遗嘱主题
lastWillMessage —— 遗嘱消息
lastWillQoS —— 遗嘱QoS
lastWillRetain —— 遗嘱保留
遗嘱和一般主题类似,可以对这个遗嘱主题进行发布消息等操作,不同点为要在CONNECT报文中也就是连接服务器过程中将遗嘱内容发给服务器,并且在客户端意外离线时服务器会对订阅了此客户端的遗嘱主题的客户端发布遗嘱消息。
pub与sub代码
pub
#include<WiFi.h>#include"WiFiClient.h"#include"Adafruit_MQTT.h"#include"Adafruit_MQTT_Client.h"#defineWLAN_SSID"xxx_wifi"#defineWLAN_PASS"xxx_password"#defineMQTT_SERVER"xxx_server"#defineMQTT_SERVERPORT1883
WiFiClient client;
Adafruit_MQTT_Client mqtt(&client,MQTT_SERVER,MQTT_SERVERPORT);
Adafruit_MQTT_Publish pub =Adafruit_MQTT_Publish(&mqtt,"my_test_topic/1");voidsetup(){
Serial.begin(115200);
WiFi.begin(WLAN_SSID,WLAN_PASS);delay(2000);
Serial.print("Connecting to ");
Serial.println(WLAN_SSID);while(WiFi.status()!= WL_CONNECTED){delay(500);
Serial.print(".");}
Serial.println();
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());}voidloop(){MQTT_connect();
Serial.print(F("\nSendinf val"));if(!pub.publish("Hello World")){
Serial.println(F("Failed"));}else{
Serial.println(F("OK!"));}delay(2000);}voidMQTT_connect(){int8_t ret;if(mqtt.connected()){return;}
Serial.print("Connecting to MQTT...");uint8_t retries =3;while((ret = mqtt.connect())!=0){
Serial.println(mqtt.connectErrorString(ret));
Serial.println("Retrying MQTT connection in 5 seconds...");
mqtt.disconnect();delay(5000);
retries--;if(retries ==0){while(1);}}
Serial.println("MQTT Connected!");}
sub
#include<WiFi.h>#include"WiFiClient.h"#include"Adafruit_MQTT.h"#include"Adafruit_MQTT_Client.h"#defineWLAN_SSID"xxx_wifi"#defineWLAN_PASS"xxx_password"#defineMQTT_SERVER"xxx_server"#defineMQTT_SERVERPORT1883
WiFiClient client;
Adafruit_MQTT_Client mqtt(&client,MQTT_SERVER,MQTT_SERVERPORT);
Adafruit_MQTT_Subscribe sub =Adafruit_MQTT_Subscribe(&mqtt,"my_test_topic/1");voidsetup(){
Serial.begin(115200);
WiFi.begin(WLAN_SSID,WLAN_PASS);delay(2000);
Serial.print("Connecting to ");
Serial.println(WLAN_SSID);while(WiFi.status()!= WL_CONNECTED){delay(500);
Serial.print(".");}
Serial.println();
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
mqtt.subscribe(&sub);}voidloop(){MQTT_connect();
Adafruit_MQTT_Subscribe *subscription;while((subscription = mqtt.readSubscription(1000))){if(subscription ==&sub){
Serial.print("Got: ");
Serial.println((char*)sub.lastread);}}}voidMQTT_connect(){int8_t ret;if(mqtt.connected()){return;}
Serial.print("Connecting to MQTT...");uint8_t retries =3;while((ret = mqtt.connect())!=0){
Serial.println(mqtt.connectErrorString(ret));
Serial.println("Retrying MQTT connection in 5 seconds...");
mqtt.disconnect();delay(5000);
retries--;if(retries ==0){while(1);}}
Serial.println("MQTT Connected!");}
版权归原作者 不亦乐乎肯尼迪 所有, 如有侵权,请联系我们删除。