引言
本文是springboot集成mqtt的一个实战案例。
gitee代码库地址:源码地址
一、什么是MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于 TCP/IP 协议上,由 IBM 于 1999 年发明。MQTT 协议的主要特征是开放、简单、轻量级和易于实现,这些特征使得它适用于受约束的应用环境,如:
网络受限:网络带宽较低且传输不可靠
终端受限:协议运行在嵌入式设备上,嵌入式终端的处理器、内存等是受限的
MQTT 非常适用于物联网领域,如传感器与服务器的通信、传感器信息采集等。
二、发布/订阅模式
发布/订阅模式(Publish/Subscribe Pattern,简称Pub/Sub)是一种消息通信模式,在这种模式下,消息的发送者(发布者)不会将消息直接发送给特定的接收者(订阅者)。而是将代表消息内容的通知(事件)发布到一个特定的主题或频道上,而订阅了这个主题的接收者会收到所有在这个主题上发布的通知。这种模式解耦了消息的发送者和接收者,使得系统更加灵活和可扩展。
主要组成部分
- 发布者(Publisher):负责生成消息并将其发布到特定的主题或频道。
- 订阅者(Subscriber):注册对特定主题的兴趣,并接收该主题上的所有消息。
- 消息代理(Message Broker):作为中间件,它接收来自发布者的消息,并将这些消息传递给所有相关的订阅者。
优点
- 解耦:发布者和订阅者之间不需要直接交互,这降低了系统的耦合度。
- 灵活性:可以动态添加或删除订阅者,不影响其他组件。
- 可扩展性:系统容易扩展,可以轻松增加新的发布者或订阅者。
缺点
- 复杂性:引入了额外的组件(如消息代理),增加了系统的复杂性和管理成本。
- 性能开销:消息的传递需要通过中间件,可能会有延迟和性能损失。
应用场景
- 事件驱动架构:在微服务架构中,不同的服务通过发布/订阅模式进行异步通信。
- 数据流处理:如实时数据分析,多个组件可以订阅数据流并进行处理。
- 分布式系统:用于跨系统或跨服务的消息传递。
发布/订阅模式并不是 MQTT 协议特有的模式,很多消息中间件都有使用发布/订阅模式,有同学可能认为这就是观察者模式,还真不是,这两个模式很容易混淆。观察者模式只有观察者 + 被观察者两个角色,而发布/订阅模式还有一个经纪人 Broker;往更深层次的讲观察者和被观察者,是松耦合的关系,而发布者和订阅者,则完全不存在耦合。
三、Windows下安装MQTT消息服务器
非常遗憾,EMQ X Broker 在 5.4.0 版本的发行版中已不支持 windows 版本的安装包了,笔者从网上找了一个最后支持版本的压缩包,已上传资源。
- 解压后,在bin文件下,使用cmd执行运行命令 .\emqx console
- 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码 admin/public
如果报错缺少Erlang环境,需要自行安装下该环境
浏览器访问:http://localhost:18083/#,输入账号密码进入,会要求你修改密码,可以暂时跳过
四、Windows安装MQTT消息代理客户端MQTTX
下载地址:MQTTX下载地址
点击免费下载
选择64位版本
下好后点击安装,启动运行界面如下:
语言是英文,可以在设置按钮里调成中文。这个客户端代理主要是进行消息发送的测试服务。
五、新建MQTT集成项目
随便新建了一个springboot应用,用的是JDK17,在pom文件中引入如下依赖:
<!-- MQTT --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
5.1 yml配置
server:port:8081#允许循环依赖spring:main:allow-circular-references:truecustomer:mqtt:broker: tcp://localhost:1883clientList:#发布客户端ID-clientId: nays_service
#监听主题 同时订阅多个主题 使用 - 分割开subscribeTopic: mqtt/publish
#用户名userName: admin
#密码password: public
#接收客户端ID-clientId: receive_service
#监听主题 同时订阅多个主题 使用 - 分割开subscribeTopic: mqtt/receive
#用户名userName: admin
#密码password: public
5.2 Mqtt配置类
packagecom.hulei.mqttproject.config;importlombok.Data;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Configuration;importjava.util.List;/**
* Mqtt配置类
*/@Data@Configuration@ConfigurationProperties(prefix ="customer.mqtt")publicclassMqttConfig{/**
* mqtt broker地址
*/String broker;/**
* 需要创建的MQTT客户端
*/List<MqttClient> clientList;}
5.3 MQTT客户端
packagecom.hulei.mqttproject.config;importlombok.Data;/**
* MQTT客户端
*/@DatapublicclassMqttClient{/**
* 客户端ID
*/privateString clientId;/**
* 监听主题
*/privateString subscribeTopic;/**
* 用户名
*/privateString userName;/**
* 密码
*/privateString password;}
5.4 MQTT客户端管理类
packagecom.hulei.mqttproject.config;importjakarta.annotation.Resource;importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.eclipse.paho.client.mqttv3.MqttException;importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;importorg.eclipse.paho.client.mqttv3.MqttClient;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;importjava.util.Map;importjava.util.concurrent.ConcurrentHashMap;/**
* MQTT客户端管理类,如果客户端非常多后续可入redis缓存
*/@Slf4j@ComponentpublicclassMqttClientManager{@Value("${customer.mqtt.broker}")privateString mqttBroker;@ResourceprivateMqttCallBackContext mqttCallBackContext;/**
* 存储MQTT客户端
*/publicstaticMap<String,MqttClient>MQTT_CLIENT_MAP=newConcurrentHashMap<>();publicMqttClientgetMqttClientById(String clientId){returnMQTT_CLIENT_MAP.get(clientId);}/**
* 创建mqtt客户端
*
* @param clientId 客户端ID
* @param subscribeTopic 订阅主题,可为空
* @param userName 用户名,可为空
* @param password 密码,可为空
*/publicvoidcreateMqttClient(String clientId,String subscribeTopic,String userName,String password){MemoryPersistence persistence =newMemoryPersistence();try{MqttClient client =newMqttClient(mqttBroker, clientId, persistence);MqttConnectOptions connOpts =newMqttConnectOptions();if(null!= userName &&!userName.isEmpty()){
connOpts.setUserName(userName);}if(null!= password &&!password.isEmpty()){
connOpts.setPassword(password.toCharArray());}
connOpts.setCleanSession(true);if(null!= subscribeTopic &&!subscribeTopic.isEmpty()){AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);if(null== callBack){
callBack = mqttCallBackContext.getCallBack("default");}
callBack.setClientId(clientId);
callBack.setConnectOptions(connOpts);
client.setCallback(callBack);}//连接mqtt服务端broker
client.connect(connOpts);// 订阅主题if(null!= subscribeTopic &&!subscribeTopic.isEmpty()){if(subscribeTopic.contains("-"))
client.subscribe(subscribeTopic.split("-"));else{
client.subscribe(subscribeTopic);}}MQTT_CLIENT_MAP.putIfAbsent(clientId, client);}catch(MqttException e){
log.error("Create mqttClient failed!", e);}}}
5.5 MQTT客户端创建
packagecom.hulei.mqttproject.config;importjakarta.annotation.PostConstruct;importjakarta.annotation.Resource;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;importjava.util.List;/**
* MQTT客户端创建
*/@Component@Slf4jpublicclassMqttClientCreate{@ResourceprivateMqttClientManager mqttClientManager;@ResourceprivateMqttConfig mqttConfig;/**
* 创建MQTT客户端
*/@PostConstructpublicvoidcreateMqttClient(){List<MqttClient> mqttClientList = mqttConfig.getClientList();for(MqttClient mqttClient : mqttClientList){
log.info("{}", mqttClient);//创建客户端,客户端ID:demo,回调类跟客户端ID一致
mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());}}}
5.6 MQTT回调抽象类
packagecom.hulei.mqttproject.config;importjakarta.annotation.Resource;importlombok.Getter;importlombok.Setter;importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.IMqttDeliveryToken;importorg.eclipse.paho.client.mqttv3.MqttCallback;importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.eclipse.paho.client.mqttv3.MqttMessage;/**
* MQTT回调抽象类
*/@Setter@Getter@Slf4jpublicabstractclassAbsMqttCallBackimplementsMqttCallback{privateString clientId;privateMqttConnectOptions connectOptions;@ResourceMqttClientManager mqttClientManager;/**
* 失去连接操作,进行重连
*
* @param throwable 异常
*/@OverridepublicvoidconnectionLost(Throwable throwable){try{if(null!= clientId){if(null!= connectOptions){
mqttClientManager.getMqttClientById(clientId).connect(connectOptions);}else{
mqttClientManager.getMqttClientById(clientId).connect();}}}catch(Exception e){
log.error("{} reconnect failed!", e.getMessage(), e);}}/**
* 接收订阅消息
* @param topic 主题
* @param mqttMessage 接收消息
* @throws Exception 异常
*/@OverridepublicvoidmessageArrived(String topic,MqttMessage mqttMessage)throwsException{String content =newString(mqttMessage.getPayload());handleReceiveMessage(topic, content);}/**
* 消息发送成功
*
* @param iMqttDeliveryToken toke
*/@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken iMqttDeliveryToken){
log.info("消息发送成功");}/**
* 处理接收的消息
* @param topic 主题
* @param message 消息内容
*/protectedabstractvoidhandleReceiveMessage(String topic,String message);}
5.7 MQTT订阅回调环境类
packagecom.hulei.mqttproject.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;importjava.util.Map;importjava.util.concurrent.ConcurrentHashMap;/**
* MQTT订阅回调环境类
*/@Component@Slf4jpublicclassMqttCallBackContext{privatefinalMap<String,AbsMqttCallBack> callBackMap =newConcurrentHashMap<>();/**
* 默认构造函数
*
* @param callBackMap 回调集合
*/publicMqttCallBackContext(Map<String,AbsMqttCallBack> callBackMap){this.callBackMap.putAll(callBackMap);}/**
* 获取MQTT回调类
*
* @param clientId 客户端ID
* @return MQTT回调类
*/publicAbsMqttCallBackgetCallBack(String clientId){returnthis.callBackMap.get(clientId);}}
5.8 默认回调类
packagecom.hulei.mqttproject.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;/**
* 默认回调
*/@Slf4j@Component("default")publicclassDefaultMqttCallBackextendsAbsMqttCallBack{/**
* @param topic 主题
* @param message 消息内容
*/@OverrideprotectedvoidhandleReceiveMessage(String topic,String message){
log.info("接收到主题---{}", topic);
log.info("接收到消息---{}", message);// 自定义消息处理业务}}
六、测试服务类
packagecom.hulei.mqttproject.controller;importcom.hulei.mqttproject.config.MqttClientManager;importjakarta.annotation.Resource;importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.MqttMessage;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@Slf4jpublicclassSendController{@ResourceprivateMqttClientManager mqttClientManager;@RequestMapping("/sendMessage")publicStringsendMessage(String topic){try{MqttMessage mqttMessage =newMqttMessage("你好".getBytes());
mqttClientManager.getMqttClientById("nays_service").publish(topic,mqttMessage);return"发送成功";}catch(Exception e){
log.error("发送失败",e);return"发送失败";}}}
七、启动springboot
启动日志可以看到,mqtt消息服务器连接成功
EMQX工具显示发布客户端和接收客户端均已成功注册
使用Apifox测试下SendController中的接口,mqtt/receive是yaml中接收客户端订阅的主题,当然也可以往mqtt/publish主题发,mqtt中消息的发布者也可以订阅主题,监听某些消息。
版权归原作者 熊出没 所有, 如有侵权,请联系我们删除。