本文只讲述如何集成并整合,对于前期工作过多概述
一、安装EMQX MQTT服务器
EMQX 是一个高性能、可扩展的物联网消息中间件,EMQX 实现了 MQTT 协议的服务器端,即 MQTT Broker,它负责接收来自客户端的连接请求,处理订阅、发布消息,并将消息转发给相应的订阅者。
进入EMQX官方网站下载EMQX并启动成功
https://www.emqx.com/en
二、业务场景
1、实时接收发送(MQ)接收发送消息并实时主动推送(Websockte)给前端(广义场景)
2、硬件采集的数据传入EMQX平台(采用MQTT协议),java通过代码连接MQTT服务器,进行采集数据接收、解析、业务处理、存储入库、数据展示。
MQTT 是基于 发布(Publish)/订阅(Subscribe) 模式来进行通信及数据交换的。
对于消息推送,广播通知类业务我还是建议用ActiveMQ(后续有时间再讲)
MQTT篇
1、引入依赖包
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2、修改yml配置
#MQTT客户端
publish:
mqtt:
host: tcp://127.0.0.1:1883
clientId: mqtt_publish
options:
# Q:465442749
userName: admin
password:public
# 这里表示会话不过期
cleanSession:false
# 配置一个默认的主题,加载时不会用到,只能在需要时手动提取
defaultTopic: dev
timeout:1000KeepAliveInterval:10
#断线重连方式,自动重新连接与会话不过期配合使用会导致
#断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我
automaticReconnect:true
connectionTimeout:3000
# 最大链接数
maxInflight:100
3、创建加载yml的config
/**
* @author kt
* @version 1.0.0
* @description mqtt配置类
* @date 2024/09/18 10:10
*/@Configuration@ConfigurationProperties(MQTTConfigBuilder.PREFIX)@DatapublicclassMQTTConfigBuilder{//配置的名称publicstaticfinalStringPREFIX="publish.mqtt";/**
* 服务端地址
*/privateString host;/**
* 客户端id
*/privateString clientId;/**
* 配置链接项
*/privateMqttConnectOptions options;}
4、创建一个MQTTClientUtils (退订是通过客户端id进行匹配的,如有需求可自行修改)
packagecom.ruoyi.web.core.config;importlombok.extern.slf4j.Slf4j;importorg.eclipse.paho.client.mqttv3.*;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Configuration;importjavax.annotation.PostConstruct;importjava.nio.charset.StandardCharsets;@Slf4j@ConfigurationpublicclassMQTTClientUtils{@AutowiredprivateMQTTConfigBuilder mqttConfig;privateMqttClient mqttClient;//private Map<String, Map<String, Boolean>> userTopicSubscriptions = new HashMap<>();publicMQTTClientUtilscreateDevOpsMQTTClient(){this.createMQTTClient();returnthis;}privateMQTTClientUtilsconnect(){try{// cleanSession 标志设置为 false 时,表示当前客户端需要建立一个持久会话,此时即使客户端断开连接,其所有信息也依然会被保留;//mqttConfig.getOptions();MqttConnectOptions mqttConnectOptions =newMqttConnectOptions();
mqttConnectOptions.setCleanSession(false);this.mqttClient.connect(mqttConnectOptions);// this.mqttClient.connect(mqttConfig.getOptions());
log.info("MQTTClient连接成功!");}catch(MqttException mqttException){
mqttException.printStackTrace();
log.error("MQTTClient连接失败!");}returnthis;}privateMqttClientcreateMQTTClient(){try{this.mqttClient =newMqttClient( mqttConfig.getHost(), mqttConfig.getClientId());
log.info("MQTTClient创建成功!");returnthis.mqttClient;}catch(MqttException exception){
exception.printStackTrace();
log.error("MQTTClient创建失败!");returnnull;}}/**
* 消息发送
* @param topicName
* @param message
* @return
*/publicbooleanpublish(String topicName,String message){
log.info("订阅主题名:{}, message:{}", topicName, message);MqttMessage mqttMessage =newMqttMessage(message.getBytes(StandardCharsets.UTF_8));try{this.mqttClient.publish(topicName, mqttMessage);returntrue;}catch(MqttException exception){
exception.printStackTrace();returnfalse;}}/**
* 消息发送 : retained 默认为 false
* "retained message" 指的是 Broker 会保留的最后一条发布到某个主题的消息。
* 当新的订阅者连接到该主题时,Broker 会将这条保留消息立即发送给订阅者,即使在订阅者订阅时该消息并未被重新发布。
* 这对于一些需要初始状态或者最后一次已知状态的应用场景非常有用。
* @param topicName
* @param message
* @param qos
* @return
*/publicbooleanpublish(String topicName,int qos,String message){
log.info("主题名:{}, qos:{}, message:{}", topicName, qos, message);MqttMessage mqttMessage =newMqttMessage(message.getBytes(StandardCharsets.UTF_8));try{this.mqttClient.publish(topicName, mqttMessage.getPayload(), qos,false);returntrue;}catch(MqttException exception){
exception.printStackTrace();returnfalse;}}/**
* 订阅某个主题
*
* @param topicName
* @param qos
*/publicvoidsubscribe(String topicName,int qos){
log.info("订阅主题名:{}, qos:{}", topicName, qos);try{this.mqttClient.subscribe(topicName, qos);}catch(MqttException e){
e.printStackTrace();}}/**
* 订阅某个主题
*
* @param topicName
* @param qos
*/publicvoidsubscribe(String topicName,int qos,IMqttMessageListener messageListener){
log.info("订阅主题名:{}, qos:{}, Listener类:{}", topicName, qos, messageListener.getClass());try{this.mqttClient.subscribe(topicName, qos, messageListener);}catch(MqttException e){
e.printStackTrace();}}/**
* 取消订阅主题
* @param topicName 主题名称
*/publicvoidcleanTopic(String topicName){
log.info("取消订阅主题名:{}", topicName);try{this.mqttClient.unsubscribe(topicName);}catch(MqttException e){
e.printStackTrace();}}//这里是初始化方法@PostConstructpublicvoidinitMqttClient(){//创建连接MQTTClientUtils mqttClientUtils =this.createDevOpsMQTTClient().connect();
mqttClientUtils.subscribe("message/call/back",2,newMessageCallbackListener());}}
5、监听器实现(方式一)
/**
* @author kt
* @version 1.0.0
* @description 消息回调返回
* @date 2024/09/20 17:27
*/@ComponentpublicclassMessageCallbackListenerimplementsIMqttMessageListener{@OverridepublicvoidmessageArrived(String topic,MqttMessage message)throwsException{String messageBody =newString(message.getPayload(),StandardCharsets.UTF_8);System.out.println("收到消息:"+topic+", 消息内容是:"+ messageBody);}}
6、监听器实现(方式二)
6.1创建注解
@Target(ElementType.METHOD)@Retention(value =RetentionPolicy.RUNTIME)public@interfaceMqttTopicListener{Stringvalue();// 主题名称intqos()default0;// QoS级别,默认为0}
6.2、从注解中提取主题并创建一个监听器监听主题消息
/**
* method.invoke(objectWithAnnotations, mqttTopic, message);
* 上面这种方式绕过了 Spring 的 AOP 代理,也就是说,这个调用并不会触发 Spring AOP 的切面逻辑。
* 也就是说直接使用 objectWithAnnotations 不会经过 Spring 容器,导致 AOP 切面无法拦截和处理这个调用。
* 改用手动代理方式
* Object targetObject = applicationContext.getBean(objectWithAnnotations.getClass());
* method.invoke(targetObject, mqttTopic, message);
* 或者采用消息适配器
*/@ComponentpublicclassMqttTopicAnnotationProcessor{privatefinalMQTTClientUtils mqttClientUtils;privatefinalApplicationContext applicationContext;publicMqttTopicAnnotationProcessor(MQTTClientUtils mqttClientUtils,ApplicationContext applicationContext){this.mqttClientUtils = mqttClientUtils;this.applicationContext = applicationContext;}publicvoidprocessAnnotations(Object objectWithAnnotations){Class<?> clazz = objectWithAnnotations.getClass();for(Method method : clazz.getDeclaredMethods()){if(method.isAnnotationPresent(MqttTopicListener.class)){MqttTopicListener annotation = method.getAnnotation(MqttTopicListener.class);String topic = annotation.value();int qos = annotation.qos();IMqttMessageListener listener =newIMqttMessageListener(){@OverridepublicvoidmessageArrived(String mqttTopic,MqttMessage message)throwsException{Object targetObject = applicationContext.getBean(objectWithAnnotations.getClass());
method.invoke(targetObject, mqttTopic, message);// method.invoke(objectWithAnnotations, mqttTopic, message);如果采用这种方式targetObject 没有被代理}};
mqttClientUtils.subscribe(topic, qos, listener);}}}}
6.3、初始化
mport javax.annotation.PostConstruct;publicabstractclassAbstractMqttMessageListenerService{protectedMqttTopicAnnotationProcessor mqttTopicAnnotationProcessor;publicAbstractMqttMessageListenerService(MqttTopicAnnotationProcessor mqttTopicAnnotationProcessor){this.mqttTopicAnnotationProcessor = mqttTopicAnnotationProcessor;}@PostConstructpublicvoidinitialize(){
mqttTopicAnnotationProcessor.processAnnotations(this);}}
7.1测试(方式二)
@Slf4j@ComponentpublicclassMqttMessageListenerServiceextendsAbstractMqttMessageListenerService{publicMqttMessageListenerService(MqttTopicAnnotationProcessor mqttTopicAnnotationProcessor){super(mqttTopicAnnotationProcessor);}/**
* 测试
*
* @param topic
* @param message
*/@MqttTopicListener(value ="test", qos =2)publicvoidtestMessage(String topic,MqttMessage message){String messageBody=newString(message.getPayload(),StandardCharsets.UTF_8);
log.info("收到{}主题消息:messageBody:{}", topic, messageBody);}}
7.2、测试(方式一)
@ApiOperation("mqtt发布消息测试")@GetMapping("/test")publicStringtest(String mgs ){//发布消息到主题主题Boolean a = mqttService.publish("test1234",1, mgs);System.out.println(a);return"Server is running!";}@ApiOperation("mqtt订阅测试")@GetMapping("/test1")publicStringtest1(){
mqttService.subscribe("test123",0,newMessageCallbackListener());return"Server is running!";
感谢技术大佬分享:
Websockte篇
1、导入依赖包
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><!-- 请使用最新的稳定版本 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency>
2、开启WebSocket支持
importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.web.socket.server.standard.ServerEndpointExporter;/**
* 开启WebSocket支持
* @author zhengkai.blog.csdn.net
*/@ConfigurationpublicclassWebSocketConfig{@BeanpublicServerEndpointExporterserverEndpointExporter(){returnnewServerEndpointExporter();}}
4、核心方法(可扩展通过id或Session 实现发送)
packagecom.ruoyi.web.core.config;importcn.hutool.log.Log;importcn.hutool.log.LogFactory;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONObject;importio.swagger.v3.oas.annotations.servers.Server;importorg.apache.commons.lang.StringUtils;importorg.springframework.stereotype.Component;importjavax.websocket.*;importjavax.websocket.server.PathParam;importjavax.websocket.server.ServerEndpoint;importjava.io.IOException;importjava.util.HashMap;importjava.util.Set;importjava.util.concurrent.ConcurrentHashMap;/**
* @author kt
*/@ServerEndpoint("/imserver/{userId}")@ComponentpublicclassWebSocketServer{staticLog log=LogFactory.get(WebSocketServer.class);/**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/privatestaticint onlineCount =0;/**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/privatestaticConcurrentHashMap<String,WebSocketServer> webSocketMap =newConcurrentHashMap<>();/**与某个客户端的连接会话,需要通过它来给客户端发送数据*/privateSession session;/**接收userId*/privateString userId="";/**
* 连接建立成功调用的方法*/@OnOpenpublicvoidonOpen(Session session,@PathParam("userId")String userId){this.session = session;
webSocketMap.put(session.toString(),this);this.userId=userId;if(webSocketMap.containsKey(userId)){
webSocketMap.remove(userId);
webSocketMap.put(userId,this);//加入set中}else{
webSocketMap.put(userId,this);//加入set中addOnlineCount();//在线数加1}
log.info("用户连接:"+userId+",当前在线人数为:"+getOnlineCount());try{sendMessage("连接成功");}catch(IOException e){
log.error("用户:"+userId+",网络异常!!!!!!");}}/**
* 连接关闭调用的方法
*/@OnClosepublicvoidonClose(){if(webSocketMap.containsKey(userId)){
webSocketMap.remove(userId);//从set中删除subOnlineCount();}
log.info("用户退出:"+userId+",当前在线人数为:"+getOnlineCount());}/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/@OnMessagepublicvoidonMessage(String message,Session session){
log.info("用户消息:"+userId+",报文:"+message);//可以群发消息//消息保存到数据库、redisif(StringUtils.isNotBlank(message)){try{//解析发送的报文JSONObject jsonObject =JSON.parseObject(message);//追加发送人(防止串改)
jsonObject.put("fromUserId",this.userId);String toUserId=jsonObject.getString("toUserId");//传送给对应toUserId用户的websocketif(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());}else{
log.error("请求的userId:"+toUserId+"不在该服务器上");//否则不在这个服务器上,发送到mysql或者redis}}catch(Exception e){
e.printStackTrace();}}}/**
*
* @param session
* @param error
*/@OnErrorpublicvoidonError(Session session,Throwable error){
log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
error.printStackTrace();}/**
* 实现服务器主动推送
*/@ServerpublicvoidsendMessage(String message)throwsIOException{this.session.getBasicRemote().sendText(message);}// @Server// public void sendMessages(String message,String toUserId) throws IOException {//// webSocketMap.get(toUserId).sendMessage(message);//// }/**
* 发送自定义消息
* */publicstaticvoidsendInfo(String message,@PathParam("userId")String userId)throwsIOException{
log.info("发送消息到:"+userId+",报文:"+message);if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){HashMap map=newHashMap();
map.put(1,"AAAA");
map.put(2,"BBBB");
map.put(3,"CCCC");
map.put(4,"DDDD");
webSocketMap.get(userId).sendMessage(map.toString());}else{HashMap map=newHashMap();
map.put(5,"DDDD");System.out.println(map);// log.error("用户"+userId+",不在线!");}}publicvoidsendInfos(String message)throwsIOException{Set<String> keys = webSocketMap.keySet();for(String key : keys){WebSocketServer value = webSocketMap.get(key);
value.sendMessage(message);}
log.info("发送消息到:"+",报文:"+message);}publicstaticsynchronizedintgetOnlineCount(){return onlineCount;}publicstaticsynchronizedvoidaddOnlineCount(){WebSocketServer.onlineCount++;}publicstaticsynchronizedvoidsubOnlineCount(){WebSocketServer.onlineCount--;}}
5、测试(如果使用postman测试需要同时开俩个连接方可)
@ApiOperation("websortek")@RequestMapping("/push")publicStringpushToWeb(String message,@PathVariableString toUserId)throwsIOException{WebSocketServer.sendInfo(message, toUserId);return"MSG SEND SUCCESS";}@ApiOperation("websorteks")@GetMapping("/pushs")publicStringpushToWebs()throwsIOException{String message ="asc";
webSocketServer.sendMessage(message);return"MSG SEND SUCCESS";}
有不懂的可通过yml的方式联系我 (后续会推出ActiveMQ+Websockte)
版权归原作者 好怪~ 所有, 如有侵权,请联系我们删除。