前言:
关于整合mqtt网上的教程很多,但大部分都是cv来cv去,中间的监听代码也没有讲清楚。教程也是很久之前的。所以决定自己来写一个教程。废话不多说直接开始教程。
本文只有教程,没有其他废话,如果需要请留言,后续更新下一版(包括主题消息的订阅方式改变,其他断线重连方式,EMQX的API对接-监听设备更加方便)。
Springboot整合mqtt采用注解进行监听(第二篇)
第一步安装 EMQX:
MQTT服务用的是EMQX,安装方式请搜索EMQX,移步他们官网。
官网地址:https://www.emqx.io/zh
测试工具请下载MQTTX
安装完成后默认管理地址:ip:18083 账户:admin 密码:public
第二步加入依赖:
<!-- MQTT相关配置 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
第三步加入配置:
在yml文件中加入以下配置
#MQTT客户端publish:mqtt:host: tcp://127.0.0.1:1883clientId: mqtt_publish
options:userName: GuoShun
password: qq1101165230
# 这里表示会话不过期cleanSession:false# 配置一个默认的主题,加载时不会用到,只能在需要时手动提取defaultTopic: devops
timeout:1000KeepAliveInterval:10#断线重连方式,自动重新连接与会话不过期配合使用会导致#断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我automaticReconnect:trueconnectionTimeout:3000# 最大链接数maxInflight:100
第四步创建一个MQTTConfigBuilder类,用来加载yml中的配置
/**
* @author by Guoshun
* @version 1.0.0
* @description mqtt配置类
* @date 2023/12/12 15:10
*/@Configuration@ConfigurationProperties(MQTTConfigBuilder.PREFIX)@DatapublicclassMQTTConfigBuilder{//配置的名称publicstaticfinalStringPREFIX="publish.mqtt";/**
* 服务端地址
*/privateString host;/**
* 客户端id
*/privateString clientId;/**
* 配置链接项
*/privateMqttConnectOptions options;}
第五步创建一个MQTTClientUtils
@Slf4j@ConfigurationpublicclassMQTTClientUtils{@AutowiredprivateMQTTConfigBuilder mqttConfig;privateMqttClient mqttClient;publicMQTTClientUtilscreateDevOpsMQTTClient(){this.createMQTTClient();returnthis;}privateMQTTClientUtilsconnect(){try{this.mqttClient.connect(mqttConfig.getOptions());
log.info("MQTTClient连接成功!");}catch(MqttException mqttException){
mqttException.printStackTrace();
log.error("MQTTClient连接失败!");}returnthis;}privateMqttClientcreateMQTTClient(){try{this.mqttClient =newMqttClient( mqttConfig.getHost(), mqttConfig.getClientId());
log.info("MQTTClient创建成功!");returnthis.mqttClient;}catch(MqttException exception){
exception.printStackTrace();
log.error("MQTTClient创建失败!");returnnull;}}/**
* 消息发送
* @param topicName
* @param message
* @return
*/publicbooleanpublish(String topicName,String message){
log.info("订阅主题名:{}, message:{}", topicName, message);MqttMessage mqttMessage =newMqttMessage(message.getBytes(StandardCharsets.UTF_8));try{this.mqttClient.publish(topicName, mqttMessage);returntrue;}catch(MqttException exception){
exception.printStackTrace();returnfalse;}}/**
* 消息发送 : retained 默认为 false
* "retained message" 指的是 Broker 会保留的最后一条发布到某个主题的消息。
* 当新的订阅者连接到该主题时,Broker 会将这条保留消息立即发送给订阅者,即使在订阅者订阅时该消息并未被重新发布。
* 这对于一些需要初始状态或者最后一次已知状态的应用场景非常有用。
* @param topicName
* @param message
* @param qos
* @return
*/publicbooleanpublish(String topicName,int qos,String message){
log.info("主题名:{}, qos:{}, message:{}", topicName, qos, message);MqttMessage mqttMessage =newMqttMessage(message.getBytes(StandardCharsets.UTF_8));try{this.mqttClient.publish(topicName, mqttMessage.getPayload(), qos,false);returntrue;}catch(MqttException exception){
exception.printStackTrace();returnfalse;}}/**
* 订阅某个主题
*
* @param topicName
* @param qos
*/publicvoidsubscribe(String topicName,int qos){
log.info("订阅主题名:{}, qos:{}", topicName, qos);try{this.mqttClient.subscribe(topicName, qos);}catch(MqttException e){
e.printStackTrace();}}/**
* 订阅某个主题
*
* @param topicName
* @param qos
*/publicvoidsubscribe(String topicName,int qos,IMqttMessageListener messageListener){
log.info("订阅主题名:{}, qos:{}, Listener类:{}", topicName, qos, messageListener.getClass());try{this.mqttClient.subscribe(topicName, qos, messageListener);}catch(MqttException e){
e.printStackTrace();}}/**
* 取消订阅主题
* @param topicName 主题名称
*/publicvoidcleanTopic(String topicName){
log.info("取消订阅主题名:{}", topicName);try{this.mqttClient.unsubscribe(topicName);}catch(MqttException e){
e.printStackTrace();}}//这里是初始化方法@PostConstructpublicvoidinitMqttClient(){//创建连接MQTTClientUtils mqttClientUtils =this.createDevOpsMQTTClient().connect();//这里主要是项目启动时订阅一些主题。看个人需要使用//mqttClientUtils.subscribe("test/#", 2, new HeartBeatListener());//MessageCallbackListener订阅主题,接受到该主题消息后交给MessageCallbackListener去处理
mqttClientUtils.subscribe("message/call/back",2,newMessageCallbackListener());//需要注意的是new MessageCallbackListener()虽然会接收到消息,但这么做不对。//举个简单列子:就是你有切面对MessageCallbackListener中重写的方法做一些其他操作,//那么接收到消息后该切面并不会生效,所以不建议这么做,以下是修改过后的。//@Resource//private MessageCallbackListener messageCallbackListener;//mqttClientUtils.subscribe("message/call/back", 2, messageCallbackListener);}}
使用方式创建一个MQTTService
可以MQTTClientUtils去调用方法,也可以将MQTTService 扩展开来,使用MQTTService 去调用方法
/**
* @author by Guoshun
* @version 1.0.0
* @description MQTT服务类,负责调用发送消息
* @date 2023/12/12 16:53
*/@ServicepublicclassMQTTService{@ResourceprivateMQTTClientUtils mqttClientUtils;/**
* 向主题发送消息
* @param topicName
* @param message
*/publicvoidsendMessage(String topicName,String message){
mqttClientUtils.publish(topicName, message);}/**
* 向主题发送消息
* @param topicName 主题名称
* @param qos qos
* @param message 具体消息
*/publicvoidsendMessage(String topicName,int qos,String message){
mqttClientUtils.publish(topicName, qos, message);}}
消息监听处理(简单实现)
/**
* @author by Guoshun
* @version 1.0.0
* @description 消息回调返回
* @date 2023/12/12 17:27
*/@ComponentpublicclassMessageCallbackListenerimplementsIMqttMessageListener{@OverridepublicvoidmessageArrived(String topic,MqttMessage message)throwsException{String messageBody =newString(message.getPayload(),StandardCharsets.UTF_8);System.out.println("收到消息:"+topic+", 消息内容是:"+ messageBody);}}
版权归原作者 陆慢慢 所有, 如有侵权,请联系我们删除。