前言:
在写这篇文章之前,我其实是碰到这个需求的,因为当时我需要这个MQTT服务器功能集成到我的程序中,而不是去用第三方公共的,但是我查阅了大量文章却都没有太大的收获,可能是我搜索的关键字不对,现在自己弄懂了大概之后就顺便做个记录。有错误的地方欢迎大家指出!
这里就只讲解与之相关的内容了,相关概念可以去看MQTT协议入门、websocket详情
关于mqtt的代码我是参考的mqtt参考
如何搭建MQTT本地服务器:
1. 要在pom.xml中添加相关maven第三方依赖至于Moquette是什么大家可以自行去查阅
<!-- Moquette是 MQTT Broker实例库-->
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.12.1</version>
<!-- 这里是因为我的其他配置信息里面好像跟这个冲突了,所以没要它-->
<exclusions>
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
2. 添加mqtt的第三方依赖
<!--添加mqtt依赖-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
3. mqtt服务器及其客户端相关配置信息代码
import io.moquette.broker.Server;
import io.moquette.broker.config.MemoryConfig;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
@Configuration
public class MqttAndBrokerConfig {
// Mqtt服务器
@Bean
public Server MqttBrokerServer() throws Exception{
// MQTT broker 服务器
Server mqttBroker = new Server();
Properties configProps = new Properties();
configProps.setProperty("port","1883");
configProps.setProperty("host","127.0.0.1");
configProps.setProperty("websocket_port","8087");//设置 WebSocket端口,用于支持 WebSocket连接
try {
// 启动服务器
mqttBroker.startServer(new MemoryConfig(configProps));
System.out.println("Moquette MQTT Broker started. Press Ctrl+C to shutdown.");
// 程序关闭时执行的操作
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Stopping Moquette MQTT Broker...");
mqttBroker.stopServer();
System.out.println("Moquette MQTT Broker Stopped.");
}));
}catch (Exception e){
// 捕获并输出异常信息。
e.printStackTrace();
}
return mqttBroker;
}
//这个可以根据自己的需要进行相关的配置,因为我是需求简单才这样写的
//Mqtt客户端的option配置
@Bean
public MqttConnectOptions mqttConnectOptions(){
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(60);//设置保活时间
options.setConnectionTimeout(30);//设置连接超时时间
options.setUserName("mqtt_demo");
options.setPassword("123456".toCharArray());
options.setCleanSession(true);//配置持久会话。
//true:表示创建一个新的会话,在客户端断开连接时,会话将自动销毁.
//false:创建一个持久会话,在客户端断开连接后会话仍然保持,直到会话超时注销.
return options;
}
}
实现mqtt的发布/订阅模式
1. mqtt controller实现层
注意:到这里的时候已经可以启动相关程序,并且可以调用其方法。我还是讲一下吧。
init():在执行方法之前,分别创建发布者/订阅者的客户端,并连接服务器。其中我还设置了当程序启动时自动订阅一个topic(这个可以根据自己的需要来进行设置)
cleanup():在关闭程序时执行,即容器销毁bean时调用。
publisher方法:发布者发布信息时调用的方法。
subscriber方法:订阅者订阅的消息的方法(这个可以不用像我这样添加,因为订阅者是不需要做出操作就可以获取信息的,我只是顺便写了),这个方法也可以直接放在publisher方法中直接使用。
注意:相关类在文章最下面
import com.example.intelligentparking.pojo.Result;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.web.bind.annotation.*;
import java.util.UUID;
@RestController
@RequestMapping("/mqtt")
@DependsOn("MqttBrokerServer")//确保MqttBrokerServer在controller层之前被初始化
public class MqttController {
private MqttClient publisherClient;
private MqttClient subscriberClient;
private final String brokerUrl = "tcp://localhost:1883";
private int qos = 1;
private String topic = "mqtt/demo";
@Autowired
private MqttConnectOptions options;
@PostConstruct //在类的依赖项被注入后调用,但在类被使用之前执行
public void init(){
try {
// 发布者
// 创建 MQTT 客户端
publisherClient = new MqttClient(brokerUrl, "publisher_" + UUID.randomUUID().toString(), new MemoryPersistence());
//new MemoryPersistence()使用内存持久化存储。
// 设置连接配置
publisherClient.connect(options);
System.out.println("发布者客户端连接成功");
// 订阅者
subscriberClient = new MqttClient(brokerUrl, "subscriber_" + UUID.randomUUID().toString(), new MemoryPersistence());
subscriberClient.connect(options);
System.out.println("订阅者客户端连接成功");
// 在应用启动时订阅主题
subscriberClient.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic );
// 设置回调函数
subscriberClient.setCallback(new MqttCallback() {
// 连接丢失时调用
@Override
public void connectionLost(Throwable throwable) {
System.out.println("connectionLost: " + throwable.getMessage());
}
// 接收到消息时被调用
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String messageContent = new String(message.getPayload());
System.out.println("我是订阅消息的!");
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println("message content: " + messageContent);
}
// 消息发送完成时被调用
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
@PostMapping("/publish")
public Result publisher(@RequestParam String topic, @RequestParam String content) throws MqttException {
// 创建消息(在 MQTT 协议中,消息的内容是以字节形式发送的,所以用.getBytes()方法)并设置 QoS
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(1);
// 发布消息
publisherClient.publish(topic, message);
System.out.println("我是发布消息的!");
System.out.println("Message published");
System.out.println("topic: " + topic);
System.out.println("message content: " + content);
return Result.success("发布成功");
}
@PostMapping("/subscribe")
public Result subscriber(@RequestParam String topic) throws MqttException {
subscriberClient.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic);
return Result.success("订阅成功");
}
@PreDestroy //Spring 容器销毁 bean 时调用
public void cleanup() {
try {
if (publisherClient != null && publisherClient.isConnected()) {
publisherClient.disconnect();
publisherClient.close();
System.out.println("Publisher client disconnected and closed.");
}
if (subscriberClient != null && subscriberClient.isConnected()) {
// 关闭连接
subscriberClient.disconnect();
// 关闭客户端
subscriberClient.close();
System.out.println("MQTT client disconnected and closed.");
}
} catch (MqttException e) {
e.printStackTrace();
}
}
}
websocket相关配置信息
1. 添加相关依赖
<!--引入 WebSocket 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. 配置websocket
import com.example.intelligentparking.service.MqttWebSocketHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//在指定路径 /mqtt-websocket 上添加 MqttWebSocketHandler处理器。这个路径是客户端连接时使用的 WebSocket URL。
registry.addHandler(new MqttWebSocketHandler(), "/mqtt-websocket")
//允许所有来源的客户端连接(CORS 设置),这对于开发环境通常是方便的,但在生产环境中应该指定允许的来源以提高安全性
.setAllowedOrigins("*");
}
}
3. websocket处理器
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
public class MqttWebSocketHandler extends TextWebSocketHandler {
//声明了一个名为 sessions 的变量,用于存储所有当前活跃的 WebSocket连接(即客户端会话),并使用线程安全的 CopyOnWriteArrayList 来确保在多线程环境中安全访问和修改这个列表。
private static final CopyOnWriteArrayList<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
sessions.add(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
sessions.remove(session);
}
//将指定的消息发送到所有连接的 WebSocket 客户端。
public void sendMessageToAll(String message) {
for (WebSocketSession session : sessions) {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
注意:现在这两个还不能直接传递消息,因为mqtt和websocket这两个算是跨域信息,所以需要配置CORS信息,以允许来自特定源或所有源的连接。
配置CORS信息:
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class WebConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**") // 所有接口
.allowCredentials(true) // 是否发送 Cookie
.allowedOriginPatterns("*") // 支持域
.allowedMethods("GET", "POST", "PUT", "DELETE") // 支持方法
.allowedHeaders("*") // 表示所有请求头都被允许,客户端可以发送任何请求头到服务器。
.exposedHeaders("*");// 表示所有响应头都将被暴露给客户端,客户端可以访问所有的响应头信息。
}
}
注意:这个时候就可以实现信息跨域,但是前面的代码中需要实现将消息传递到前端
信息跨域实现:
MqttController类中添加
import com.example.intelligentparking.service.MqttWebSocketHandler;
import com.example.intelligentparking.pojo.MessageDTO;
import com.fasterxml.jackson.databind.ObjectMapper;
private final MqttWebSocketHandler webSocketHandler = new MqttWebSocketHandler();
private ObjectMapper objectMapper = new ObjectMapper();
// 接收到消息时被调用
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String messageContent = new String(message.getPayload());
System.out.println("我是订阅消息的!");
System.out.println("topic: " + topic);
System.out.println("Qos: " + message.getQos());
System.out.println("message content: " + messageContent);
// 将消息内容转换为 JSON(websocket中以json数据传递)
MessageDTO messageDTO = new MessageDTO(messageContent);
String jsonMessage = objectMapper.writeValueAsString(messageDTO);
// 通过 WebSocket 发送消息到前端
webSocketHandler.sendMessageToAll(jsonMessage);
}
对应websocket前端的js代码
// client.js
const WebSocket = require('ws');
// 创建 WebSocket 连接
const socket = new WebSocket('ws://localhost:8086/mqtt-websocket');
// 连接成功时的回调
socket.on('open', () => {
console.log('WebSocket 连接已建立');
});
// 接收到消息时的回调
socket.on('message', (res) => {
console.log('收到消息:', JSON.parse(res));
// 在这里可以进行消息验证或处理
});
// 连接关闭时的回调
socket.on('close', () => {
console.log('WebSocket 连接已关闭');
});
// 连接错误时的回调
socket.on('error', (error) => {
console.error('WebSocket 错误:', error);
});
相关类:Result、MessageDTO
package com.example.intelligentparking.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
// 统一响应结果
@NoArgsConstructor//无参构造
@AllArgsConstructor//全参构造
@Data
public class Result<T> {
private Integer code; //业务状态码 0-成功 1-失败
private String message; //提示信息
private T data; //响应数据
//返回操作成功响应结果(带参数)
public static <E> Result<E> success(E data){
return new Result<>(0,"操作成功",data);
}
//返回操作成功响应结果
public static Result success(){
return new Result(0,"操作成功",null);
}
//返回操作失败的响应结果
public static Result error(String message){
return new Result( 1,message,null);
}
}
package com.example.intelligentparking.pojo;
public class MessageDTO {
private String content;
public MessageDTO(String content) {
this.content = content;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
综上就可以实现title的功能了
版权归原作者 緋儚 所有, 如有侵权,请联系我们删除。