0


EMQX(MQTT协议)服务和测试工具安装及整合SringBoot后的使用

文章目录


一、WINDOWS下搭建MQTT服务EMQX

1、下载服务

官网地址:

https://www.emqx.io/downloads

进入官网后下载window压缩包
在这里插入图片描述

2、 启动服务

解压zip文件,修改白名单。在etc目录下找到acl.conf文件。
在这里插入图片描述
在回到上一级目录,进入bin目录,在终端输入pushd + bin目录地址,例:

pushd D:\Environment\emqx\bin

启动服务,终端输入:

emqx.cmd start

打开浏览器输入:

localhost:18083

出现如下界面便是EMQX服务已经启动。
在这里插入图片描述
EMQX初始的用户名:admin 密码:public 。登录后设置中文页面。

在这里插入图片描述
在这里插入图片描述

二、测试工具

1、安装

官网地址:

https://mqttx.app/zh/downloads

安装过于简单,自行下载安装。

2、汉化及使用

在这里插入图片描述

添加一个连接
在这里插入图片描述
刷新管理端页面
在这里插入图片描述

三、整合SpringBoot

1、导入Maven依赖

<!-- mqtt依赖 start --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!-- mqtt依赖 end -->

2、配置

2.1、配置文件

# MQTT配置
mqtt:
  # emqx服务器地址
  host: tcp://127.0.0.1:1883
  # 用户名
  username: ces
  # 密码
  password: ces1
  # 客户端id
  clientId: service_${random.uuid}
  # 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值
  connectionTimeout:10
  # 消息服务质量
  qos:2
  # 连接保持检查周期 秒
  keepAliveInterval:20
  # 开启自动重连
  automaticReconnect:true
  # 是否清除会话session
  cleanSession:true
  # 默认订阅主题
  defaultSubscribeTopic: washingMachine/online/+,airConditioner/online/+
  # 是否保留发布消息
  retained:false

2.2、常量

packagenet.rakan.distributedservice.common.constant;importlombok.Data;importlombok.extern.slf4j.Slf4j;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Configuration;/**
 * MQTT常量
 * @author LiChangRui on 2024/4/21 16:53
 */@Data@Slf4j@Configuration@ConfigurationProperties(prefix ="mqtt")publicclassMqttConstants{/**
     * emqx服务器地址
     */privateString host;/**
     * 用户名
     */privateString username;/**
     * 密码
     */privateString password;/**
     * 客户端id
     */privateString clientId;/**
     * 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值
     */privateint connectionTimeout;/**
     * 消息服务质量
     */privateint qos;/**
     * 连接保持检查周期 秒
     */privateint keepAliveInterval;/**
     * 开启自动重连
     */privateBoolean automaticReconnect;/**
     * 是否清除会话session
     */privateBoolean cleanSession;/**
     * 默认订阅主题
     */privateString defaultSubscribeTopic;/**
     * 是否保留发布消息
     */privateBoolean retained;}

3、MQTT服务类

packagenet.rakan.distributedservice.common.service;importlombok.extern.slf4j.Slf4j;importnet.rakan.distributedservice.common.callback.MqttCallback;importnet.rakan.distributedservice.common.constant.MqttConstants;importorg.eclipse.paho.client.mqttv3.MqttClient;importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.util.Arrays;/**
 * MQTT服务
 * @author LiChangRui on 2024/4/21 17:03
 */@Slf4j@ComponentpublicclassMqttService{/**
     * MQTT常量
     */@AutowiredprivateMqttConstants mqttConstants;/**
     * MQTT连接
     */privateMqttClient mqttClient;privatevoidsetMqttClient(MqttClient mqttClient){this.mqttClient = mqttClient;}/**
     * 推送消息
     * @author LiChangRui on 2024/4/21 18:02
     */publicvoidpublish(String topic,String msg){try{
            mqttClient.publish(topic, msg.getBytes(), mqttConstants.getQos(), mqttConstants.getRetained());}catch(MqttException e){
            log.error("MQTT推送消息失败!");}}/**
     * 订阅消息
     * @author LiChangRui on 2024/4/21 21:54
     */publicvoidsubscribe(String topic){
        log.info("开始订阅主题:"+ topic +"。");try{
            mqttClient.subscribe(topic, mqttConstants.getQos());}catch(MqttException e){
            log.error("MQTT订阅主题失败!");}}/**
     * 订阅消息
     * @author LiChangRui on 2024/4/21 21:54
     */publicvoidsubscribe(String[] topic){int[] qos =newint[topic.length];Arrays.fill(qos, mqttConstants.getQos());
        log.info("开始订阅主题:"+String.join(",", topic)+"。");try{
            mqttClient.subscribe(topic, qos);}catch(MqttException e){
            log.error("MQTT订阅主题失败!");}}/**
     * MQTT连接
     * @author LiChangRui on 2024/4/22 9:48
     */publicvoidconnect(String host,String clientId,MqttCallback mqttCallback,MqttConnectOptions options){MqttClient mqttClient;try{// 设置连接参数
            mqttClient =newMqttClient(host, clientId);// 设置回调
            mqttClient.setCallback(mqttCallback);// 连接
            mqttClient.connect(options);}catch(MqttException e){
            log.error("连接失败:"+ e.getMessage()+"!");return;}this.setMqttClient(mqttClient);
        log.info("连接成功!");}}

4、MQTT服务回调类

packagenet.rakan.distributedservice.common.callback;importlombok.extern.slf4j.Slf4j;importnet.rakan.distributedservice.common.constant.MqttConstants;importnet.rakan.distributedservice.common.service.MqttService;importorg.eclipse.paho.client.mqttv3.IMqttDeliveryToken;importorg.eclipse.paho.client.mqttv3.MqttCallbackExtended;importorg.eclipse.paho.client.mqttv3.MqttMessage;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Configuration;/**
 * MQTT回调
 * @author LiChangRui on 2024/4/21 17:03
 */@Slf4j@ConfigurationpublicclassMqttCallbackimplementsMqttCallbackExtended{/**
     * MQTT常量
     */@AutowiredprivateMqttConstants mqttConstants;/**
     * MQTT服务
     */@AutowiredprivateMqttService mqttService;/**
     * 客户端断开后触发
     * @author LiChangRui on 2024/4/21 18:19
     */@OverridepublicvoidconnectionLost(Throwable throwable){
        log.info("客户端连接断开!");// 已经设置断线重新连接 所以这里不用写重连的逻辑 只需要写断开连接的业务逻辑}/**
     * 客户端收到消息触发
     * @author LiChangRui on 2024/4/21 18:21
     */@OverridepublicvoidmessageArrived(String topic,MqttMessage mqttMessage){
        log.info("接收消息主题 : "+ topic);
        log.info("接收消息Qos : "+ mqttMessage.getQos());
        log.info("接收消息内容 : "+newString(mqttMessage.getPayload()));}/**
     * 发布消息成功
     * @author LiChangRui on 2024/4/21 18:23
     */@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken iMqttDeliveryToken){String[] topics = iMqttDeliveryToken.getTopics();for(String topic : topics){
            log.info("向主题:"+ topic +"发送消息成功!");}//        try {//            // 在消息被传递之前,正在传递的消息将被返回。一旦消息被传递,将返回null//            MqttMessage message = iMqttDeliveryToken.getMessage();//            byte[] payload = message.getPayload();//            String s = new String(payload, StandardCharsets.UTF_8);//            log.info("消息的内容是:" + s + "。");//        } catch (MqttException e) {//            log.error("获取发送消息内容失败!");//        }}/**
     * 客户端连接成功
     * @author LiChangRui on 2024/4/21 18:23
     */@OverridepublicvoidconnectComplete(boolean reconnect,String serverURI){
        log.info("客户端连接成功!");String[] topic = mqttConstants.getDefaultSubscribeTopic().split(",");
        mqttService.subscribe(topic);}}

5、MQTT配置类

packagenet.rakan.distributedservice.common.config;importjakarta.annotation.PostConstruct;importlombok.extern.slf4j.Slf4j;importnet.rakan.distributedservice.common.callback.MqttCallback;importnet.rakan.distributedservice.common.constant.MqttConstants;importnet.rakan.distributedservice.common.service.MqttService;importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Configuration;/**
 * MQTT配置
 * @author LiChangRui on 2024/4/21 22:26
 */@Slf4j@ConfigurationpublicclassMqttConfig{/**
     * MQTT常量
     */@AutowiredprivateMqttConstants mqttConstants;/**
     * MQTT服务
     */@AutowiredprivateMqttService mqttService;/**
     * MQTT回调
     */@AutowiredprivateMqttCallback mqttCallback;/**
     * MQTT连接
     * @author LiChangRui on 2024/4/22 9:51
     */@PostConstructpublicvoidmqttClient(){MqttConnectOptions options =newMqttConnectOptions();// 用户名
        options.setUserName(mqttConstants.getUsername());// 密码
        options.setPassword(mqttConstants.getPassword().toCharArray());// 设置连接超时时间
        options.setConnectionTimeout(mqttConstants.getConnectionTimeout());// 开启自动重连
        options.setAutomaticReconnect(mqttConstants.getAutomaticReconnect());// 是否清除会话session
        options.setCleanSession(mqttConstants.getCleanSession());// 设置心跳间隔时间
        options.setKeepAliveInterval(mqttConstants.getKeepAliveInterval());
        mqttService.connect(mqttConstants.getHost(), mqttConstants.getClientId(), mqttCallback, options);}}

6、测试类

packagenet.rakan.distributedservice.testserver.controller;importio.swagger.v3.oas.annotations.Operation;importio.swagger.v3.oas.annotations.tags.Tag;importnet.rakan.distributedservice.common.dto.CancelOrderDTO;importnet.rakan.distributedservice.common.dto.PublishDTO;importnet.rakan.distributedservice.common.dto.SendMailDTO;importnet.rakan.distributedservice.common.service.MqttService;importnet.rakan.distributedservice.common.service.RabbitMqService;importnet.rakan.distributedservice.common.vo.Result;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.validation.annotation.Validated;importorg.springframework.web.bind.annotation.PostMapping;importorg.springframework.web.bind.annotation.RequestBody;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;/**
 * 测试
 * @author LiChangRui on 2024/3/15 10:14
 */@Tag(name ="测试")@RestController@RequestMapping("/test")publicclassTestController{@AutowiredprivateMqttService mqttService;/**
     * 推送消息
     * @author LiChangRui on 2024/3/15 13:59
     */@Operation(summary ="推送消息")@PostMapping("/publish")publicResult<?>publish(@RequestBody@ValidatedPublishDTO dto){

        mqttService.publish(dto.getTopic(), dto.getMsg());returnResult.ok();}}

在这里插入图片描述
控制台消息:
在这里插入图片描述

总结

如果您发现错误,还望及时提醒,共同进步。

标签: java springboot

本文转载自: https://blog.csdn.net/qq_44598725/article/details/138071499
版权归原作者 搖了我吧 所有, 如有侵权,请联系我们删除。

“EMQX(MQTT协议)服务和测试工具安装及整合SringBoot后的使用”的评论:

还没有评论