0


SpringBoot集成MQTT服务器和客户端并通过WebSocket将消息实时发送到前端(本地搭建MQTT服务器)

前言:

    在写这篇文章之前,我其实是碰到这个需求的,因为当时我需要这个MQTT服务器功能集成到我的程序中,而不是去用第三方公共的,但是我查阅了大量文章却都没有太大的收获,可能是我搜索的关键字不对,现在自己弄懂了大概之后就顺便做个记录。有错误的地方欢迎大家指出!

    这里就只讲解与之相关的内容了,相关概念可以去看MQTT协议入门、websocket详情

    关于mqtt的代码我是参考的mqtt参考

如何搭建MQTT本地服务器:

1. 要在pom.xml中添加相关maven第三方依赖至于Moquette是什么大家可以自行去查阅
<!-- Moquette是 MQTT Broker实例库-->
        <dependency>
            <groupId>io.moquette</groupId>
            <artifactId>moquette-broker</artifactId>
            <version>0.12.1</version>
            <!-- 这里是因为我的其他配置信息里面好像跟这个冲突了,所以没要它-->            
            <exclusions>
                <exclusion>
                    <groupId>ch.qos.reload4j</groupId>
                    <artifactId>reload4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
2. 添加mqtt的第三方依赖
<!--添加mqtt依赖-->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
3. mqtt服务器及其客户端相关配置信息代码
import io.moquette.broker.Server;
import io.moquette.broker.config.MemoryConfig;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

@Configuration
public class MqttAndBrokerConfig {

    // Mqtt服务器
    @Bean
    public Server MqttBrokerServer() throws Exception{
        // MQTT broker 服务器
        Server mqttBroker = new Server();
        Properties configProps = new Properties();
        configProps.setProperty("port","1883");
        configProps.setProperty("host","127.0.0.1");
        configProps.setProperty("websocket_port","8087");//设置 WebSocket端口,用于支持 WebSocket连接
        try {
            // 启动服务器
            mqttBroker.startServer(new MemoryConfig(configProps));
            System.out.println("Moquette MQTT Broker started. Press Ctrl+C to shutdown.");
            // 程序关闭时执行的操作
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("Stopping Moquette MQTT Broker...");
                mqttBroker.stopServer();
                System.out.println("Moquette MQTT Broker Stopped.");
            }));
        }catch (Exception e){
            // 捕获并输出异常信息。
            e.printStackTrace();
        }
        return mqttBroker;
    }

    //这个可以根据自己的需要进行相关的配置,因为我是需求简单才这样写的
    //Mqtt客户端的option配置
    @Bean
    public MqttConnectOptions mqttConnectOptions(){
        MqttConnectOptions options = new MqttConnectOptions();
        options.setKeepAliveInterval(60);//设置保活时间
        options.setConnectionTimeout(30);//设置连接超时时间
        options.setUserName("mqtt_demo");
        options.setPassword("123456".toCharArray());
        options.setCleanSession(true);//配置持久会话。
        //true:表示创建一个新的会话,在客户端断开连接时,会话将自动销毁.
        //false:创建一个持久会话,在客户端断开连接后会话仍然保持,直到会话超时注销.
        return options;
    }
}

实现mqtt的发布/订阅模式

1. mqtt controller实现层
    注意:到这里的时候已经可以启动相关程序,并且可以调用其方法。我还是讲一下吧。

    init():在执行方法之前,分别创建发布者/订阅者的客户端,并连接服务器。其中我还设置了当程序启动时自动订阅一个topic(这个可以根据自己的需要来进行设置)

    cleanup():在关闭程序时执行,即容器销毁bean时调用。

    publisher方法:发布者发布信息时调用的方法。

    subscriber方法:订阅者订阅的消息的方法(这个可以不用像我这样添加,因为订阅者是不需要做出操作就可以获取信息的,我只是顺便写了),这个方法也可以直接放在publisher方法中直接使用。

    注意:相关类在文章最下面
import com.example.intelligentparking.pojo.Result;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.web.bind.annotation.*;

import java.util.UUID;

@RestController
@RequestMapping("/mqtt")
@DependsOn("MqttBrokerServer")//确保MqttBrokerServer在controller层之前被初始化
public class MqttController {
    private MqttClient publisherClient;
    private MqttClient subscriberClient;
    private final String brokerUrl = "tcp://localhost:1883";
    private int qos = 1;
    private String topic = "mqtt/demo";

    @Autowired
    private MqttConnectOptions options;

    @PostConstruct //在类的依赖项被注入后调用,但在类被使用之前执行
    public void init(){
        try {
            // 发布者
            // 创建 MQTT 客户端
            publisherClient = new MqttClient(brokerUrl, "publisher_" + UUID.randomUUID().toString(), new MemoryPersistence());
            //new MemoryPersistence()使用内存持久化存储。
            // 设置连接配置
            publisherClient.connect(options);
            System.out.println("发布者客户端连接成功");

            // 订阅者
            subscriberClient = new MqttClient(brokerUrl, "subscriber_" + UUID.randomUUID().toString(), new MemoryPersistence());
            subscriberClient.connect(options);
            System.out.println("订阅者客户端连接成功");

            // 在应用启动时订阅主题
            subscriberClient.subscribe(topic, qos);
            System.out.println("Subscribed to topic: " + topic );

            // 设置回调函数
            subscriberClient.setCallback(new MqttCallback() {
                // 连接丢失时调用
                @Override
                public void connectionLost(Throwable throwable) {
                    System.out.println("connectionLost: " + throwable.getMessage());
                }

                // 接收到消息时被调用
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    String messageContent = new String(message.getPayload());
                    System.out.println("我是订阅消息的!");
                    System.out.println("topic: " + topic);
                    System.out.println("Qos: " + message.getQos());
                    System.out.println("message content: " + messageContent);

                  
                }

                // 消息发送完成时被调用
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete---------" + token.isComplete());
                }
            });

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @PostMapping("/publish")
    public Result publisher(@RequestParam String topic, @RequestParam String content) throws MqttException {
        // 创建消息(在 MQTT 协议中,消息的内容是以字节形式发送的,所以用.getBytes()方法)并设置 QoS
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(1);
        // 发布消息
        publisherClient.publish(topic, message);
        System.out.println("我是发布消息的!");
        System.out.println("Message published");
        System.out.println("topic: " + topic);
        System.out.println("message content: " + content);
        return Result.success("发布成功");
    }

    @PostMapping("/subscribe")
    public Result subscriber(@RequestParam String topic) throws MqttException {
        subscriberClient.subscribe(topic, qos);
        System.out.println("Subscribed to topic: " + topic);
        return Result.success("订阅成功");
    }

    @PreDestroy  //Spring 容器销毁 bean 时调用
    public void cleanup() {
        try {
            if (publisherClient != null && publisherClient.isConnected()) {
                publisherClient.disconnect();
                publisherClient.close();
                System.out.println("Publisher client disconnected and closed.");
            }
            if (subscriberClient != null && subscriberClient.isConnected()) {
                // 关闭连接
                subscriberClient.disconnect();
                // 关闭客户端
                subscriberClient.close();
                System.out.println("MQTT client disconnected and closed.");
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

websocket相关配置信息

1. 添加相关依赖
<!--引入 WebSocket 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
2. 配置websocket
import com.example.intelligentparking.service.MqttWebSocketHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        //在指定路径 /mqtt-websocket 上添加 MqttWebSocketHandler处理器。这个路径是客户端连接时使用的 WebSocket URL。
        registry.addHandler(new MqttWebSocketHandler(), "/mqtt-websocket")
                //允许所有来源的客户端连接(CORS 设置),这对于开发环境通常是方便的,但在生产环境中应该指定允许的来源以提高安全性
                .setAllowedOrigins("*");
    }
}
3. websocket处理器
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;

public class MqttWebSocketHandler extends TextWebSocketHandler {
//声明了一个名为 sessions 的变量,用于存储所有当前活跃的 WebSocket连接(即客户端会话),并使用线程安全的 CopyOnWriteArrayList 来确保在多线程环境中安全访问和修改这个列表。
    private static final CopyOnWriteArrayList<WebSocketSession> sessions = new CopyOnWriteArrayList<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        sessions.add(session);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        sessions.remove(session);
    }

//将指定的消息发送到所有连接的 WebSocket 客户端。
    public void sendMessageToAll(String message) {
        for (WebSocketSession session : sessions) {
            try {
                session.sendMessage(new TextMessage(message));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

注意:现在这两个还不能直接传递消息,因为mqtt和websocket这两个算是跨域信息,所以需要配置CORS信息,以允许来自特定源或所有源的连接。

配置CORS信息:

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class WebConfig implements WebMvcConfigurer {
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**") // 所有接口
                .allowCredentials(true) // 是否发送 Cookie
                .allowedOriginPatterns("*") // 支持域
                .allowedMethods("GET", "POST", "PUT", "DELETE") // 支持方法
                .allowedHeaders("*") // 表示所有请求头都被允许,客户端可以发送任何请求头到服务器。
                .exposedHeaders("*");// 表示所有响应头都将被暴露给客户端,客户端可以访问所有的响应头信息。
    }
}

注意:这个时候就可以实现信息跨域,但是前面的代码中需要实现将消息传递到前端

信息跨域实现:

MqttController类中添加

import com.example.intelligentparking.service.MqttWebSocketHandler;
import com.example.intelligentparking.pojo.MessageDTO;
import com.fasterxml.jackson.databind.ObjectMapper;

private final MqttWebSocketHandler webSocketHandler = new MqttWebSocketHandler();
private ObjectMapper objectMapper = new ObjectMapper();

// 接收到消息时被调用
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    String messageContent = new String(message.getPayload());
                    System.out.println("我是订阅消息的!");
                    System.out.println("topic: " + topic);
                    System.out.println("Qos: " + message.getQos());
                    System.out.println("message content: " + messageContent);

                    // 将消息内容转换为 JSON(websocket中以json数据传递)
                    MessageDTO messageDTO = new MessageDTO(messageContent);
                    String jsonMessage = objectMapper.writeValueAsString(messageDTO);

                    // 通过 WebSocket 发送消息到前端
                    webSocketHandler.sendMessageToAll(jsonMessage);
                }

对应websocket前端的js代码

// client.js
const WebSocket = require('ws');

// 创建 WebSocket 连接
const socket = new WebSocket('ws://localhost:8086/mqtt-websocket');

// 连接成功时的回调
socket.on('open', () => {
    console.log('WebSocket 连接已建立');
});

// 接收到消息时的回调
socket.on('message', (res) => {
    console.log('收到消息:', JSON.parse(res));
    // 在这里可以进行消息验证或处理
});

// 连接关闭时的回调
socket.on('close', () => {
    console.log('WebSocket 连接已关闭');
});

// 连接错误时的回调
socket.on('error', (error) => {
    console.error('WebSocket 错误:', error);
});

相关类:Result、MessageDTO

package com.example.intelligentparking.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// 统一响应结果
@NoArgsConstructor//无参构造
@AllArgsConstructor//全参构造
@Data
public class Result<T> {

    private Integer code; //业务状态码   0-成功    1-失败
    private String message; //提示信息
    private T data; //响应数据

    //返回操作成功响应结果(带参数)
    public static <E> Result<E> success(E data){
        return new Result<>(0,"操作成功",data);
    }

    //返回操作成功响应结果
    public static Result success(){
        return new Result(0,"操作成功",null);
    }

    //返回操作失败的响应结果
    public static Result error(String message){
        return new Result(  1,message,null);
    }
}
package com.example.intelligentparking.pojo;

public class MessageDTO {
    private String content;

    public MessageDTO(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

综上就可以实现title的功能了


本文转载自: https://blog.csdn.net/m0_74773277/article/details/143337779
版权归原作者 緋儚 所有, 如有侵权,请联系我们删除。

“SpringBoot集成MQTT服务器和客户端并通过WebSocket将消息实时发送到前端(本地搭建MQTT服务器)”的评论:

还没有评论