从前,我向往远方,喜欢到处旅游🚶🚶🚶;后来,我追忆往事,最爱故地重游。每到一处,勾起故人往事,总能被人生的沧桑感所震撼。
文章目录
完整代码已上传Gitee
前言
MQTT协议是一种消息列队传输协议,采用订阅、发布机制,订阅者只接收自己已经订阅的数据,非订阅数据则不接收,既保证了必要的数据的交换,又避免了无效数据造成的储存与处理。因此在工业物联网中得到广泛的应用。
EMQX Cloud 是 EMQ 公司推出的一款面向物联网领域的 MQTT 消息中间件产品。作为全球首个全托管的 MQTT 5.0 公有云服务,EMQX Cloud 提供了一站式运维代管、独有隔离环境的 MQTT 消息服务。在万物互联的时代,EMQX Cloud 可以帮助您快速构建面向物联网领域的行业应用,轻松实现物联网数据的采集、传输、计算和持久化。
1. 物联网消息收发模型
1.1 双向通信
EMQX Cloud 支持海量设备及应用端连接,为应用程序及物联网设备提供安全可靠的双向通信能力:
在该模型中,EMQX Cloud 提供的 MQTT 服务将海量设备与应用连接起来,支持应用与设备间的双向通信,也支持设备与设备间的双向通信。该模型适用于有类即时通讯需求的物联网应用,比较典型的如:智能家居场景中,手机 APP 获取智能设备的状态信息,并且用户可以通过 APP 向智能设备发送控制指令。又如在工业场景中,AGV 机器人之间通过 MQTT 协议来进行即时通信,实现多机协作。EMQX Cloud 提供的 MQTT 服务不仅支持标准 MQTT 协议,也支持 MQTT over WebSocket,以及 CoAP、 MQTT-SN、LwM2M、JT/T808等协议,只需一个消息中间件即可满足多类终端同时接入的需求。
1.2 数据采集
EMQX Cloud 支持设备数据上云,通过海量 Topic 及数据集成的支持,低代码即可实现数据的采集、过滤、转换、计算及持久化。
在该模型中,EMQX Cloud 提供的 MQTT 服务可以实现数据的采集、计算和持久化。该模型适用于有数据采集和持久化需求的物联网应用,比较典型的如:在工业场景中,各个物联网传感器将实时采集的数据汇集到边缘网关,通过边缘网关将数据上传到 MQTT 服务器上,再由数据集成触发数据的过滤、转换和简单计算,并将最终结果转发至其他服务或持久化至目标数据库中。EMQX Cloud 提供了多种接入方案,涵盖了不同的网络条件、各种类型终端设备和边缘网关设备,支持70多种工业协议接入。
1.3 混合模型
EMQX Cloud 提供的 MQTT 服务支持双向通信和数据采集模型的混合应用。通过共享订阅、数据集成等能力,实现数据在物与物、物与应用间流转的同时进行持久化。
在该模型中,EMQX Cloud 提供的 MQTT 服务不仅为设备与设备、设备与应用间架起桥梁,同时可将需要的数据进行持久化,以便非实时应用在后续对获取的数据加以利用。比较典型的如一些人工智能应用,终端获取的数据需要发送至云端,通过云端运行的计算模型经过计算后即时反馈给终端,如物品或人脸识别应用。同时数据的副本需要持久化到数据库中,以便于后续离线训练和改进人工智能计算模型。
2. 服务部署
2.1 新建部署
首先在平台注册账号,新用户可以试用14天
新建部署:
新建项目,将部署服务以组的形式进行管理
新建项目完成后,可以将上边部署服务移动到所属项目下,方便管理。
2.2 添加认证
添加账号密码,连接认证的时候需要用到。
管理控制台:
3. 编码实践
案例采用
Java
整合
3.1 编写代码
pom
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--EMQX依赖--><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency></dependencies>
启动类:
/**
* @description: EMQX CLOUD-DEMO
* @author: yh
* @date: 2022/9/10
*/@SpringBootApplicationpublicclassExampleApplication{publicstaticvoidmain(String[] args){SpringApplication.run(ExampleApplication.class,args);}}
发布订阅:
/**
* CMQX CLOUD 发布订阅
*
* @author: yh
* @date: 2022/9/10
*/@RequestMapping(value ="/MqttClient")@RestControllerpublicclassMqttSample{MqttClient client =null;// 发布、订阅主题String topic ="test/topic";// 消息内容String content ="Hello World EMQ";// qos消息的服务质量可选值:0 1 2int qos =2;// EMQ 部署控制台的连接地址String broker ="tcp://q6dec0f4.cn-shenzhen.emqx.cloud:11578";String clientId =MqttClient.generateClientId();publicMqttSample(){// 持久化MemoryPersistence persistence =newMemoryPersistence();// MQTT 连接选项MqttConnectOptions connOpts =newMqttConnectOptions();// 设置认证信息,配置的账号 密码
connOpts.setUserName("exqcloud");
connOpts.setPassword("hello".toCharArray());try{
client =newMqttClient(broker, clientId, persistence);// 设置回调
client.setCallback(newSampleCallback());// 建立连接System.out.println("Connecting to broker: "+ broker);
client.connect(connOpts);System.out.println("Connected to broker: "+ broker);// 订阅 topic
client.subscribe(topic, qos);System.out.println("Subscribed to topic: "+ topic);}catch(Exception e){
e.printStackTrace();}}/**
* 消息发布
*
* @author: yh
* @date: 2022/9/10
*/@RequestMapping(value ="/send")publicvoidsend(){try{// 发布消息MqttMessage message =newMqttMessage(content.getBytes());
message.setQos(qos);//向服务器上的topic发布消息
client.publish(topic, message);System.out.println("Message published");// 断开连接//client.disconnect();//System.out.println("Disconnected");// 关闭客户端//client.close();//System.exit(0);}catch(MqttException me){System.out.println("reason "+ me.getReasonCode());System.out.println("msg "+ me.getMessage());System.out.println("loc "+ me.getLocalizedMessage());System.out.println("cause "+ me.getCause());System.out.println("excep "+ me);
me.printStackTrace();}}}
回调类:
/**
* 回调类
* @author: yh
* @date: 2022/9/10
*/publicclassSampleCallbackimplementsMqttCallback{/**
* 连接丢失
* @author: yh
* @date: 2022/9/10
*/@OverridepublicvoidconnectionLost(Throwable cause){System.out.println("连接断开:"+ cause.getMessage());}/**
* 收到消息
* @author: yh
* @date: 2022/9/10
*/@OverridepublicvoidmessageArrived(String topic,MqttMessage message){System.out.println("接收到消息-- topic:"+ topic +",Qos:"+ message.getQos()+", 内容:"+newString(message.getPayload()));}/**
* 消息传递成功
* @author: yh
* @date: 2022/9/10
*/@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken token){System.out.println("消息发送成功!");}}
3.2 测试
启动程序
控制台连接数有点延迟,没出来的话多等几秒看看。
接下来就开始:编程第一步
Hello World
,访问 http://127.0.0.1:8080/MqttClient/send
通过控制台可以看到,消息被成功发布、消费,多发几次消息看看
稍等一段时间后,就可以在 部署→指标 中看到相关的数据统计
我这里连续着发送了10条消息
4. 在线调试
有时候消息的发布不是由我们自己来发布,我们只负责消费。这种场景下在开发阶段模拟一个发布者是非常必要的,通过控制台的在线调试功能就可以直接发布消息方便调试。
发送内容:
{"key":"hello","value":"你好"}
多试几次:
推荐一个项目 Spring整合常用组件
到此,本章内容就介绍完啦,如果有帮助到你 欢迎点个赞👍👍👍吧!!您的鼓励是博主的最大动力! 有问题评论区交流。
版权归原作者 鱼找水需要时间 所有, 如有侵权,请联系我们删除。