一、安装EMQX MQTT服务器
EMQX 是一个高性能、可扩展的物联网消息中间件,EMQX 实现了 MQTT 协议的服务器端,即 MQTT Broker,它负责接收来自客户端的连接请求,处理订阅、发布消息,并将消息转发给相应的订阅者。
下载EMQX
EMQX官方网站 https://www.emqx.com/zh/try?product=broker 提供了EMQX开源版软件包
但是5.3.2版本以后就没有提供Windows系统软件包,需要到这里下载https://www.emqx.com/zh/downloads/broker

解压

配置EMQX,EMQX的配置文件位于
etc\emqx.conf
,可以根据需要修改配置文件,例如更改端口号、添加插件等

启动EMQX,进入
\bin
目录下,启动cmd输入
emqx start

检查启动状态
emqx_ctl status

可以在任务管理器中看到erl.exe是否运行

访问EMQX Dashboard,启动EMQX 后,进入http://127.0.0.1:18083中,帐号admin,密码public
8083 MQTT/WebSocket 端口
18083 EMQX Dashboard 管理控制台端口
登录后右上角设置,可以切换成简体中文

右侧栏第一个菜单,可查看集群概览

右侧栏第二个菜单,选择客户端认证,创建客户端认证,选择内置数据库,默认参数配置




选择用户管理,添加用户账号:admin 密码:public



二、安装及配置MQTTX客户端
MQTTX 则是一个轻量级的 MQTT 客户端工具,主要用于测试和调试 MQTT 应用程序。
下载MQTTX
访问MQTTX的官方下载页面https://mqttx.app/zh

双击下载的exe文件,选择安装目录并点击完成


配置连接到EMQX
在MQTTX中点击“+”添加新的连接,输入连接名称,如“EMQX Server”,服务器地址填写为
localhost
或你的EMQX服务器的实际IP地址,端口默认为
1883
,如果EMQX配置了SSL/TLS,应使用
8883
,输入用户名和密码(如果EMQX配置了认证)

点击连接,可以开始订阅主题、发布消息等操作来测试MQTT通信。

在EMQX Dashboard可以看到客户端已经连接成功。
进入到WebSocket 客户端,可以进行订阅和发布的测试

三、EMQX目录结构
不同安装方式得到的 EMQ X 其目录结构会有所不同,具体如下:
描述使用 ZIP 压缩包安装使用二进制包安装Homebrew(MacOS)安装可执行文件目录
./bin
/usr/lib/emqx/bin
/usr/local/bin
数据文件
./data
/var/lib/emqx/data
/usr/local/Cellar/emqx/*/data
Erlang 虚拟机文件
./erts-*
/usr/lib/emqx/erts-*
/usr/local/Cellar/emqx/*/erts-
配置文件目录
./etc
/etc/emqx/etc
/usr/local/Cellar/emqx/*/etc
依赖项目录
./lib
/usr/lib/emqx/lib
/usr/local/Cellar/emqx/*/lib
日志文件
./log
/var/log/emqx
/usr/local/Cellar/emqx/*/log
启动相关的脚本、schema 文件
./releases
/usr/lib/emqx/releases
/usr/local/Cellar/emqx/*/releases
以上目录中,用户经常接触与使用的是
bin
、
etc
、
data
、
log
目录。
bin 目录
emqx、emqx.cmd
EMQX 的可执行文件,具体使用可以查看 基本命令。
emqx_ctl、emqx_ctl.cmd
EMQX 管理命令的可执行文件,具体使用可以查看 管理命令 CLI。
etc 目录
EMQX 通过
etc
目录下配置文件进行设置,主要配置文件包括:
配置文件说明emqx.confEMQX 配置文件acl.confEMQX 默认 ACL 规则配置文件plugins/*.confEMQX 各类插件配置文件certsEMQX SSL 证书文件emqx.licLicense 文件仅限 EMQX Enterprise
EMQX 具体的配置内容可以查看 配置项。
data 目录
EMQX 将运行数据存储在
data
目录下,主要的文件包括:
configs/app.*.config
EMQX 读取
etc/emqx.conf
和
etc/plugins/*.conf
中的配置后,转换为 Erlang 原生配置文件格式,并在运行时读取其中的配置。
loaded_plugins
loaded_plugins
文件记录了 EMQ X 默认启动的插件列表,可以修改此文件以增删默认启动的插件。
loaded_plugins
中启动项格式为
{<Plugin Name>, <Enabled>}.
,
<Enabled>
字段为布尔类型,EMQX 会在启动时根据
<Enabled>
的值判断是否需要启动该插件。关于插件的更多内容,请查看 插件。
$ cat loaded_plugins
{emqx_management,true}.
{emqx_recon,true}.
{emqx_retainer,true}.
{emqx_dashboard,true}.
{emqx_rule_engine,true}.
{emqx_bridge_mqtt,false}.
Mnesia
Mnesia 数据库是 Erlang 内置的一个分布式 DBMS,可以直接存储 Erlang 的各种数据结构。
EMQX 使用 Mnesia 数据库存储自身运行数据,例如告警记录、规则引擎已创建的资源和规则、Dashbaord 用户信息等数据,这些数据都将被存储在
mnesia
目录下,因此一旦删除该目录,将导致 EMQX 丢失所有业务数据。
可以通过
emqx_ctl mnesia
命令查询 EMQX 中 Mnesia 数据库的系统信息,具体请查看 管理命令 CLI。
log 目录
emqx.log.*
EMQX 运行时产生的日志文件,具体请查看 日志与追踪。
crash.dump
EMQX 的崩溃转储文件,可以通过
etc/emqx.conf
修改配置,具体内容可以查看 配置项。
erlang.log.*
以
emqx start
方式后台启动 EMQX 时,控制台日志的副本文件。
四、使用 Java SDK 连接
引用自:https://docs.emqx.com/zh/emqx/latest/connect-emqx/java.html
Eclipse Paho Java Client 是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 异步和同步 API。
通过 Maven 安装 Paho Java
通过包管理工具 Maven 可以方便地安装 Paho Java 客户端库,截止目前最新版本安装如下:
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version></dependency>
Paho Java 使用示例
Java 体系中 Paho Java 是比较稳定、广泛应用的 MQTT 客户端库,本示例包含 Java 语言的 Paho Java 连接 EMQX Broker,并进行消息收发完整代码:
App.java
packageio.emqx;importorg.eclipse.paho.client.mqttv3.MqttClient;importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.eclipse.paho.client.mqttv3.MqttMessage;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;publicclassApp{publicstaticvoidmain(String[] args){String subTopic ="testtopic/#";String pubTopic ="testtopic/1";String content ="Hello World";int qos =2;String broker ="tcp://broker.emqx.io:1883";String clientId ="emqx_test";MemoryPersistence persistence =newMemoryPersistence();try{MqttClient client =newMqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions connOpts =newMqttConnectOptions();
connOpts.setUserName("emqx_test");
connOpts.setPassword("emqx_test_password".toCharArray());// 保留会话
connOpts.setCleanSession(true);// 设置回调
client.setCallback(newPushCallback());// 建立连接System.out.println("Connecting to broker: "+ broker);
client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: "+ content);// 订阅
client.subscribe(subTopic);// 消息发布所需参数MqttMessage message =newMqttMessage(content.getBytes());
message.setQos(qos);
client.publish(pubTopic, message);System.out.println("Message published");
client.disconnect();System.out.println("Disconnected");
client.close();System.exit(0);}catch(MqttException me){System.out.println("reason "+ me.getReasonCode());System.out.println("msg "+ me.getMessage());System.out.println("loc "+ me.getLocalizedMessage());System.out.println("cause "+ me.getCause());System.out.println("excep "+ me);
me.printStackTrace();}}}
回调消息处理类 OnMessageCallback.java
packageio.emqx;importorg.eclipse.paho.client.mqttv3.IMqttDeliveryToken;importorg.eclipse.paho.client.mqttv3.MqttCallback;importorg.eclipse.paho.client.mqttv3.MqttMessage;publicclassOnMessageCallbackimplementsMqttCallback{publicvoidconnectionLost(Throwable cause){// 连接丢失后,一般在这里面进行重连System.out.println("连接断开,可以做重连");}publicvoidmessageArrived(String topic,MqttMessage message)throwsException{// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:"+ topic);System.out.println("接收消息Qos:"+ message.getQos());System.out.println("接收消息内容:"+newString(message.getPayload()));}publicvoiddeliveryComplete(IMqttDeliveryToken token){System.out.println("deliveryComplete---------"+ token.isComplete());}}
版权归原作者 Yeats_Liao 所有, 如有侵权,请联系我们删除。