0


springboot集成mqtt

文章目录


一、MQTT说明

1.1、mqtt文档

官网:https://mqtt.org/
仅供参考:https://www.emqx.com/zh/mqtt

1.2、MQTT消息服务质量

**

MQTT规定了3种消息等级

**

  1. QoS 0: 消息最多传递一次,不需要客户端给与回复,如果当时客户端不可用,则会丢失该消息。
  2. QoS 1: a、消息传递至少 1 次,发布者会发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内要收到接收者的应答,发布者若没收到应答,会将消息的 DUP 置为 1 并重发消息。 b、所以Qos 1消息级别取决于接受者在规定时间内给与发布者反馈,若没有反馈,则会再次接受到消息。
  3. QoS a、消息仅传送一次,发布者发布 QoS 为 2 的消息之后,会将发布的消息储存起来并等待接收者回复 PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。 b、发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。 c、QoS 2 消息的核心是接收者给发布者反馈两次接收结果,相当于一次接收,一次确认接收

1.1.1、归纳

  1. QoS 0 消息只发一次,不在乎是否被别的客户端收到,只要发送了就算结束。
  2. QoS 1 消息需要消息接收者在规定时间内给予反馈,结束的标志是在发送后规定时间内收到反馈,否则就会一直发送。
  3. QoS 2 消息需要发送者和接收者双方互相进行消息确认,只要有一方没有确定就不会结束。

二、MQTT环境搭建

有2种方式
1、原生mqtt
2、rabbitmq的mqtt插件


第一种:centos、Ubuntu 安装mqtt和使用https://blog.csdn.net/qq_44413835/article/details/120606097

**

mqtt客户端下载

:**
我是使用MQTTBox: https://dl.pconline.com.cn/download/1323304.html

mqttx下载:https://mqttx.app/zh

第二种:安装rabbitmq在开启mqtt插件-好处rabbitmq有web管理平台

注明:如果不会使用rabbitmq查看我的消息队列的专栏,里面有集成篇

docker安装rabbitmq
https://blog.csdn.net/qq_44413835/article/details/123648048

进入docker-rabbitmq容器

docker exec  -it rabbitmq  /bin/bash

安装后开启mqq插件

# 打开rabbitmq_mqtt
rabbitmq-plugins enable rabbitmq_mqtt
#打开rabbitmq_web_mqtt
rabbitmq-plugins enable rabbitmq_web_mqtt

如图:
在这里插入图片描述
在这里插入图片描述

三、boot集成原生mqtt

1.1、项目结构

版本boot:2.3.6.RELEASE、web工程
在这里插入图片描述

1.2、依赖

<!--集成MQTT--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!--开启流支持--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><!--gson序列化工具--><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId></dependency><!--lombok依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--Swagger-UI API文档生产工具--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.7.0</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.7.0</version></dependency><!--健康检查--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--SpringBoot配置处理器,自定义配置项--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency>

1.3、application.properties配置

spring.application.name=mqtt_demo
server.port=8080
# --------------mqtt配置-----------------------------# 默认接受消息的主题--指定多个多级主题【物联网数据主题、对话主题-聊天室】
mqtt.receiver.defaultTopic=receive_iot_topic/#,receive_chat_topic/## 默认发送消息的主题
mqtt.sender.defaultTopic=test_send
# mqtt发送者的id
mqtt.sender.clientId=mqttProducer
# mqtt接收者的id-随机id来拼串
mqtt.receiver.clientId=${random.value}# 地址和用户名密码
mqtt.url=tcp://服务器ip地址:1883
mqtt.username=用户名
mqtt.password=密码

1.4、实体类

IotData

packagesqy.bean;importcom.google.gson.annotations.SerializedName;importorg.springframework.stereotype.Component;importjava.io.Serializable;importjava.util.Date;@ComponentpublicclassIotDataimplementsSerializable{@SerializedName("deviceid")String deviceid;//设备id@SerializedName("sensorid")String sensorid;//数据id@SerializedName("types")String types;//设备来源@SerializedName("loraid")String loraid;//loraid硬件的id@SerializedName("createtime")Date createtime;//创建时间@SerializedName("temp")float temp;//温度@SerializedName("humi")float humi;//湿度@SerializedName("light")float light;//光敏//get/set/tostring省略...

api响应的实体类

packagesqy.rvo;/**
 * @author suqinyi
 * @Date 2022/4/15
 * 通用返回对象
 */publicclassApiResult<T>{privatelong code;privateString message;privateT data;privatefinalstaticlong SUCCESS_CODE=1000;privatefinalstaticlong FAIL_CODE=2000;protectedApiResult(){}protectedApiResult(long code,String message,T data){this.code = code;this.message = message;this.data = data;}/**
     * 成功返回结果
     */publicstatic<T>ApiResult<T>success(T data,String message){returnnewApiResult<T>(SUCCESS_CODE, message, data);}/**
     * 失败返回结果
     */publicstatic<T>ApiResult<T>failed(String message){returnnewApiResult<T>(FAIL_CODE, message,null);}//get/set/tostring省略...}

1.5、mqtt配置类

MqttConfig

packagesqy.config.mqtt;importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.integration.annotation.ServiceActivator;importorg.springframework.integration.channel.DirectChannel;importorg.springframework.integration.core.MessageProducer;importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;importorg.springframework.integration.mqtt.core.MqttPahoClientFactory;importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;importorg.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;importorg.springframework.integration.mqtt.support.DefaultPahoMessageConverter;importorg.springframework.messaging.MessageChannel;importorg.springframework.messaging.MessageHandler;importorg.springframework.util.StringUtils;importsqy.service.mqtt.MqttCaseServiceImpl;/**
 * @author suqinyi
 * @Date 2022/4/15
 * mqtt的配置类
 */@ConfigurationpublicclassMqttConfig{/**
     * 发布的bean名称
     */publicstaticfinalString CHANNEL_NAME_OUT ="mqttOutboundChannel";publicstaticfinalString CHANNEL_NAME_IN ="mqttInboundChannel";// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息privatestaticfinalbyte[] WILL_DATA;static{
        WILL_DATA ="offline".getBytes();}@Value("${mqtt.username}")privateString username;@Value("${mqtt.password}")privateString password;@Value("${mqtt.url}")privateString url;@Value("${mqtt.sender.clientId}")privateString clientsId;@Value("${mqtt.sender.defaultTopic}")privateString defaultsTopic;@Value("${mqtt.receiver.clientId}")privateString clientcId;@Value("${mqtt.receiver.defaultTopic}")privateString defaultcTopic;/**
     * MQTT连接器选项
     */@BeanpublicMqttConnectOptionsgetSenderMqttConnectOptions(){MqttConnectOptions options =newMqttConnectOptions();// 设置连接的用户名System.out.println(username);if(!username.trim().equals("")){
            options.setUserName(username);}// 设置连接的密码
        options.setPassword(password.toCharArray());// 设置连接的地址
        options.setServerURIs(newString[]{url});// 设置超时时间 单位为秒
        options.setConnectionTimeout(100);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线// 但这个方法并没有重连的机制
        options.setKeepAliveInterval(30);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        options.setWill("willTopic", WILL_DATA,2,false);return options;}/**
     * MQTT客户端
     */@BeanpublicMqttPahoClientFactorysenderMqttClientFactory(){DefaultMqttPahoClientFactory factory =newDefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getSenderMqttConnectOptions());return factory;}/**
     * MQTT消息处理器(生产者)
     */@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)publicMessageHandlermqttOutbound(){MqttPahoMessageHandler messageHandler =newMqttPahoMessageHandler(clientsId,senderMqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultsTopic);return messageHandler;}/**
     * MQTT信息通道(生产者)
     */@Bean(name = CHANNEL_NAME_OUT)publicMessageChannelmqttOutboundChannel(){DirectChannel channel =newDirectChannel();return channel;}/**
     * MQTT信息通道(消费者)
     */@Bean(name = CHANNEL_NAME_IN)publicMessageChannelmqttInboundChannel(){DirectChannel channel =newDirectChannel();return channel;}/**
     * MQTT消息订阅绑定(消费者)
     */@BeanpublicMessageProducerinbound(){String[] receiverTopics =StringUtils.split(defaultcTopic,",");// 可以同时消费(订阅)多个TopicMqttPahoMessageDrivenChannelAdapter adapter =newMqttPahoMessageDrivenChannelAdapter("re"+ clientcId,senderMqttClientFactory(),
                        receiverTopics);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(newDefaultPahoMessageConverter());
        adapter.setQos(1);// 设置订阅通道
        adapter.setOutputChannel(mqttInboundChannel());return adapter;}/**
     * MQTT消息处理器(消费者)
     */@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_IN)publicMessageHandlerhandler(){MqttCaseServiceImpl service =newMqttCaseServiceImpl();return service;}}

1.6、mqtt发布接口

IMqttSender

packagesqy.mqtt;importorg.springframework.integration.annotation.MessagingGateway;importorg.springframework.integration.mqtt.support.MqttHeaders;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.stereotype.Component;importsqy.config.mqtt.MqttConfig;/**
 * MQTT生产者消息发送接口
 * 通过接口将数据传递到集成流
 */@Component@MessagingGateway(defaultRequestChannel =MqttConfig.CHANNEL_NAME_OUT)publicinterfaceIMqttSender{/**
     * 发送信息到MQTT服务器
     *
     * @param data 发送的文本
     */voidsendToMqtt(String data);/**
     * 发送信息到MQTT服务器
     *
     * @param topic   主题
     * @param payload 消息主体
     */voidsendToMqtt(@Header(MqttHeaders.TOPIC)String topic,String payload);/**
     * 发送信息到MQTT服务器
     *
     * @param topic   主题
     * @param qos     对消息处理的几种机制。
     *                0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
     *                1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
     *                2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param payload 消息主体
     */voidsendToMqtt(@Header(MqttHeaders.TOPIC)String topic,@Header(MqttHeaders.QOS)int qos,String payload);}

1.7、mqtt接收消息

MqttCaseServiceImpl

packagesqy.service.mqtt;importcom.google.gson.Gson;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.Message;importorg.springframework.messaging.MessageHandler;importorg.springframework.messaging.MessagingException;importorg.springframework.stereotype.Service;importsqy.bean.IotData;/**
 * MQTT接收消息
 */@ServicepublicclassMqttCaseServiceImplimplementsMessageHandler{/**
     * MessageHeaders:
     * public static final String PREFIX = "mqtt_";
     * public static final String QOS = "mqtt_qos";
     * public static final String ID = "mqtt_id";
     * public static final String RECEIVED_QOS = "mqtt_receivedQos";
     * public static final String DUPLICATE = "mqtt_duplicate";
     * public static final String RETAINED = "mqtt_retained";
     * public static final String RECEIVED_RETAINED = "mqtt_receivedRetained";
     * public static final String TOPIC = "mqtt_topic";
     * public static final String RECEIVED_TOPIC = "mqtt_receivedTopic";
     * public static final String MESSAGE_EXPIRY_INTERVAL = "mqtt_messageExpiryInterval";
     * public static final String TOPIC_ALIAS = "mqtt_topicAlias";
     * public static final String RESPONSE_TOPIC = "mqtt_responseTopic";
     * public static final String CORRELATION_DATA = "mqtt_correlationData";
     */@AutowiredGson gson;@OverridepublicvoidhandleMessage(Message<?> message)throwsMessagingException{String topic =(String) message.getHeaders().get("mqtt_receivedTopic");String payload =(String) message.getPayload();System.out.println("headers:"+ topic +" 接收的数据:"+ payload);if(topic.contains("receive_iot_topic")){System.out.println("硬件的信息的主题");IotData entity = gson.fromJson(payload,IotData.class);if(entity!=null){//不是心跳数据if(!entity.getTypes().equals("heartbeat")){//判断硬件来源if(entity.getTypes().equals("esp32")){System.out.println("来着esp32的数据");//写入数据库 ...}}}else{System.out.println("序列化失败");}}if(topic.contains("receive_chat_topic")){System.out.println("对话的主题");//...构建聊天室.....//...相互订阅发送消息就可以了....//...逻辑代码...}}}

1.8、集成Swagger2配置

Swagger2Config

packagesqy.config.swagger;importio.swagger.annotations.Api;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importspringfox.documentation.builders.ApiInfoBuilder;importspringfox.documentation.builders.PathSelectors;importspringfox.documentation.builders.RequestHandlerSelectors;importspringfox.documentation.service.ApiInfo;importspringfox.documentation.spi.DocumentationType;importspringfox.documentation.spring.web.plugins.Docket;importspringfox.documentation.swagger2.annotations.EnableSwagger2;/**
 * @author suqinyi
 * @Date 2022/4/15
 * Swagger2API文档的配置
 * http://localhost:8081/swagger-ui.html
 */@Configuration@EnableSwagger2publicclassSwagger2Config{@BeanpublicDocketcreateRestApi(){returnnewDocket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()//为当前包下controller生成API文档.apis(RequestHandlerSelectors.basePackage("sqy.controller"))//为有@Api注解的Controller生成API文档.apis(RequestHandlerSelectors.withClassAnnotation(Api.class)).paths(PathSelectors.any()).build();}privateApiInfoapiInfo(){returnnewApiInfoBuilder().title("SwaggerUI演示").description("mqtt-demo").contact("sqy").version("1.0").build();}}

1.9、mqtt测试类

MqttController

packagesqy.controller;importcom.google.gson.Gson;importio.swagger.annotations.Api;importio.swagger.annotations.ApiOperation;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.PostMapping;importorg.springframework.web.bind.annotation.RequestBody;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importsqy.bean.IotData;importsqy.mqtt.IMqttSender;importsqy.rvo.ApiResult;/**
 * @author suqinyi
 * @Date 2022/4/15
 * MQTT测试接口
 */@Api(tags ="MqttController", description ="MQTT测试接口")@RestController@RequestMapping("/mqtt")publicclassMqttController{@AutowiredIMqttSender mqttSender;@AutowiredGson gson;//这个是外面配置文件里面的设置的接收主题之一privatefinalstaticString SEND_TOPIC_PREFIX ="receive_iot_topic/";@ApiOperation("向指定主题发送消息")@PostMapping("/sendToTopic")publicApiResultsendToTopic(String topic,String payload){/**
         * 想接收方方法消息-主题:receive_iot_topic/#,receive_chat_topic/#
         */
        mqttSender.sendToMqtt(topic,payload);System.out.println("发送成功=>"+"主题:"+ topic +"  载荷:"+ payload);returnApiResult.success(null,"发送成功");}/**
     * 127.0.0.1:8081/mqtt/control_command
     * post、json
     * {
     *   "createtime": "2022-04-17T07:02:23.707Z",
     *   "deviceid": "001设备",
     *   "humi": 30,
     *   "light": 55,
     *   "loraid": "r001",
     *   "sensorid": "123456789",
     *   "temp": 100,
     *   "types": "esp32"
     * }
     */@ApiOperation("模拟硬件发送的数据或控制指令")@PostMapping("/control_command")publicApiResultcontrolCommand(@RequestBodyIotData iotData){String deviceId = iotData.getDeviceid();// 前缀 + 设备号String topic = SEND_TOPIC_PREFIX + deviceId;String payload=gson.toJson(iotData);
        mqttSender.sendToMqtt( topic,payload);System.out.println("发送成功=>"+"主题:"+ topic +"  载荷:"+ payload);returnApiResult.success(null,"发送成功");}}

1.10、测试效果

  1. 登入swagger测试:http://localhost:8081/swagger-ui.html#/
  2. 或用post测试

或者使用postman测试:

127.0.0.1:8081/mqtt/control_command

{"deviceid":"001设备","humi":30,"light":55,"loraid":"r001","sensorid":"123456789","temp":100,"types":"esp32","createtime":"2022-04-17T07:02:23.707Z"}

如图:

postman测试图:
在这里插入图片描述
后台打印
在这里插入图片描述
mqttbox订阅接收
在这里插入图片描述

swagger测试gif图:
在这里插入图片描述

标签: mqtt springboot

本文转载自: https://blog.csdn.net/qq_44413835/article/details/124249715
版权归原作者 suqinyi 所有, 如有侵权,请联系我们删除。

“springboot集成mqtt”的评论:

还没有评论