0


SpringBoot集成MQTT服务器和客户端并通过WebSocket将消息实时发送到前端(本地搭建MQTT服务器)

前言:

  1. 在写这篇文章之前,我其实是碰到这个需求的,因为当时我需要这个MQTT服务器功能集成到我的程序中,而不是去用第三方公共的,但是我查阅了大量文章却都没有太大的收获,可能是我搜索的关键字不对,现在自己弄懂了大概之后就顺便做个记录。有错误的地方欢迎大家指出!
  2. 这里就只讲解与之相关的内容了,相关概念可以去看MQTT协议入门、websocket详情
  3. 关于mqtt的代码我是参考的mqtt参考

如何搭建MQTT本地服务器:

1. 要在pom.xml中添加相关maven第三方依赖至于Moquette是什么大家可以自行去查阅
  1. <!-- Moquette是 MQTT Broker实例库-->
  2. <dependency>
  3. <groupId>io.moquette</groupId>
  4. <artifactId>moquette-broker</artifactId>
  5. <version>0.12.1</version>
  6. <!-- 这里是因为我的其他配置信息里面好像跟这个冲突了,所以没要它-->
  7. <exclusions>
  8. <exclusion>
  9. <groupId>ch.qos.reload4j</groupId>
  10. <artifactId>reload4j</artifactId>
  11. </exclusion>
  12. </exclusions>
  13. </dependency>
2. 添加mqtt的第三方依赖
  1. <!--添加mqtt依赖-->
  2. <dependency>
  3. <groupId>org.eclipse.paho</groupId>
  4. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  5. <version>1.2.5</version>
  6. </dependency>
3. mqtt服务器及其客户端相关配置信息代码
  1. import io.moquette.broker.Server;
  2. import io.moquette.broker.config.MemoryConfig;
  3. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import java.util.Properties;
  7. @Configuration
  8. public class MqttAndBrokerConfig {
  9. // Mqtt服务器
  10. @Bean
  11. public Server MqttBrokerServer() throws Exception{
  12. // MQTT broker 服务器
  13. Server mqttBroker = new Server();
  14. Properties configProps = new Properties();
  15. configProps.setProperty("port","1883");
  16. configProps.setProperty("host","127.0.0.1");
  17. configProps.setProperty("websocket_port","8087");//设置 WebSocket端口,用于支持 WebSocket连接
  18. try {
  19. // 启动服务器
  20. mqttBroker.startServer(new MemoryConfig(configProps));
  21. System.out.println("Moquette MQTT Broker started. Press Ctrl+C to shutdown.");
  22. // 程序关闭时执行的操作
  23. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  24. System.out.println("Stopping Moquette MQTT Broker...");
  25. mqttBroker.stopServer();
  26. System.out.println("Moquette MQTT Broker Stopped.");
  27. }));
  28. }catch (Exception e){
  29. // 捕获并输出异常信息。
  30. e.printStackTrace();
  31. }
  32. return mqttBroker;
  33. }
  34. //这个可以根据自己的需要进行相关的配置,因为我是需求简单才这样写的
  35. //Mqtt客户端的option配置
  36. @Bean
  37. public MqttConnectOptions mqttConnectOptions(){
  38. MqttConnectOptions options = new MqttConnectOptions();
  39. options.setKeepAliveInterval(60);//设置保活时间
  40. options.setConnectionTimeout(30);//设置连接超时时间
  41. options.setUserName("mqtt_demo");
  42. options.setPassword("123456".toCharArray());
  43. options.setCleanSession(true);//配置持久会话。
  44. //true:表示创建一个新的会话,在客户端断开连接时,会话将自动销毁.
  45. //false:创建一个持久会话,在客户端断开连接后会话仍然保持,直到会话超时注销.
  46. return options;
  47. }
  48. }

实现mqtt的发布/订阅模式

1. mqtt controller实现层
  1. 注意:到这里的时候已经可以启动相关程序,并且可以调用其方法。我还是讲一下吧。
  2. init():在执行方法之前,分别创建发布者/订阅者的客户端,并连接服务器。其中我还设置了当程序启动时自动订阅一个topic(这个可以根据自己的需要来进行设置)
  3. cleanup():在关闭程序时执行,即容器销毁bean时调用。
  4. publisher方法:发布者发布信息时调用的方法。
  5. subscriber方法:订阅者订阅的消息的方法(这个可以不用像我这样添加,因为订阅者是不需要做出操作就可以获取信息的,我只是顺便写了),这个方法也可以直接放在publisher方法中直接使用。
  6. 注意:相关类在文章最下面
  1. import com.example.intelligentparking.pojo.Result;
  2. import jakarta.annotation.PostConstruct;
  3. import jakarta.annotation.PreDestroy;
  4. import org.eclipse.paho.client.mqttv3.*;
  5. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.context.annotation.DependsOn;
  8. import org.springframework.web.bind.annotation.*;
  9. import java.util.UUID;
  10. @RestController
  11. @RequestMapping("/mqtt")
  12. @DependsOn("MqttBrokerServer")//确保MqttBrokerServer在controller层之前被初始化
  13. public class MqttController {
  14. private MqttClient publisherClient;
  15. private MqttClient subscriberClient;
  16. private final String brokerUrl = "tcp://localhost:1883";
  17. private int qos = 1;
  18. private String topic = "mqtt/demo";
  19. @Autowired
  20. private MqttConnectOptions options;
  21. @PostConstruct //在类的依赖项被注入后调用,但在类被使用之前执行
  22. public void init(){
  23. try {
  24. // 发布者
  25. // 创建 MQTT 客户端
  26. publisherClient = new MqttClient(brokerUrl, "publisher_" + UUID.randomUUID().toString(), new MemoryPersistence());
  27. //new MemoryPersistence()使用内存持久化存储。
  28. // 设置连接配置
  29. publisherClient.connect(options);
  30. System.out.println("发布者客户端连接成功");
  31. // 订阅者
  32. subscriberClient = new MqttClient(brokerUrl, "subscriber_" + UUID.randomUUID().toString(), new MemoryPersistence());
  33. subscriberClient.connect(options);
  34. System.out.println("订阅者客户端连接成功");
  35. // 在应用启动时订阅主题
  36. subscriberClient.subscribe(topic, qos);
  37. System.out.println("Subscribed to topic: " + topic );
  38. // 设置回调函数
  39. subscriberClient.setCallback(new MqttCallback() {
  40. // 连接丢失时调用
  41. @Override
  42. public void connectionLost(Throwable throwable) {
  43. System.out.println("connectionLost: " + throwable.getMessage());
  44. }
  45. // 接收到消息时被调用
  46. @Override
  47. public void messageArrived(String topic, MqttMessage message) throws Exception {
  48. String messageContent = new String(message.getPayload());
  49. System.out.println("我是订阅消息的!");
  50. System.out.println("topic: " + topic);
  51. System.out.println("Qos: " + message.getQos());
  52. System.out.println("message content: " + messageContent);
  53. }
  54. // 消息发送完成时被调用
  55. @Override
  56. public void deliveryComplete(IMqttDeliveryToken token) {
  57. System.out.println("deliveryComplete---------" + token.isComplete());
  58. }
  59. });
  60. } catch (MqttException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. @PostMapping("/publish")
  65. public Result publisher(@RequestParam String topic, @RequestParam String content) throws MqttException {
  66. // 创建消息(在 MQTT 协议中,消息的内容是以字节形式发送的,所以用.getBytes()方法)并设置 QoS
  67. MqttMessage message = new MqttMessage(content.getBytes());
  68. message.setQos(1);
  69. // 发布消息
  70. publisherClient.publish(topic, message);
  71. System.out.println("我是发布消息的!");
  72. System.out.println("Message published");
  73. System.out.println("topic: " + topic);
  74. System.out.println("message content: " + content);
  75. return Result.success("发布成功");
  76. }
  77. @PostMapping("/subscribe")
  78. public Result subscriber(@RequestParam String topic) throws MqttException {
  79. subscriberClient.subscribe(topic, qos);
  80. System.out.println("Subscribed to topic: " + topic);
  81. return Result.success("订阅成功");
  82. }
  83. @PreDestroy //Spring 容器销毁 bean 时调用
  84. public void cleanup() {
  85. try {
  86. if (publisherClient != null && publisherClient.isConnected()) {
  87. publisherClient.disconnect();
  88. publisherClient.close();
  89. System.out.println("Publisher client disconnected and closed.");
  90. }
  91. if (subscriberClient != null && subscriberClient.isConnected()) {
  92. // 关闭连接
  93. subscriberClient.disconnect();
  94. // 关闭客户端
  95. subscriberClient.close();
  96. System.out.println("MQTT client disconnected and closed.");
  97. }
  98. } catch (MqttException e) {
  99. e.printStackTrace();
  100. }
  101. }
  102. }

websocket相关配置信息

1. 添加相关依赖
  1. <!--引入 WebSocket 依赖-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-websocket</artifactId>
  5. </dependency>
2. 配置websocket
  1. import com.example.intelligentparking.service.MqttWebSocketHandler;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  4. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  5. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  6. @Configuration
  7. @EnableWebSocket
  8. public class WebSocketConfig implements WebSocketConfigurer {
  9. @Override
  10. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
  11. //在指定路径 /mqtt-websocket 上添加 MqttWebSocketHandler处理器。这个路径是客户端连接时使用的 WebSocket URL。
  12. registry.addHandler(new MqttWebSocketHandler(), "/mqtt-websocket")
  13. //允许所有来源的客户端连接(CORS 设置),这对于开发环境通常是方便的,但在生产环境中应该指定允许的来源以提高安全性
  14. .setAllowedOrigins("*");
  15. }
  16. }
3. websocket处理器
  1. import org.springframework.web.socket.CloseStatus;
  2. import org.springframework.web.socket.TextMessage;
  3. import org.springframework.web.socket.WebSocketSession;
  4. import org.springframework.web.socket.handler.TextWebSocketHandler;
  5. import java.io.IOException;
  6. import java.util.concurrent.CopyOnWriteArrayList;
  7. public class MqttWebSocketHandler extends TextWebSocketHandler {
  8. //声明了一个名为 sessions 的变量,用于存储所有当前活跃的 WebSocket连接(即客户端会话),并使用线程安全的 CopyOnWriteArrayList 来确保在多线程环境中安全访问和修改这个列表。
  9. private static final CopyOnWriteArrayList<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
  10. @Override
  11. public void afterConnectionEstablished(WebSocketSession session) {
  12. sessions.add(session);
  13. }
  14. @Override
  15. public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
  16. sessions.remove(session);
  17. }
  18. //将指定的消息发送到所有连接的 WebSocket 客户端。
  19. public void sendMessageToAll(String message) {
  20. for (WebSocketSession session : sessions) {
  21. try {
  22. session.sendMessage(new TextMessage(message));
  23. } catch (IOException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. }

注意:现在这两个还不能直接传递消息,因为mqtt和websocket这两个算是跨域信息,所以需要配置CORS信息,以允许来自特定源或所有源的连接。

配置CORS信息:

  1. import org.springframework.context.annotation.Configuration;
  2. import org.springframework.web.servlet.config.annotation.CorsRegistry;
  3. import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
  4. @Configuration
  5. public class WebConfig implements WebMvcConfigurer {
  6. @Override
  7. public void addCorsMappings(CorsRegistry registry) {
  8. registry.addMapping("/**") // 所有接口
  9. .allowCredentials(true) // 是否发送 Cookie
  10. .allowedOriginPatterns("*") // 支持域
  11. .allowedMethods("GET", "POST", "PUT", "DELETE") // 支持方法
  12. .allowedHeaders("*") // 表示所有请求头都被允许,客户端可以发送任何请求头到服务器。
  13. .exposedHeaders("*");// 表示所有响应头都将被暴露给客户端,客户端可以访问所有的响应头信息。
  14. }
  15. }

注意:这个时候就可以实现信息跨域,但是前面的代码中需要实现将消息传递到前端

信息跨域实现:

MqttController类中添加

  1. import com.example.intelligentparking.service.MqttWebSocketHandler;
  2. import com.example.intelligentparking.pojo.MessageDTO;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. private final MqttWebSocketHandler webSocketHandler = new MqttWebSocketHandler();
  5. private ObjectMapper objectMapper = new ObjectMapper();
  6. // 接收到消息时被调用
  7. @Override
  8. public void messageArrived(String topic, MqttMessage message) throws Exception {
  9. String messageContent = new String(message.getPayload());
  10. System.out.println("我是订阅消息的!");
  11. System.out.println("topic: " + topic);
  12. System.out.println("Qos: " + message.getQos());
  13. System.out.println("message content: " + messageContent);
  14. // 将消息内容转换为 JSON(websocket中以json数据传递)
  15. MessageDTO messageDTO = new MessageDTO(messageContent);
  16. String jsonMessage = objectMapper.writeValueAsString(messageDTO);
  17. // 通过 WebSocket 发送消息到前端
  18. webSocketHandler.sendMessageToAll(jsonMessage);
  19. }

对应websocket前端的js代码

  1. // client.js
  2. const WebSocket = require('ws');
  3. // 创建 WebSocket 连接
  4. const socket = new WebSocket('ws://localhost:8086/mqtt-websocket');
  5. // 连接成功时的回调
  6. socket.on('open', () => {
  7. console.log('WebSocket 连接已建立');
  8. });
  9. // 接收到消息时的回调
  10. socket.on('message', (res) => {
  11. console.log('收到消息:', JSON.parse(res));
  12. // 在这里可以进行消息验证或处理
  13. });
  14. // 连接关闭时的回调
  15. socket.on('close', () => {
  16. console.log('WebSocket 连接已关闭');
  17. });
  18. // 连接错误时的回调
  19. socket.on('error', (error) => {
  20. console.error('WebSocket 错误:', error);
  21. });

相关类:Result、MessageDTO

  1. package com.example.intelligentparking.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. // 统一响应结果
  6. @NoArgsConstructor//无参构造
  7. @AllArgsConstructor//全参构造
  8. @Data
  9. public class Result<T> {
  10. private Integer code; //业务状态码 0-成功 1-失败
  11. private String message; //提示信息
  12. private T data; //响应数据
  13. //返回操作成功响应结果(带参数)
  14. public static <E> Result<E> success(E data){
  15. return new Result<>(0,"操作成功",data);
  16. }
  17. //返回操作成功响应结果
  18. public static Result success(){
  19. return new Result(0,"操作成功",null);
  20. }
  21. //返回操作失败的响应结果
  22. public static Result error(String message){
  23. return new Result( 1,message,null);
  24. }
  25. }
  1. package com.example.intelligentparking.pojo;
  2. public class MessageDTO {
  3. private String content;
  4. public MessageDTO(String content) {
  5. this.content = content;
  6. }
  7. public String getContent() {
  8. return content;
  9. }
  10. public void setContent(String content) {
  11. this.content = content;
  12. }
  13. }

综上就可以实现title的功能了


本文转载自: https://blog.csdn.net/m0_74773277/article/details/143337779
版权归原作者 緋儚 所有, 如有侵权,请联系我们删除。

“SpringBoot集成MQTT服务器和客户端并通过WebSocket将消息实时发送到前端(本地搭建MQTT服务器)”的评论:

还没有评论