随着MQTT的盛行,Java也整合了MQTT的相关依赖以实现在Java中实现MQTT消息的发布和订阅,接下来我基于spring boot来整合mqtt。
首先我们需要mqtt的环境,也就是服务器和本地测试软件,如果没有配置好,可以看我的上一篇文章去部署一下
文章链接:EMQX服务器本地部署和MQTTX连接发接消息-CSDN博客https://blog.csdn.net/zhdbshiai/article/details/143319892?utm_medium=notify.im.blog_audit.20241029.a&username=zhdbshiai
这里我们直接开始(mqtt环境ok的情况下)
新建一个springboot项目,然后我这里用的maven,在pom.xml文件中添加依赖
<!-- mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
接下来是yml文件的配置,注意ip和账号密码一定得是自己的,不然要报错。
server:
port: 8001
spring:
application:
name: mqtt-send
#mqtt属性配置
mqtt:
username: admin
password: 2811456516ZWQzwq
host: tcp://192.168.172.102:1883
然后是编写代码了,这是我的层级
上代码,我们搜先编写一个实体类用于接收yml文件的配置,也用于后续服务启动后的相关数据,
代码如下
@Configuration
// MQTT配置类
public class MqttConfig {
// exmq服务器地址
@Value("${mqtt.host}")
private String host;
// 定义客户端ID,使用"DC"加上一个随机生成的数字
private final String clientId = "DC" + new Random().nextInt(100000000);
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
// 定义连接超时时间,默认为10秒,如果未在属性中指定,则使用默认值
@Value("${mqtt.connection.timeout:10}")
private int connectionTimeout;
private static MqttClient mqttClient;
/*
* MQTT连接参数设置
*/
private MqttConnectOptions mqttConnectOptions(String userName, String passWord, String host) throws MqttException {
mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(connectionTimeout); // 设置连接超时时间
options.setAutomaticReconnect(true); // 开启自动重连
options.setCleanSession(false); // 设置为false,表示不清除会话session
// 可以根据需要设置其他参数,例如 options.setKeepAliveInterval(20); 设置心跳间隔时间,默认为60秒
return options;
}
// 创建一个MqttClient的Bean实例,用于连接MQTT代理
@Bean
public MqttClient mqttClient() throws MqttException {
MqttConnectOptions options = mqttConnectOptions(username, password, host);
try {
mqttClient.connect(options);
} catch (MqttException e) {
System.out.println("连接失败:" + e.getMessage());
}
return mqttClient;
}
// 发布消息
public void publish(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos); // 设置消息质量
mqttMessage.setRetained(true); // 设置保留消息
mqttMessage.setPayload(msg.getBytes()); // 设置消息内容
try {
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 发布消息
token.waitForCompletion(); // 等待发布完成
System.out.println("发送消息:Topic=" + topic + ", Message=" + msg.getBytes());
} catch (MqttException e) {
MqttConnectOptions options = mqttConnectOptions(username, password, host);
reconnect(mqttClient, options, topic, mqttMessage); // 递归重新连接并重试发送消息
}
}
// 重新连接并重试发送消息
private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic, MqttMessage mqttMessage) throws MqttException {
try {
// 等待一段时间,可以根据需要调整等待时间
Thread.sleep(5000);
// 重新连接 MqttClient
mqttClient.connect(mqttConnectOptions);
// 判断是否连接成功
if (mqttClient.isConnected()) {
System.out.println("发送方MQTT 客户端已成功连接到 MQTT 代理。");
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 重新发布消息
token.waitForCompletion();
}
} catch (MqttException | InterruptedException e) {
System.out.println("发送方重新连接失败:" + e.getMessage());
reconnect(mqttClient, mqttConnectOptions, topic, mqttMessage); // 重连失败,继续重试
}
}
接下来就可以编写一个controller来实现消息的发送了,接下来是controller的代码
@RestController
public class MessageController {
@Autowired
private MqttConfig mqttConfig;
@PostMapping("/publish")
public String publishMessage(@RequestBody String message) {
try {
mqttConfig.publish("li", message,1);
return "发送成功.";
} catch (MqttException e) {
return "发送失败: " + e.getMessage();
}
}
}
这里我们通过controller在我们启动web后访问这个链接就可以实现发送一条自己自定义的消息,现在我们还缺少一个服务端,用于我么解释对应主题发送过来的消息,这里我使用的是mqttx,当然也可以通过代码实现,接下来是服务端代码,注意ip和账号密码一定得是自己的,不然要报错。
public class Client {
public static void main(String[] args) {
String host = "tcp://192.168.172.102:1883";
String clientId = "Client_B";
String topic = "li";
try {
MqttClient mqttClient = new MqttClient(host, clientId);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("public".toCharArray());
// 连接到 EMQ X Broker
mqttClient.connect(mqttConnectOptions);
// 设置消息回调
mqttClient.setCallback(new MqttCallback() {
@SneakyThrows
@Override
public void connectionLost(Throwable cause) {
// 处理连接丢失的情况
System.out.println("连接丢失,尝试重新连接...");
reconnect(mqttClient, mqttConnectOptions, topic);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 处理接收到的消息
String payload = new String(message.getPayload());
System.out.println("收到消息:Topic=" + topic + ", Message=" + payload);
}
@SneakyThrows
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 处理消息发送完成的情况
System.out.println("消息发送完成: " + token.getMessage().getPayload());
}
});
// 订阅主题
mqttClient.subscribe(topic);
// 保持连接,防止程序退出
// 这里可以根据需要,设置一个条件或者等待一段时间,确保程序能够保持连接
while (true) {
Thread.sleep(1000);
}
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}
private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic) throws MqttException {
try {
// 等待一段时间,可以根据需要调整等待时间
Thread.sleep(5000);
// 重新连接 MqttClient
mqttClient.connect(mqttConnectOptions);
// 判断是否连接成功
if (mqttClient.isConnected()) {
System.out.println("MQTT 客户端已成功连接到 MQTT 代理。");
// 重新订阅主题
mqttClient.subscribe(topic);
}
} catch (MqttException | InterruptedException e) {
System.out.println("重新连接失败:" + e.getMessage());
reconnect(mqttClient, mqttConnectOptions, topic);
}
}
}
这里代码就已经全部ok了,接下来本地启动。
这里可以看见都连接上了,接下来就是发送一条消息了,这里我使用Apifox来发送消息的,如果大家也需要。
官网链接Apifox - API 文档、调试、Mock、测试一体化协作平台。拥有接口文档管理、接口调试、Mock、自动化测试等功能,接口开发、测试、联调效率,提升 10 倍。最好用的接口文档管理工具,接口自动化测试工具。https://apifox.com/
添加一个接口
输入我们对应的接口信息,然后保存。
然后运行,这里直接自动生成数据,然后发送消息
这里就可以看见我们的mqttx接收到了数据,并且代码写的服务端也接收到了数据。
这样,springboot整合mqtt就整合完毕了,是不是感觉很简单
如果有其他编程问题,欢迎来q裙463727795一起来探讨哦!!!
版权归原作者 zhdbshiai 所有, 如有侵权,请联系我们删除。