Spring Boot需要
与硬件设备进行交互
,使用
MQTT
作为通信协议。对于这种情况,
MQTT
的服务端
(Broker)
需要在服务器上安装和配置,
硬件设备和Java应用程序(Spring Boot)通过该Broker进行消息的发布和订阅
。以下是详细步骤:
一、服务器安装步骤
安装 Mosquitto (MQTT Broker)
Ubuntu:
# 更新软件包列表sudoapt-get update
# 安装 Mosquitto 及其客户端工具sudoapt-getinstall mosquitto mosquitto-clients
# 配置 Mosquitto 开机自启动sudo systemctl enable mosquitto
# 启动 Mosquitto 服务sudo systemctl start mosquitto
CentOS:
# 更新软件包列表sudo yum update
# 安装 EPEL 仓库sudo yum install epel-release
# 安装 Mosquittosudo yum install mosquitto
# 配置 Mosquitto 开机自启动sudo systemctl enable mosquitto
# 启动 Mosquitto 服务sudo systemctl start mosquitto
配置 Mosquitto(可选)
默认情况下,
Mosquitto
允许匿名连接,但为了安全性,可以配置用户名和密码。在
/etc/mosquitto/mosquitto.conf
文件中进行配置:
# 启用监听1883端口
listener 1883# 允许匿名访问
allow_anonymous true
二、Spring Boot 配置和工具类
添加依赖
在
pom.xml
文件中添加以下依赖:
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId></dependency>
application.yml配置
可以将所有
MQTT
相关的配置,包括
client_id
和
默认主题
,配置在
application.yml
中
mqtt:broker:# 设备MQTT应用配置url: tcp://your_server_ip:1883username: your_username
password: your_password
# 自定义约定client_idclientId: your_client_id
topics:# 默认监听default: your/default_topic
# 自定义监听subscriptions:- your/topic1
- your/topic2
配置类
创建一个
MqttConfig
配置类:
importorg.eclipse.paho.client.mqttv3.MqttConnectOptions;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.integration.annotation.ServiceActivator;importorg.springframework.integration.channel.DirectChannel;importorg.springframework.integration.core.MessageProducer;importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;importorg.springframework.integration.mqtt.core.MqttPahoClientFactory;importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;importorg.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;importorg.springframework.messaging.MessageChannel;importorg.springframework.messaging.MessageHandler;importorg.springframework.beans.factory.annotation.Autowired;@ConfigurationpublicclassMqttConfig{@AutowiredprivateMqttUtil mqttUtil;@Value("${mqtt.broker.url}")privateString brokerUrl;@Value("${mqtt.broker.username}")privateString username;@Value("${mqtt.broker.password}")privateString password;@Value("${mqtt.broker.clientId}")privateString clientId;@Value("${mqtt.topics.default}")privateString defaultTopic;@Value("${mqtt.topics.subscriptions}")privateString[] topics;// 配置MQTT客户端工厂@BeanpublicMqttPahoClientFactorymqttClientFactory(){DefaultMqttPahoClientFactory factory =newDefaultMqttPahoClientFactory();MqttConnectOptions options =newMqttConnectOptions();
options.setServerURIs(newString[]{brokerUrl});
options.setUserName(username);
options.setPassword(password.toCharArray());
factory.setConnectionOptions(options);return factory;}// (接收消息:步骤一)定义一个消息通道,用于接收消息@BeanpublicMessageChannelmqttInputChannel(){returnnewDirectChannel();}// (接收消息:步骤二)配置消息驱动通道适配器,用于接收指定主题的消息@BeanpublicMessageProducerinbound(){MqttPahoMessageDrivenChannelAdapter adapter =newMqttPahoMessageDrivenChannelAdapter(clientId,mqttClientFactory(), topics);
adapter.setOutputChannel(mqttInputChannel());return adapter;}// (接收消息:步骤三)配置消息处理器,用于处理接收到的消息@Bean@ServiceActivator(inputChannel ="mqttInputChannel")publicMessageHandlerhandler(){return message ->{String topic = message.getHeaders().get("mqtt_receivedTopic").toString();String payload = message.getPayload().toString();System.out.println("接收到消息: 主题 = "+ topic +", 内容 = "+ payload);// 发布接收到的消息事件
mqttUtil.publishReceivedMessage(topic, payload);};}// 配置MQTT消息发送处理器@Bean@ServiceActivator(inputChannel ="mqttOutboundChannel")publicMessageHandlermqttOutbound(){MqttPahoMessageHandler messageHandler =newMqttPahoMessageHandler(clientId,mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);// 设置默认主题return messageHandler;}// 定义一个消息通道,用于发送消息@BeanpublicMessageChannelmqttOutboundChannel(){returnnewDirectChannel();}}
MQTT 工具类
创建一个
MqttUtil
工具类:
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.ApplicationEventPublisher;importorg.springframework.messaging.MessageChannel;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Component;@ComponentpublicclassMqttUtil{@AutowiredprivateMessageChannel mqttOutboundChannel;@AutowiredprivateApplicationEventPublisher eventPublisher;/**
* 发送消息到默认主题
* @param payload 消息内容
*/publicvoidsendMessage(String payload){
mqttOutboundChannel.send(MessageBuilder.withPayload(payload).build());}/**
* 发送消息到指定主题
* @param topic 主题
* @param payload 消息内容
*/publicvoidsendMessage(String topic,String payload){
mqttOutboundChannel.send(MessageBuilder.withPayload(payload).setHeader("mqtt_topic", topic).build());}/**
* (接收消息:步骤四)发布接收到的MQTT消息事件
* @param topic 主题
* @param payload 消息内容
*/publicvoidpublishReceivedMessage(String topic,String payload){MqttMessageEvent event =newMqttMessageEvent(this, topic, payload);
eventPublisher.publishEvent(event);}}
MQTT 消息事件( MqttUtil.publishReceivedMessage() 方法使用)
创建一个事件类,用于封装接收到的
MQTT
消息:
importorg.springframework.context.ApplicationEvent;publicclassMqttMessageEventextendsApplicationEvent{privatefinalString topic;privatefinalString payload;publicMqttMessageEvent(Object source,String topic,String payload){super(source);this.topic = topic;this.payload = payload;}publicStringgetTopic(){return topic;}publicStringgetPayload(){return payload;}}
消息监听器( MqttUtil.publishReceivedMessage() 方法使用)
创建一个监听器,用于处理接收到的
MQTT
消息:
importorg.springframework.context.event.EventListener;importorg.springframework.stereotype.Component;@ComponentpublicclassMqttMessageListener{/**
* (接收消息:步骤五)事件监听器中处理消息
* 说明:此方法将接收到上方 MqttUtil.publishReceivedMessage() 发布的事件并处理这些消息
* @param event 接收到的事件
*/@EventListenerpublicvoidhandleMqttMessage(MqttMessageEvent event){String topic = event.getTopic();String payload = event.getPayload();System.out.println("处理接收到的消息: 主题 = "+ topic +", 内容 = "+ payload);// 根据不同的主题进行不同的处理if("your/topic1".equals(topic)){handleTopic1Message(payload);}elseif("your/topic2".equals(topic)){handleTopic2Message(payload);}}privatevoidhandleTopic1Message(String payload){// 处理来自 topic1 的消息System.out.println("处理 topic1 消息: "+ payload);}privatevoidhandleTopic2Message(String payload){// 处理来自 topic2 的消息System.out.println("处理 topic2 消息: "+ payload);}}
使用示例
在你的Spring Boot应用中,可以通过注入
MqttUtil
来发送消息:
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassMqttController{@AutowiredprivateMqttUtil mqttUtil;@GetMapping("/send")publicStringsendMessage(){
mqttUtil.sendMessage("your/topic1","Hello from Spring Boot");return"Message sent!";}}
运行流程
- 1.在
application.yml
文件中配置MQTT
服务器的URL、用户名、密码、client_id、默认主题和订阅的主题
。 - 2.启动
MQTT Broker(如Mosquitto)
。 - 3.确保硬件设备连接到
MQTT Broker
,并发布和订阅消息。 - 4.启动Spring Boot应用程序,确保它可以
发送
和接收MQTT
消息。 - 5.在控制台中查看接收到的消息,并根据不同的主题进行处理。
通过以上步骤,你可以在
Spring Boot
中配置和使用
MQTT
,并将其封装为工具类和事件监听机制,方便其他项目集成和使用。这种配置方式使得
MQTT
连接认证更加安全,代码更加灵活。
30岁终于买了一台不卡的电脑,可 坐在电脑前我的 却卡住了…
版权归原作者 吴思伟 所有, 如有侵权,请联系我们删除。