0


springboot集成mqtt

文章目录

  • 前言
  • 一、MQTT是什么?
  • 二、继承步骤
    • 1.安装MQTT- 2.创建项目,引入依赖- 3. 对应步骤2的代码- 3 测试
  • 总结
    • mqtt 启动后访问地址

前言

随着物联网的火热,MQTT的应用逐渐增多

曾经也有幸使用过mqtt,今天正好总结下MQTT的使用;


一、MQTT是什么?

可以把他理解为,也是一种mq消息,设计简单且轻量级,通讯报文开销小,占用的网络带宽和资源较少,适用于低带宽、不稳定网络环境下的通讯。
MQTT采用发布/订阅模式,分为发布者和订阅者两个角色,需要一个中介来协调发布者和订阅者之间的消息传递,这个中介就是MQTT代理(Broker)。
MQTT协议在物联网领域应用广泛,包括智能家居、工业自动化、智能交通系统等。


个人简单总结:

  1. 每个客户端可以订阅一个或者多个主题(发消息,收消息)
  2. 每个客户端不订阅主题,也可以发送主题消息(只接受消息,不发送消息)
  3. 客户端A发送消息给客户端B流程为:
客户端A>>>Broker>>>客户端B
--- 
前置条件:
a: 客户端A 发送主题消息,且与客户端B的订阅主题一致
b: 客户端B 订阅主题

二、继承步骤

1.安装MQTT

这里直接采用windows版本,解压版,比较快

  • 下载地址 MQTT-windows版本
  • 解压后,在bin文件下执行运行命令 .\emqx console
  • 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码admin/publicMQTT管理页面

2.创建项目,引入依赖

大致分为如下步骤:

  • yml配置 主题 用户名 密码
  • 根据配置创建客户端实例,实例订阅主题
  • 实现 MqttCallback 接口1. 重连处理 connectionLost2. 消息接受处理 messageArrived3. 消息发生成功处理 deliveryComplete
  • 根据客户端信息发送某个主题的消息

3. 对应步骤2的代码

  1. yml配置
server:port:8081# 下面这里要看你自己的需求customer:mqtt:broker: tcp://127.0.0.1:1883clientList:#发布客户端ID-clientId: nxys_service
        #监听主题 同时订阅多个主题使用 - 分割开subscribeTopic: mqtt/publish
        #用户名userName: admin
        #密码password: public
      #接受客户端ID-clientId: receive_service
        #监听主题 同时订阅多个主题使用 - 分割开subscribeTopic: mqtt/receive
        #用户名userName: admin
        #密码password: public
  1. 实例信息获取
/**
 * Mqtt配置类
 */@Data@Configuration@ConfigurationProperties(prefix ="customer.mqtt")publicclassMqttConfig{/**
     * mqtt broker地址
     */String broker;/**
     * 需要创建的MQTT客户端
     */List<MqttClient> clientList;}
/**
 * MQTT客户端
 */@DatapublicclassMqttClient{/**
     * 客户端ID
     */privateString clientId;/**
     * 监听主题
     */privateString subscribeTopic;/**
     * 用户名
     */privateString userName;/**
     * 密码
     */privateString password;}
  1. 根据信息创建实例,订阅主题
/**
 * MQTT客户端创建
 */@Component@Slf4jpublicclassMqttClientCreate{@ResourceprivateMqttClientManager mqttClientManager;@AutowiredprivateMqttConfig mqttConfig;/**
     * 创建MQTT客户端
     */@PostConstructpublicvoidcreateMqttClient(){List<MqttClient> mqttClientList = mqttConfig.getClientList();for(MqttClient mqttClient : mqttClientList){
            log.info("{}", mqttClient);//创建客户端,客户端ID:demo,回调类跟客户端ID一致
            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());}}}
/**
 * MQTT客户端管理类,如果客户端非常多后续可入redis缓存
 */@Slf4j@ComponentpublicclassMqttClientManager{@Value("${customer.mqtt.broker}")privateString mqttBroker;@ResourceprivateMqttCallBackContext mqttCallBackContext;/**
     * 存储MQTT客户端
     */publicstaticMap<String,MqttClient>MQTT_CLIENT_MAP=newConcurrentHashMap<>();publicstaticMqttClientgetMqttClientById(String clientId){returnMQTT_CLIENT_MAP.get(clientId);}/**
     * 创建mqtt客户端
     *
     * @param clientId       客户端ID
     * @param subscribeTopic 订阅主题,可为空
     * @param userName       用户名,可为空
     * @param password       密码,可为空
     * @return mqtt客户端
     */publicvoidcreateMqttClient(String clientId,String subscribeTopic,String userName,String password){MemoryPersistence persistence =newMemoryPersistence();try{MqttClient client =newMqttClient(mqttBroker, clientId, persistence);MqttConnectOptions connOpts =newMqttConnectOptions();if(null!= userName &&!"".equals(userName)){
                connOpts.setUserName(userName);}if(null!= password &&!"".equals(password)){
                connOpts.setPassword(password.toCharArray());}

            connOpts.setCleanSession(true);if(null!= subscribeTopic &&!"".equals(subscribeTopic)){AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);if(null== callBack){
                    callBack = mqttCallBackContext.getCallBack("default");}

                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);}//连接mqtt服务端broker
            client.connect(connOpts);// 订阅主题if(null!= subscribeTopic &&!"".equals(subscribeTopic)){if(subscribeTopic.contains("-"))
                    client.subscribe(subscribeTopic.split("-"));else//                    if (!subscribeTopic.equals("mqtt/receive")){
                    client.subscribe(subscribeTopic);}}MQTT_CLIENT_MAP.putIfAbsent(clientId, client);}catch(MqttException e){
            log.error("Create mqttClient failed!", e);}}}
  1. 实现 MqttCallback 接口
/**
 * MQTT回调抽象类
 */@Slf4jpublicabstractclassAbsMqttCallBackimplementsMqttCallback{privateString clientId;privateMqttConnectOptions connectOptions;publicStringgetClientId(){return clientId;}publicvoidsetClientId(String clientId){this.clientId = clientId;}publicMqttConnectOptionsgetConnectOptions(){return connectOptions;}publicvoidsetConnectOptions(MqttConnectOptions connectOptions){this.connectOptions = connectOptions;}/**
     * 失去连接操作,进行重连
     *
     * @param throwable 异常
     */@OverridepublicvoidconnectionLost(Throwable throwable){try{if(null!= clientId){if(null!= dconnectOptions){MqttClientManager.getMqttClientById(clientId).connect(connectOptions);}else{MqttClientManager.getMqttClientById(clientId).connect();}}}catch(Exception e){
            log.error("{} reconnect failed!", e);}}/**
     * 接收订阅消息
     * @param topic    主题
     * @param mqttMessage 接收消息
     * @throws Exception 异常
     */@OverridepublicvoidmessageArrived(String topic,MqttMessage mqttMessage)throwsException{String content =newString(mqttMessage.getPayload());handleReceiveMessage(topic, content);}/**
     * 消息发送成功
     *
     * @param iMqttDeliveryToken toke
     */@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken iMqttDeliveryToken){
        log.info("消息发送成功");}/**
     * 处理接收的消息
     * @param topic   主题
     * @param message 消息内容
     */protectedabstractvoidhandleReceiveMessage(String topic,String message);}
/**
 * 默认回调
 */@Slf4j@Component("default")publicclassDefaultMqttCallBackextendsAbsMqttCallBack{/**
     * @param topic   主题
     * @param message 消息内容
     */@OverrideprotectedvoidhandleReceiveMessage(String topic,String message){
        log.info("接收到主题---{}", topic);
        log.info("接收到消息---{}", message);// 你自己的消息处理业务}}
/**
 * MQTT订阅回调环境类
 */@Component@Slf4jpublicclassMqttCallBackContext{privatefinalMap<String,AbsMqttCallBack> callBackMap =newConcurrentHashMap<>();/**
     * 默认构造函数
     *
     * @param callBackMap 回调集合
     */publicMqttCallBackContext(Map<String,AbsMqttCallBack> callBackMap){this.callBackMap.clear();this.callBackMap.putAll(callBackMap);}/**
     * 获取MQTT回调类
     *
     * @param clientId 客户端ID
     * @return MQTT回调类
     */publicAbsMqttCallBackgetCallBack(String clientId){returnthis.callBackMap.get(clientId);}}
  1. 发送消息
@RestControllerpublicclassSendController{@ResourceMqttClientManager mqttClientManager;@RequestMapping("/sendMessage")publicStringsendMessage(String topic){try{MqttMessage mqttMessage =newMqttMessage("你好".getBytes());
            mqttClientManager.getMqttClientById("nxys_service").publish(topic,mqttMessage);return"发送成功";}catch(Exception e){
            e.printStackTrace();return"发送失败";}}}

3 测试

  1. 启动订阅,查看MQTT 管理页面两个实例
  2. 测试发送消息,查看发送情况,接受情况http://localhost:8081/sendMessage?topic=mqtt/receive发送成功,并接受到消息

总结

文中涉及的所有代码: MQTT-Demo

mqtt 启动后访问地址

http://localhost:18083/#/

  • 用户名/密码:
  • admin/public

  1. 每个客户端可以订阅一个或者多个主题
  2. 每个客户端不订阅主题,也可以发送主题消息
  3. 客户端A发送消息给客户端B流程为:
客户端A>>>Broker>>>客户端B
--- 
前置条件:
a: 客户端A 发送主题消息,且与客户端B的订阅主题一致
b: 客户端B 订阅主题

mqtt启动命令
在bin目录下,cmd 执行

.\emqx console
标签: spring boot java

本文转载自: https://blog.csdn.net/qq_32419139/article/details/136176948
版权归原作者 寂寞旅行 所有, 如有侵权,请联系我们删除。

“springboot集成mqtt”的评论:

还没有评论