0


spring boot整合MQTT

随着MQTT的盛行,Java也整合了MQTT的相关依赖以实现在Java中实现MQTT消息的发布和订阅,接下来我基于spring boot来整合mqtt。

首先我们需要mqtt的环境,也就是服务器和本地测试软件,如果没有配置好,可以看我的上一篇文章去部署一下

文章链接:EMQX服务器本地部署和MQTTX连接发接消息-CSDN博客https://blog.csdn.net/zhdbshiai/article/details/143319892?utm_medium=notify.im.blog_audit.20241029.a&username=zhdbshiai

这里我们直接开始(mqtt环境ok的情况下)

新建一个springboot项目,然后我这里用的maven,在pom.xml文件中添加依赖

<!-- mqtt -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

接下来是yml文件的配置,注意ip和账号密码一定得是自己的,不然要报错。

server:
  port: 8001

spring:
  application:
    name: mqtt-send

#mqtt属性配置
mqtt:
  username: admin
  password: 2811456516ZWQzwq
  host: tcp://192.168.172.102:1883

然后是编写代码了,这是我的层级

上代码,我们搜先编写一个实体类用于接收yml文件的配置,也用于后续服务启动后的相关数据,

代码如下

@Configuration
// MQTT配置类
public class MqttConfig {

    // exmq服务器地址
    @Value("${mqtt.host}")
    private String host;

    // 定义客户端ID,使用"DC"加上一个随机生成的数字
    private final String clientId = "DC" + new Random().nextInt(100000000);

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    // 定义连接超时时间,默认为10秒,如果未在属性中指定,则使用默认值
    @Value("${mqtt.connection.timeout:10}")
    private int connectionTimeout;

    private static MqttClient mqttClient;

    /*
     * MQTT连接参数设置
     */
    private MqttConnectOptions mqttConnectOptions(String userName, String passWord, String host) throws MqttException {
        mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(connectionTimeout); // 设置连接超时时间
        options.setAutomaticReconnect(true); // 开启自动重连
        options.setCleanSession(false); // 设置为false,表示不清除会话session
        // 可以根据需要设置其他参数,例如 options.setKeepAliveInterval(20); 设置心跳间隔时间,默认为60秒
        return options;
    }

    // 创建一个MqttClient的Bean实例,用于连接MQTT代理
    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttConnectOptions options = mqttConnectOptions(username, password, host);
        try {
            mqttClient.connect(options);
        } catch (MqttException e) {
            System.out.println("连接失败:" + e.getMessage());
        }
        return mqttClient;
    }

    // 发布消息
    public void publish(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos); // 设置消息质量
        mqttMessage.setRetained(true); // 设置保留消息
        mqttMessage.setPayload(msg.getBytes()); // 设置消息内容
        try {
            MqttTopic mqttTopic = mqttClient.getTopic(topic);
            MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 发布消息
            token.waitForCompletion(); // 等待发布完成
            System.out.println("发送消息:Topic=" + topic + ", Message=" + msg.getBytes());
        } catch (MqttException e) {
            MqttConnectOptions options = mqttConnectOptions(username, password, host);
            reconnect(mqttClient, options, topic, mqttMessage); // 递归重新连接并重试发送消息
        }
    }

    // 重新连接并重试发送消息
    private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic, MqttMessage mqttMessage) throws MqttException {
        try {
            // 等待一段时间,可以根据需要调整等待时间
            Thread.sleep(5000);
            // 重新连接 MqttClient
            mqttClient.connect(mqttConnectOptions);
            // 判断是否连接成功
            if (mqttClient.isConnected()) {
                System.out.println("发送方MQTT 客户端已成功连接到 MQTT 代理。");
                MqttTopic mqttTopic = mqttClient.getTopic(topic);
                MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 重新发布消息
                token.waitForCompletion();
            }
        } catch (MqttException | InterruptedException e) {
            System.out.println("发送方重新连接失败:" + e.getMessage());
            reconnect(mqttClient, mqttConnectOptions, topic, mqttMessage); // 重连失败,继续重试
        }
    }

接下来就可以编写一个controller来实现消息的发送了,接下来是controller的代码

@RestController
public class MessageController {
 
    @Autowired
    private MqttConfig mqttConfig;
 
 
    @PostMapping("/publish")
    public String publishMessage(@RequestBody String message) {
        try {
            mqttConfig.publish("li", message,1);
            return "发送成功.";
        } catch (MqttException e) {
            return "发送失败: " + e.getMessage();
        }
    }
}

这里我们通过controller在我们启动web后访问这个链接就可以实现发送一条自己自定义的消息,现在我们还缺少一个服务端,用于我么解释对应主题发送过来的消息,这里我使用的是mqttx,当然也可以通过代码实现,接下来是服务端代码,注意ip和账号密码一定得是自己的,不然要报错。

public class Client {
    public static void main(String[] args) {

        String host = "tcp://192.168.172.102:1883";
        String clientId = "Client_B";
        String topic = "li";
        try {
            MqttClient mqttClient = new MqttClient(host, clientId);
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName("admin");
            mqttConnectOptions.setPassword("public".toCharArray());

            // 连接到 EMQ X Broker
            mqttClient.connect(mqttConnectOptions);

            // 设置消息回调
            mqttClient.setCallback(new MqttCallback() {
                @SneakyThrows
                @Override
                public void connectionLost(Throwable cause) {
                    // 处理连接丢失的情况
                    System.out.println("连接丢失,尝试重新连接...");
                    reconnect(mqttClient, mqttConnectOptions, topic);
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // 处理接收到的消息
                    String payload = new String(message.getPayload());
                    System.out.println("收到消息:Topic=" + topic + ", Message=" + payload);
                }

                @SneakyThrows
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // 处理消息发送完成的情况
                    System.out.println("消息发送完成: " + token.getMessage().getPayload());
                }
            });

            // 订阅主题
            mqttClient.subscribe(topic);

            // 保持连接,防止程序退出
            // 这里可以根据需要,设置一个条件或者等待一段时间,确保程序能够保持连接
            while (true) {
                Thread.sleep(1000);
            }
        } catch (MqttException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic) throws MqttException {
        try {
            // 等待一段时间,可以根据需要调整等待时间
            Thread.sleep(5000);
            // 重新连接 MqttClient
            mqttClient.connect(mqttConnectOptions);
            // 判断是否连接成功
            if (mqttClient.isConnected()) {
                System.out.println("MQTT 客户端已成功连接到 MQTT 代理。");
                // 重新订阅主题
                mqttClient.subscribe(topic);
            }
        } catch (MqttException | InterruptedException e) {
            System.out.println("重新连接失败:" + e.getMessage());
            reconnect(mqttClient, mqttConnectOptions, topic);
        }
    }
}

这里代码就已经全部ok了,接下来本地启动。

这里可以看见都连接上了,接下来就是发送一条消息了,这里我使用Apifox来发送消息的,如果大家也需要。

官网链接Apifox - API 文档、调试、Mock、测试一体化协作平台。拥有接口文档管理、接口调试、Mock、自动化测试等功能,接口开发、测试、联调效率,提升 10 倍。最好用的接口文档管理工具,接口自动化测试工具。https://apifox.com/

添加一个接口

输入我们对应的接口信息,然后保存。

然后运行,这里直接自动生成数据,然后发送消息

这里就可以看见我们的mqttx接收到了数据,并且代码写的服务端也接收到了数据。

这样,springboot整合mqtt就整合完毕了,是不是感觉很简单

如果有其他编程问题,欢迎来q裙463727795一起来探讨哦!!!

标签: spring boot 后端 java

本文转载自: https://blog.csdn.net/zhdbshiai/article/details/143320916
版权归原作者 zhdbshiai 所有, 如有侵权,请联系我们删除。

“spring boot整合MQTT”的评论:

还没有评论