0


EMQX 安装使用和部分坑

0、前言

1、强烈建议查看官方文档,说的是真的不错,浅显易懂:EMQ 提供了通俗易懂的技术文章,帮助开发者快速了解 MQTT 协议及其相关特性

2、在我使用到现在,对比了很多MQTT broker,EMQX 几乎是完美的,只有一点限制,那就是开源版本不支持消息持久化

3、当然直接使用官方编译好的方式,启动也是不错的,很简单,也是很推荐的,官方文档的安装步骤一栏就有,记得选好版本欧

官方安装文档地址
在这里插入图片描述
点击上面箭头的链接,就会跳转了在这里插入图片描述

一、安装

1、这里安装使用docker安装,版本为4.4.9,目前为止版本已经到5.x,但是本着先不用最新版本的原则,就先使用4.x版本
官网文档:4.4版本中文官方文档,左下角有切换版本和语言的按钮,选择对应的版本就好

2、从

dockerHub

中找到对应的镜像版本 emqx/emqx

3、我就使用4.4.9版本的镜像就🆗。

在这里插入图片描述

4、建立对应的数据挂载目录,至于为什么挂载这些目录,是因为这些目录比较重要,可以看官方的目录解释

mkdir-p /data/docker/emqx/{etc,lib,data,log}

在这里插入图片描述
在这里插入图片描述

5、先运行一个demo 的emqx,就是没有挂载的镜像版本,为了将对应的目录文件copy出来,因为等下如果挂载,会读取挂载目录的数据,所以我们又不知道要那些数据,所以先跑一个demo,把对应文件数据copy出来,后面用来挂载即可

对应端口说明,如下图,其中挂载一个时间是因为,我发现这个镜像的时区不对,比我们少8个小时,所以挂载下时区,就可以保证和系统是一致的了

docker run -d--name emqx -v /etc/localtime:/etc/localtime  -p1883:1883 -p8081:8081 -p8083:8083 -p8084:8084 -p8883:8883 -p18083:18083 emqx/emqx:4.4.9

在这里插入图片描述
在这里插入图片描述

6、将对应的问价copy到我们前面建立的目录中,copy完成之后,就会发现我们的目录有数据了,这都是默认的基础配置,后面就可以用它们来挂载

dockercp emqx:/opt/emqx/etc /data/docker/emqx
dockercp emqx:/opt/emqx/lib /data/docker/emqx
dockercp emqx:/opt/emqx/data /data/docker/emqx
dockercp emqx:/opt/emqx/log /data/docker/emqx

在这里插入图片描述
7、修改目录权限

chown-R1000:1000 /data/docker/emqx/
chmod-R755 /data/docker/emqx/

在这里插入图片描述

8、删除不用的,建立完整的镜像

# 删除前面那个demodocker stop emqx &&dockerrm emqx

# 建立新的docker run -d--name emqx --restart=always \-p1883:1883 \-p8883:8883 \-p8083:8083 \-p8084:8084 \-p8081:8081 \-p18083:18083 \-v /etc/localtime:/etc/localtime \-v /data/docker/emqx/etc:/opt/emqx/etc \-v /data/docker/emqx/lib:/opt/emqx/lib \-v /data/docker/emqx/data:/opt/emqx/data \-v /data/docker/emqx/log:/opt/emqx/log \
emqx/emqx:4.4.9

在这里插入图片描述
9、登录页面,根据前面的那个端口关系知道,

18083

dashboard

端口,访问即可

http://192.168.172.229:18083

,输入默认账号:

admin

默认密码:

public

在这里插入图片描述
10、然后会强制你修改默认的密码,就🆗了,然后用新密码登录
在这里插入图片描述

二、springBoot 使用

1、这个参考文章 springboot整合mqtt实现消息发送和消费,以及客户端断线重连之后的消息恢复,就行写的很好

三、修改客户端链接授权

1、如上默认任何客户端都可以链接,这肯定是不可以的。

2、官方提供很多认证方式,这里选取较为方便的一种 mnesia认证
在这里插入图片描述
3、首先在配置文件中关闭匿名客户端链接,修改我们的挂载的配置文件(镜像里面的配置也会跟着改),然后重启镜像(不然不能生效…)

# 修改下述配置vim /data/docker/emqx/etc/emqx.conf

# 重启镜像docker restart emqx

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
4、再次链接你就会发现无权链接
在这里插入图片描述
5、先在控制台开启

 mnesia认证

在这里插入图片描述
在这里插入图片描述

6、增加对应用户,这的话官方说的已经很清楚了 Mnesia 认证,两种方式

  • 预设认证数据(就是修改对应的配置文件,手动添加用户和密码)
  • 使用 HTTP API 管理认证数据 (通过http的方式,来添加用户和密码)

7、那我们通过修改配置文件的方式好了,改完之后重启

# 1、修改配置文件,增加用户vim /data/docker/emqx/etc/plugins/emqx_auth_mnesia.conf

# etc/plugins/emqx_auth_mnesia.conf## clientid 认证数据
auth.client.1.clientid = thermal
auth.client.1.password = TAUb&LjCdYB#EjAMNB## username 认证数据
auth.user.2.username = thermal
auth.user.2.password = TAUb&LjCdYB#EjAMNB# 2、重启docker restart emqx

在这里插入图片描述

在这里插入图片描述

至于为啥有clientId和userName,两种,我建议是两个都配置,并且保持一致,当然你也可以只配置

userName

的方式也行,我已经测试过了。在这里插入图片描述

8、最后就可以了,使用最新的账号去链接。注意那个

admin

账号是登录控制台的,和客户端链接认证没有关系,客户端链接得写后面我们增加的用户

thermal

在这里插入图片描述

四、使用问题说明

4.1、MQTT EMQX中如何监听客户端上下线?

  • 1、官方提供了三种方案,但是最后一种是企业版本的要收费在这里插入图片描述
  • 2、订阅相关的 $SYS 主题
  •                                     S                            Y                            S                            /                            b                            r                            o                            k                            e                            r                            s                            /                                  SYS/brokers/                     SYS/brokers/{node}/clients/${clientid}/connected
    
  •                                     S                            Y                            S                            /                            b                            r                            o                            k                            e                            r                            s                            /                                  SYS/brokers/                     SYS/brokers/{node}/clients/${clientid}/disconnected
    
  • 3、可以直接使用web_hook来实现 参考文章 MQTT EMQX中如何监听客户端上下线?,但是这个文章中说web_hook没有问题是不对的,比如,我们提供的web_hook接口的服务需要重启,而重启这段时间内,其他客户端上下线的数据,我们就拿不到了。在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

4.2、emqx开源版服务重启后主题和消息会丢失(无法持久化离线消息)

1、意思就是,消息已经发布到

emqx

服务器了,但是此时接收者不在线,而我们重启emqx服务后,再让接收者上线,前面那一部分数据,接收者是收不到的,也就是丢失了。 文章说明参考 基于emq x开源版实现服务重启后主题和消息恢复的完整方案(二),开源版本不持支数据持久化

在这里插入图片描述

2、付费版本是提供了很多方式的数据持久化的,参考官方文章EMQX 插件持久化系列 (五)MySQL MQTT 数据存储

注意这里的消息持久化,是指普通消息重启之后也会在(即离线消息),官方有提供一个插件,

emqx_retainer

,但是这个只是持久化

保留消息的

,每次重启都会发给订阅者 ,关于

保留消息

的概念,可以参考 官方文档 保留消息

**大家要区分出来

离线消息,消息未送到订阅端,保存在emqx服务器中,但是emqx服务重启了

保留消息(只有保存最新的一次消息)

的区别**,下面这个配置是配置保留消息的持久化的,所以和持久化离线消息是没有的。

在这里插入图片描述

4.3、客户端断线自动重连

1、这篇文章说的很不错,参考链接 SpringBoot 开发之 MQTT 协议客户端断线后自动重连与保留断线时发布的主题消息,可以直接看这篇,下面的就是它的解决方案。

2、如果有一个客户端程序已经用emqx很久了,突然emqx服务需要重启,那么这个客户端程序就会抛出一个异常,说被迫断开了一个链接,即使emqx服务重启好了之后,这个客户端程序还是不可以使用,除非你重启这个客户端程序,再次重连,所以我们需要做的是,不重启客户都安程序,当emqx服务OK的时候,自动重连。

  • 设置参数AutomaticReconnecttrue但是 使用 automaticReconnect 为 true 表示断线自动重连,但仅仅只是重新连接,并不订阅主题;
  • 所以还要在链接完成后,再次订阅主题
@OverridepublicvoidconnectComplete(boolean b,String s){// 客户端连接成功CodeUtils.info("[MQTT] 连接成功,重新订阅主题...");try{
        client.subscribe(topic,QOS);}catch(MqttException e){
        e.printStackTrace();}}

上面代码等同于,我们自己在链接成功的时候去订阅主题,

主题可以重复订阅
@OverridepublicvoidconnectComplete(boolean reconnect,String serverURI){
        log.info("platform端 链接成功,是否是reconnect的结果: {},serverURI: {}", reconnect, serverURI);// 以testtopic/#结尾表示订阅所有以testtopic开头的主题// 手动订阅之后,messageArrived() 方法就收不到消息了,都在这里
        mqttPlatformClient.subscribe(TopicConstant.UPPER_MACHINE_2_PLATFORM,2,newIMqttMessageListener(){@OverridepublicvoidmessageArrived(String topic,MqttMessage message)throwsException{System.out.println("platform端 订阅消息回调函数topic: "+ topic +" ,msg: "+ message.toString());}});}

两种方式二选一即可。

4.4、客户端掉线再上线,获取掉线期间的数据

1、我们假设emqx服务是🆗的,那么如果一个客户端服务,需要重启更新版本,那么,我希望更新版本之后,这个客户端还能收到掉线这段时间的数据,这个很简单,emqx服务已经实现了,需要设置以下两点

  • 重连后接收端的clientId不变
  • 且设置参数clearSession为false

2、但是EMQX服务保存的数据量也是有配置的,这里说的是将离线消息保存在内存中,即EMQX服务没有重启(开源版本不支持离线消息持久化,前面说了),如下图,我们知道,离线保存消息在内存中(也就是飞行窗口和消息队列)只会保存5分钟,且最大也才1000条,修改参数,可以在全局参数文件

emqx.conf

中修改

在这里插入图片描述

3、修改时间参数和队列存储长度的参数,可以看 4.5小节,里面举例就是按照这个参数举例的。

# 队列长度,默认1000,设置为0不限制
zone.external.max_mqueue_len  =1000# 会话默认超时时间, 这个其实就是在离线消息的保存时间 2h = 120min = 7200s
zone.external.session_expiry_interval = 2h

# 注意并不是 zone.external.await_rel_timeout 这个参数

4、其实我们搭建完成

Dashboard

后,也是能看出来一些的。
在这里插入图片描述
在这里插入图片描述

4.4.1、验证离线消息默认队列长度

1、 我们使用默认的配置,不修改参数

zone.external.max_mqueue_len

zone.external.session_expiry_interval

,直接通过

EmqxPlatformApplication :7093/

应用向队列里面发送2w条消息

这个实验得在2h内完成,因为我们没有修改

zone.external.session_expiry_interval

参数

2、 为了方便观察,我们使用控制台,监控该队列的数据流入流出情况,目前为止都是 0

在这里插入图片描述
3、 该图是mock数据的接口,我们直接请求该接口,传参为 20000

在这里插入图片描述
4、 发送结束,我们看到监控台,已经流入了2w的数据了,因为消费应用

EmqxUpperMachineApplication

,还没有启动,所以这些消息,会变为离线消息
在这里插入图片描述
在这里插入图片描述

5、启动消费客户端发现只有 19000-20000的数据,其他数据丢失
在这里插入图片描述
在这里插入图片描述

6、我们观察日志可以看到,日志中写了,队列已经满了,所以放不下,我们计算下数据的条数就知道,从第五行开始,加上19000,就是说会到19004行,正好 19000条被遗弃掉,所以队列长度,默认为1000

2022-12-09T10:09:27.513000+08:00 [warning] [email protected]:53739 [Session] Dropped msg due to mqueue is full: Message(Id=0005EF5BA46867AAF4420000020B0001, QoS=2, Topic=test/thermal/platform_2_upper_machine, From=<<"platform">>, Flags=[], Headers=#{peerhost => {127,0,0,1}, properties => #{},proto_ver => 3,protocol => mqtt,username => <<"thermal">>})

在这里插入图片描述
在这里插入图片描述

7、所以,如果不限制的话,可以设置为0,则20000条数据就都会在,修改之后,我们也能在控制台看到
在这里插入图片描述

4.4.2、验证离线消息保留时长

1、我们使用默认的配置,不修改参数

zone.external.max_mqueue_len

,修改

zone.external.session_expiry_interval

设置为 20分钟,控制台页面也能看到修改的参数值

# 为啥设置那么长时间,是为了和其他参数区分开来,更好的验证,我们过去20分钟就去验证即可。
zone.external.session_expiry_interval = 20m

在这里插入图片描述
在这里插入图片描述

2、 直接通过

EmqxPlatformApplication :7093/

应用向队列里面发送10条消息即可,因为我们是验证已经存储的消息的保留时间,不需要太多数据

  • 监控如下.在这里插入图片描述
  • 过20多分钟后 启动客户端发现没有之前的10条数据
  • 查看日志发现已经丢弃

4.5、emqx的全局配置文件参数说明

1、这个很重要,我强烈建议你去读取一遍,了解一下有那些参数可以配置,其中有写参数非常重要,
EMQX- V4.4 官方参数文档地址

在这里插入图片描述
2、这里我举几个比较重要的参数,比如4.4小节中提到的飞行窗口的参数,看如下图,可得知,离线消息先到飞行窗口里面,再到meassge_queue中,两个的参数分别为

zone.内部/外部.max_inflight

max_mqueue_len

,并且这些离线数据存储也是有时限的,第四小节中的那张图片中有说明,其实那个就是

EMQX

的默认配置

含义解释如下:在这里插入图片描述

  • 就可以知道max_mqueue_len默认值是1000,要是希望不限制,则改为0即可,如果你设置为1000,但是有3000条消息,那么这个队列里面只会保留2000-3000这个一千条最新的数据。1到1999的数据会收不到,以上面的参数,设置离线消息的保留时间。

在这里插入图片描述

  • 内部max_inflight
  • 在这里插入图片描述
  • 外部max_inflight在这里插入图片描述

4.6、docker 部署后即使挂载了日志目录,还是没有日志

1、官方说明,如果版本大约4.3.3,还使用docker部署,则自能通过 docker logs 查看,或者如下配置,官方说明 控制日志输出

在这里插入图片描述

4.7、emqx 会丢弃消息

1、接受队列的客户端一直不在,则会丢弃消息,举一个例子,如下,现有客户端

  • platform
  • upperMachine

2、有队列

platform_2_upper_machine

,则

platform

上线,向队列

platform_2_upper_machine

发生消息,但是客户端

upperMachine

没有上线过或者上线过后,session已经被清除了

什么时候被清除,就是前面说的那个参数

zone.external.session_expiry_interval

,下面这个图设置的值是一天,也就是86400秒。这个参数确实可以是保存离线消息的时间,但是提前是你之前有登录过emqx服务器,即下面的记录中有你,也就是在

zone.external.session_expiry_interval

时间内,存在,一旦这个时间期限一过,而你还是没有登录,则会自动清除,离线消息也就没有了

在这里插入图片描述
3、所以这个时候你向队列

platform_2_upper_machine

发送消息,会直接被遗弃,因为没有

zone.external.session_expiry_interval

时间范围内存活的客户端来消费这个队列。

在这里插入图片描述

五、生产环境建议修改的值

根据第四小节的说明,我们可以知道要修改EMQX的服务参数

  • 修改web_hook的地址,来监听客户端的上下线问题
  • 修改消息队列大小,不用修改飞行窗口的大小 ,修改参数zone.external.max _mqueue_len为0,表示不限制,注意是外部域(external)的参数
  • 修改离线消息的保存时间可以根据线上环境来考虑zone.external.session_expiry_interval
  • 还有一个部署时候的日志bug,如果版本大约4.3.3,还使用docker部署,则自能通过 docker logs 查看,或者如下配置,官方说明 控制日志输出

在这里插入图片描述

六、各个功能的账号密码说明

6.1、登录

dashboard

的账号密码

1、登录

dashboard

的账号密码,默认是admin/public,这个修改的方式很简单,直接到用户模块进行修改即可,这个修改的作用只是用于登录dashboard,无任何其他作用
在这里插入图片描述

6.2、客户端的账号密码(比如java程序)

1、这个看第三节

修改客户端链接授权

即可

6.3、使用HTTP API,的用户名和密码

1、这个官方也给出了说明 HTTP API,用户名和密码哪里配置?

在这里插入图片描述
在这里插入图片描述
2、添加完成之后密码就是密钥,会自动生成
在这里插入图片描述
3、比如现在我需要获取对应的客户端信息通过api接口,账号密码安装前面的生成即可.
在这里插入图片描述


本文转载自: https://blog.csdn.net/qq_38263083/article/details/127067222
版权归原作者 铛铛响 所有, 如有侵权,请联系我们删除。

“EMQX 安装使用和部分坑”的评论:

还没有评论