文章目录
一、MQTT说明
1.1、mqtt文档
官网:https://mqtt.org/
仅供参考:https://www.emqx.com/zh/mqtt
1.2、MQTT消息服务质量
**
MQTT规定了3种消息等级
**
- QoS 0: 消息
最多传递一次
,不需要客户端给与回复,如果当时客户端不可用,则会丢失该消息。 - QoS 1: a、消息传递
至少 1 次
,发布者会发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内要收到接收者的应答
,发布者若没收到应答
,会将消息的 DUP 置为 1 并重发消息
。 b、所以Qos 1消息级别取决于接受者在规定时间内给与发布者反馈,若没有反馈,则会再次接受到消息。 - QoS a、消息
仅传送一次
,发布者发布 QoS 为 2 的消息之后,会将发布的消息储存
起来并等待接收者回复
PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。 b、发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。 c、QoS 2 消息的核心是接收者给发布者反馈两次接收结果,相当于一次接收,一次确认接收
。
1.1.1、归纳
- QoS 0 消息只发一次,不在乎是否被别的客户端收到,只要发送了就算结束。
- QoS 1 消息需要消息接收者在规定时间内给予反馈,结束的标志是在发送后规定时间内收到反馈,否则就会一直发送。
- QoS 2 消息需要发送者和接收者双方互相进行消息确认,只要有一方没有确定就不会结束。
二、MQTT环境搭建
有2种方式
1、原生mqtt
2、rabbitmq的mqtt插件
第一种:centos、Ubuntu 安装mqtt和使用https://blog.csdn.net/qq_44413835/article/details/120606097
**
mqtt客户端下载
:**
我是使用MQTTBox: https://dl.pconline.com.cn/download/1323304.html
mqttx下载:https://mqttx.app/zh
第二种:安装rabbitmq在开启mqtt插件-好处rabbitmq有web管理平台
注明:如果不会使用rabbitmq查看我的消息队列的专栏,里面有集成篇
docker安装rabbitmq
https://blog.csdn.net/qq_44413835/article/details/123648048
进入docker-rabbitmq容器
docker exec -it rabbitmq /bin/bash
安装后开启mqq插件
# 打开rabbitmq_mqtt
rabbitmq-plugins enable rabbitmq_mqtt
#打开rabbitmq_web_mqtt
rabbitmq-plugins enable rabbitmq_web_mqtt
如图:
三、boot集成原生mqtt
1.1、项目结构
版本boot:2.3.6.RELEASE、web工程
1.2、依赖
<!--集成MQTT--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!--开启流支持--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><!--gson序列化工具--><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId></dependency><!--lombok依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--Swagger-UI API文档生产工具--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.7.0</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.7.0</version></dependency><!--健康检查--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--SpringBoot配置处理器,自定义配置项--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency>
1.3、application.properties配置
spring.application.name=mqtt_demo
server.port=8080
# --------------mqtt配置-----------------------------# 默认接受消息的主题--指定多个多级主题【物联网数据主题、对话主题-聊天室】
mqtt.receiver.defaultTopic=receive_iot_topic/#,receive_chat_topic/## 默认发送消息的主题
mqtt.sender.defaultTopic=test_send
# mqtt发送者的id
mqtt.sender.clientId=mqttProducer
# mqtt接收者的id-随机id来拼串
mqtt.receiver.clientId=${random.value}# 地址和用户名密码
mqtt.url=tcp://服务器ip地址:1883
mqtt.username=用户名
mqtt.password=密码
1.4、实体类
IotData
packagesqy.bean;importcom.google.gson.annotations.SerializedName;importorg.springframework.stereotype.Component;importjava.io.Serializable;importjava.util.Date;@ComponentpublicclassIotDataimplementsSerializable{@SerializedName("deviceid")String deviceid;//设备id@SerializedName("sensorid")String sensorid;//数据id@SerializedName("types")String types;//设备来源@SerializedName("loraid")String loraid;//loraid硬件的id@SerializedName("createtime")Date createtime;//创建时间@SerializedName("temp")float temp;//温度@SerializedName("humi")float humi;//湿度@SerializedName("light")float light;//光敏//get/set/tostring省略...
api响应的实体类
packagesqy.rvo;/**
* @author suqinyi
* @Date 2022/4/15
* 通用返回对象
*/publicclassApiResult<T>{privatelong code;privateString message;privateT data;privatefinalstaticlong SUCCESS_CODE=1000;privatefinalstaticlong FAIL_CODE=2000;protectedApiResult(){}protectedApiResult(long code,String message,T data){this.code = code;this.message = message;this.data = data;}/**
* 成功返回结果
*/publicstatic<T>ApiResult<T>success(T data,String message){returnnewApiResult<T>(SUCCESS_CODE, message, data);}/**
* 失败返回结果
*/publicstatic<T>ApiResult<T>failed(String message){returnnewApiResult<T>(FAIL_CODE, message,null);}//get/set/tostring省略...}
1.5、mqtt配置类
MqttConfig
packagesqy.config.mqtt;importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.integration.annotation.ServiceActivator;importorg.springframework.integration.channel.DirectChannel;importorg.springframework.integration.core.MessageProducer;importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;importorg.springframework.integration.mqtt.core.MqttPahoClientFactory;importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;importorg.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;importorg.springframework.integration.mqtt.support.DefaultPahoMessageConverter;importorg.springframework.messaging.MessageChannel;importorg.springframework.messaging.MessageHandler;importorg.springframework.util.StringUtils;importsqy.service.mqtt.MqttCaseServiceImpl;/**
* @author suqinyi
* @Date 2022/4/15
* mqtt的配置类
*/@ConfigurationpublicclassMqttConfig{/**
* 发布的bean名称
*/publicstaticfinalString CHANNEL_NAME_OUT ="mqttOutboundChannel";publicstaticfinalString CHANNEL_NAME_IN ="mqttInboundChannel";// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息privatestaticfinalbyte[] WILL_DATA;static{
WILL_DATA ="offline".getBytes();}@Value("${mqtt.username}")privateString username;@Value("${mqtt.password}")privateString password;@Value("${mqtt.url}")privateString url;@Value("${mqtt.sender.clientId}")privateString clientsId;@Value("${mqtt.sender.defaultTopic}")privateString defaultsTopic;@Value("${mqtt.receiver.clientId}")privateString clientcId;@Value("${mqtt.receiver.defaultTopic}")privateString defaultcTopic;/**
* MQTT连接器选项
*/@BeanpublicMqttConnectOptionsgetSenderMqttConnectOptions(){MqttConnectOptions options =newMqttConnectOptions();// 设置连接的用户名System.out.println(username);if(!username.trim().equals("")){
options.setUserName(username);}// 设置连接的密码
options.setPassword(password.toCharArray());// 设置连接的地址
options.setServerURIs(newString[]{url});// 设置超时时间 单位为秒
options.setConnectionTimeout(100);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线// 但这个方法并没有重连的机制
options.setKeepAliveInterval(30);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill("willTopic", WILL_DATA,2,false);return options;}/**
* MQTT客户端
*/@BeanpublicMqttPahoClientFactorysenderMqttClientFactory(){DefaultMqttPahoClientFactory factory =newDefaultMqttPahoClientFactory();
factory.setConnectionOptions(getSenderMqttConnectOptions());return factory;}/**
* MQTT消息处理器(生产者)
*/@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)publicMessageHandlermqttOutbound(){MqttPahoMessageHandler messageHandler =newMqttPahoMessageHandler(clientsId,senderMqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultsTopic);return messageHandler;}/**
* MQTT信息通道(生产者)
*/@Bean(name = CHANNEL_NAME_OUT)publicMessageChannelmqttOutboundChannel(){DirectChannel channel =newDirectChannel();return channel;}/**
* MQTT信息通道(消费者)
*/@Bean(name = CHANNEL_NAME_IN)publicMessageChannelmqttInboundChannel(){DirectChannel channel =newDirectChannel();return channel;}/**
* MQTT消息订阅绑定(消费者)
*/@BeanpublicMessageProducerinbound(){String[] receiverTopics =StringUtils.split(defaultcTopic,",");// 可以同时消费(订阅)多个TopicMqttPahoMessageDrivenChannelAdapter adapter =newMqttPahoMessageDrivenChannelAdapter("re"+ clientcId,senderMqttClientFactory(),
receiverTopics);
adapter.setCompletionTimeout(5000);
adapter.setConverter(newDefaultPahoMessageConverter());
adapter.setQos(1);// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());return adapter;}/**
* MQTT消息处理器(消费者)
*/@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_IN)publicMessageHandlerhandler(){MqttCaseServiceImpl service =newMqttCaseServiceImpl();return service;}}
1.6、mqtt发布接口
IMqttSender
packagesqy.mqtt;importorg.springframework.integration.annotation.MessagingGateway;importorg.springframework.integration.mqtt.support.MqttHeaders;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.stereotype.Component;importsqy.config.mqtt.MqttConfig;/**
* MQTT生产者消息发送接口
* 通过接口将数据传递到集成流
*/@Component@MessagingGateway(defaultRequestChannel =MqttConfig.CHANNEL_NAME_OUT)publicinterfaceIMqttSender{/**
* 发送信息到MQTT服务器
*
* @param data 发送的文本
*/voidsendToMqtt(String data);/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/voidsendToMqtt(@Header(MqttHeaders.TOPIC)String topic,String payload);/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param qos 对消息处理的几种机制。
* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息主体
*/voidsendToMqtt(@Header(MqttHeaders.TOPIC)String topic,@Header(MqttHeaders.QOS)int qos,String payload);}
1.7、mqtt接收消息
MqttCaseServiceImpl
packagesqy.service.mqtt;importcom.google.gson.Gson;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.Message;importorg.springframework.messaging.MessageHandler;importorg.springframework.messaging.MessagingException;importorg.springframework.stereotype.Service;importsqy.bean.IotData;/**
* MQTT接收消息
*/@ServicepublicclassMqttCaseServiceImplimplementsMessageHandler{/**
* MessageHeaders:
* public static final String PREFIX = "mqtt_";
* public static final String QOS = "mqtt_qos";
* public static final String ID = "mqtt_id";
* public static final String RECEIVED_QOS = "mqtt_receivedQos";
* public static final String DUPLICATE = "mqtt_duplicate";
* public static final String RETAINED = "mqtt_retained";
* public static final String RECEIVED_RETAINED = "mqtt_receivedRetained";
* public static final String TOPIC = "mqtt_topic";
* public static final String RECEIVED_TOPIC = "mqtt_receivedTopic";
* public static final String MESSAGE_EXPIRY_INTERVAL = "mqtt_messageExpiryInterval";
* public static final String TOPIC_ALIAS = "mqtt_topicAlias";
* public static final String RESPONSE_TOPIC = "mqtt_responseTopic";
* public static final String CORRELATION_DATA = "mqtt_correlationData";
*/@AutowiredGson gson;@OverridepublicvoidhandleMessage(Message<?> message)throwsMessagingException{String topic =(String) message.getHeaders().get("mqtt_receivedTopic");String payload =(String) message.getPayload();System.out.println("headers:"+ topic +" 接收的数据:"+ payload);if(topic.contains("receive_iot_topic")){System.out.println("硬件的信息的主题");IotData entity = gson.fromJson(payload,IotData.class);if(entity!=null){//不是心跳数据if(!entity.getTypes().equals("heartbeat")){//判断硬件来源if(entity.getTypes().equals("esp32")){System.out.println("来着esp32的数据");//写入数据库 ...}}}else{System.out.println("序列化失败");}}if(topic.contains("receive_chat_topic")){System.out.println("对话的主题");//...构建聊天室.....//...相互订阅发送消息就可以了....//...逻辑代码...}}}
1.8、集成Swagger2配置
Swagger2Config
packagesqy.config.swagger;importio.swagger.annotations.Api;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importspringfox.documentation.builders.ApiInfoBuilder;importspringfox.documentation.builders.PathSelectors;importspringfox.documentation.builders.RequestHandlerSelectors;importspringfox.documentation.service.ApiInfo;importspringfox.documentation.spi.DocumentationType;importspringfox.documentation.spring.web.plugins.Docket;importspringfox.documentation.swagger2.annotations.EnableSwagger2;/**
* @author suqinyi
* @Date 2022/4/15
* Swagger2API文档的配置
* http://localhost:8081/swagger-ui.html
*/@Configuration@EnableSwagger2publicclassSwagger2Config{@BeanpublicDocketcreateRestApi(){returnnewDocket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()//为当前包下controller生成API文档.apis(RequestHandlerSelectors.basePackage("sqy.controller"))//为有@Api注解的Controller生成API文档.apis(RequestHandlerSelectors.withClassAnnotation(Api.class)).paths(PathSelectors.any()).build();}privateApiInfoapiInfo(){returnnewApiInfoBuilder().title("SwaggerUI演示").description("mqtt-demo").contact("sqy").version("1.0").build();}}
1.9、mqtt测试类
MqttController
packagesqy.controller;importcom.google.gson.Gson;importio.swagger.annotations.Api;importio.swagger.annotations.ApiOperation;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.PostMapping;importorg.springframework.web.bind.annotation.RequestBody;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importsqy.bean.IotData;importsqy.mqtt.IMqttSender;importsqy.rvo.ApiResult;/**
* @author suqinyi
* @Date 2022/4/15
* MQTT测试接口
*/@Api(tags ="MqttController", description ="MQTT测试接口")@RestController@RequestMapping("/mqtt")publicclassMqttController{@AutowiredIMqttSender mqttSender;@AutowiredGson gson;//这个是外面配置文件里面的设置的接收主题之一privatefinalstaticString SEND_TOPIC_PREFIX ="receive_iot_topic/";@ApiOperation("向指定主题发送消息")@PostMapping("/sendToTopic")publicApiResultsendToTopic(String topic,String payload){/**
* 想接收方方法消息-主题:receive_iot_topic/#,receive_chat_topic/#
*/
mqttSender.sendToMqtt(topic,payload);System.out.println("发送成功=>"+"主题:"+ topic +" 载荷:"+ payload);returnApiResult.success(null,"发送成功");}/**
* 127.0.0.1:8081/mqtt/control_command
* post、json
* {
* "createtime": "2022-04-17T07:02:23.707Z",
* "deviceid": "001设备",
* "humi": 30,
* "light": 55,
* "loraid": "r001",
* "sensorid": "123456789",
* "temp": 100,
* "types": "esp32"
* }
*/@ApiOperation("模拟硬件发送的数据或控制指令")@PostMapping("/control_command")publicApiResultcontrolCommand(@RequestBodyIotData iotData){String deviceId = iotData.getDeviceid();// 前缀 + 设备号String topic = SEND_TOPIC_PREFIX + deviceId;String payload=gson.toJson(iotData);
mqttSender.sendToMqtt( topic,payload);System.out.println("发送成功=>"+"主题:"+ topic +" 载荷:"+ payload);returnApiResult.success(null,"发送成功");}}
1.10、测试效果
- 登入swagger测试:http://localhost:8081/swagger-ui.html#/
- 或用post测试
或者使用postman测试:
127.0.0.1:8081/mqtt/control_command
{"deviceid":"001设备","humi":30,"light":55,"loraid":"r001","sensorid":"123456789","temp":100,"types":"esp32","createtime":"2022-04-17T07:02:23.707Z"}
如图:
postman测试图:
后台打印
mqttbox订阅接收
swagger测试gif图:
版权归原作者 suqinyi 所有, 如有侵权,请联系我们删除。