文章目录
一、WINDOWS下搭建MQTT服务EMQX
1、下载服务
官网地址:
进入官网后下载window压缩包
2、 启动服务
解压zip文件,修改白名单。在etc目录下找到acl.conf文件。
在回到上一级目录,进入bin目录,在终端输入pushd + bin目录地址,例:
pushd D:\Environment\emqx\bin
启动服务,终端输入:
emqx.cmd start
打开浏览器输入:
localhost:18083
出现如下界面便是EMQX服务已经启动。
EMQX初始的用户名:admin 密码:public 。登录后设置中文页面。
二、测试工具
1、安装
官网地址:
安装过于简单,自行下载安装。
2、汉化及使用
添加一个连接
刷新管理端页面
三、整合SpringBoot
1、导入Maven依赖
<!-- mqtt依赖 start --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!-- mqtt依赖 end -->
2、配置
2.1、配置文件
# MQTT配置
mqtt:
# emqx服务器地址
host: tcp://127.0.0.1:1883
# 用户名
username: ces
# 密码
password: ces1
# 客户端id
clientId: service_${random.uuid}
# 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值
connectionTimeout:10
# 消息服务质量
qos:2
# 连接保持检查周期 秒
keepAliveInterval:20
# 开启自动重连
automaticReconnect:true
# 是否清除会话session
cleanSession:true
# 默认订阅主题
defaultSubscribeTopic: washingMachine/online/+,airConditioner/online/+
# 是否保留发布消息
retained:false
2.2、常量
packagenet.rakan.distributedservice.common.constant;importlombok.Data;importlombok.extern.slf4j.Slf4j;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Configuration;/**
* MQTT常量
* @author LiChangRui on 2024/4/21 16:53
*/@Data@Slf4j@Configuration@ConfigurationProperties(prefix ="mqtt")publicclassMqttConstants{/**
* emqx服务器地址
*/privateString host;/**
* 用户名
*/privateString username;/**
* 密码
*/privateString password;/**
* 客户端id
*/privateString clientId;/**
* 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值
*/privateint connectionTimeout;/**
* 消息服务质量
*/privateint qos;/**
* 连接保持检查周期 秒
*/privateint keepAliveInterval;/**
* 开启自动重连
*/privateBoolean automaticReconnect;/**
* 是否清除会话session
*/privateBoolean cleanSession;/**
* 默认订阅主题
*/privateString defaultSubscribeTopic;/**
* 是否保留发布消息
*/privateBoolean retained;}
3、MQTT服务类
packagenet.rakan.distributedservice.common.service;importlombok.extern.slf4j.Slf4j;importnet.rakan.distributedservice.common.callback.MqttCallback;importnet.rakan.distributedservice.common.constant.MqttConstants;importorg.eclipse.paho.client.mqttv3.MqttClient;importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.util.Arrays;/**
* MQTT服务
* @author LiChangRui on 2024/4/21 17:03
*/@Slf4j@ComponentpublicclassMqttService{/**
* MQTT常量
*/@AutowiredprivateMqttConstants mqttConstants;/**
* MQTT连接
*/privateMqttClient mqttClient;privatevoidsetMqttClient(MqttClient mqttClient){this.mqttClient = mqttClient;}/**
* 推送消息
* @author LiChangRui on 2024/4/21 18:02
*/publicvoidpublish(String topic,String msg){try{
mqttClient.publish(topic, msg.getBytes(), mqttConstants.getQos(), mqttConstants.getRetained());}catch(MqttException e){
log.error("MQTT推送消息失败!");}}/**
* 订阅消息
* @author LiChangRui on 2024/4/21 21:54
*/publicvoidsubscribe(String topic){
log.info("开始订阅主题:"+ topic +"。");try{
mqttClient.subscribe(topic, mqttConstants.getQos());}catch(MqttException e){
log.error("MQTT订阅主题失败!");}}/**
* 订阅消息
* @author LiChangRui on 2024/4/21 21:54
*/publicvoidsubscribe(String[] topic){int[] qos =newint[topic.length];Arrays.fill(qos, mqttConstants.getQos());
log.info("开始订阅主题:"+String.join(",", topic)+"。");try{
mqttClient.subscribe(topic, qos);}catch(MqttException e){
log.error("MQTT订阅主题失败!");}}/**
* MQTT连接
* @author LiChangRui on 2024/4/22 9:48
*/publicvoidconnect(String host,String clientId,MqttCallback mqttCallback,MqttConnectOptions options){MqttClient mqttClient;try{// 设置连接参数
mqttClient =newMqttClient(host, clientId);// 设置回调
mqttClient.setCallback(mqttCallback);// 连接
mqttClient.connect(options);}catch(MqttException e){
log.error("连接失败:"+ e.getMessage()+"!");return;}this.setMqttClient(mqttClient);
log.info("连接成功!");}}
4、MQTT服务回调类
packagenet.rakan.distributedservice.common.callback;importlombok.extern.slf4j.Slf4j;importnet.rakan.distributedservice.common.constant.MqttConstants;importnet.rakan.distributedservice.common.service.MqttService;importorg.eclipse.paho.client.mqttv3.IMqttDeliveryToken;importorg.eclipse.paho.client.mqttv3.MqttCallbackExtended;importorg.eclipse.paho.client.mqttv3.MqttMessage;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Configuration;/**
* MQTT回调
* @author LiChangRui on 2024/4/21 17:03
*/@Slf4j@ConfigurationpublicclassMqttCallbackimplementsMqttCallbackExtended{/**
* MQTT常量
*/@AutowiredprivateMqttConstants mqttConstants;/**
* MQTT服务
*/@AutowiredprivateMqttService mqttService;/**
* 客户端断开后触发
* @author LiChangRui on 2024/4/21 18:19
*/@OverridepublicvoidconnectionLost(Throwable throwable){
log.info("客户端连接断开!");// 已经设置断线重新连接 所以这里不用写重连的逻辑 只需要写断开连接的业务逻辑}/**
* 客户端收到消息触发
* @author LiChangRui on 2024/4/21 18:21
*/@OverridepublicvoidmessageArrived(String topic,MqttMessage mqttMessage){
log.info("接收消息主题 : "+ topic);
log.info("接收消息Qos : "+ mqttMessage.getQos());
log.info("接收消息内容 : "+newString(mqttMessage.getPayload()));}/**
* 发布消息成功
* @author LiChangRui on 2024/4/21 18:23
*/@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken iMqttDeliveryToken){String[] topics = iMqttDeliveryToken.getTopics();for(String topic : topics){
log.info("向主题:"+ topic +"发送消息成功!");}// try {// // 在消息被传递之前,正在传递的消息将被返回。一旦消息被传递,将返回null// MqttMessage message = iMqttDeliveryToken.getMessage();// byte[] payload = message.getPayload();// String s = new String(payload, StandardCharsets.UTF_8);// log.info("消息的内容是:" + s + "。");// } catch (MqttException e) {// log.error("获取发送消息内容失败!");// }}/**
* 客户端连接成功
* @author LiChangRui on 2024/4/21 18:23
*/@OverridepublicvoidconnectComplete(boolean reconnect,String serverURI){
log.info("客户端连接成功!");String[] topic = mqttConstants.getDefaultSubscribeTopic().split(",");
mqttService.subscribe(topic);}}
5、MQTT配置类
packagenet.rakan.distributedservice.common.config;importjakarta.annotation.PostConstruct;importlombok.extern.slf4j.Slf4j;importnet.rakan.distributedservice.common.callback.MqttCallback;importnet.rakan.distributedservice.common.constant.MqttConstants;importnet.rakan.distributedservice.common.service.MqttService;importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Configuration;/**
* MQTT配置
* @author LiChangRui on 2024/4/21 22:26
*/@Slf4j@ConfigurationpublicclassMqttConfig{/**
* MQTT常量
*/@AutowiredprivateMqttConstants mqttConstants;/**
* MQTT服务
*/@AutowiredprivateMqttService mqttService;/**
* MQTT回调
*/@AutowiredprivateMqttCallback mqttCallback;/**
* MQTT连接
* @author LiChangRui on 2024/4/22 9:51
*/@PostConstructpublicvoidmqttClient(){MqttConnectOptions options =newMqttConnectOptions();// 用户名
options.setUserName(mqttConstants.getUsername());// 密码
options.setPassword(mqttConstants.getPassword().toCharArray());// 设置连接超时时间
options.setConnectionTimeout(mqttConstants.getConnectionTimeout());// 开启自动重连
options.setAutomaticReconnect(mqttConstants.getAutomaticReconnect());// 是否清除会话session
options.setCleanSession(mqttConstants.getCleanSession());// 设置心跳间隔时间
options.setKeepAliveInterval(mqttConstants.getKeepAliveInterval());
mqttService.connect(mqttConstants.getHost(), mqttConstants.getClientId(), mqttCallback, options);}}
6、测试类
packagenet.rakan.distributedservice.testserver.controller;importio.swagger.v3.oas.annotations.Operation;importio.swagger.v3.oas.annotations.tags.Tag;importnet.rakan.distributedservice.common.dto.CancelOrderDTO;importnet.rakan.distributedservice.common.dto.PublishDTO;importnet.rakan.distributedservice.common.dto.SendMailDTO;importnet.rakan.distributedservice.common.service.MqttService;importnet.rakan.distributedservice.common.service.RabbitMqService;importnet.rakan.distributedservice.common.vo.Result;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.validation.annotation.Validated;importorg.springframework.web.bind.annotation.PostMapping;importorg.springframework.web.bind.annotation.RequestBody;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;/**
* 测试
* @author LiChangRui on 2024/3/15 10:14
*/@Tag(name ="测试")@RestController@RequestMapping("/test")publicclassTestController{@AutowiredprivateMqttService mqttService;/**
* 推送消息
* @author LiChangRui on 2024/3/15 13:59
*/@Operation(summary ="推送消息")@PostMapping("/publish")publicResult<?>publish(@RequestBody@ValidatedPublishDTO dto){
mqttService.publish(dto.getTopic(), dto.getMsg());returnResult.ok();}}
控制台消息:
总结
如果您发现错误,还望及时提醒,共同进步。
版权归原作者 搖了我吧 所有, 如有侵权,请联系我们删除。