0


springboot+Netty搭建MQTT协议的服务端

本文基于基础版的netty实现mqtt
在此功能基础上,进行了功能强化,新增了用户鉴权、多用户订阅推送,qos2级别消息处理,后续新增topic filter功能,本人会持续更新
Netty是业界最流行的nio框架之一,结合springboot可以满足快速开发

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上的。MQTT协议的可以用在物联网、小型设备、还有移动应用上。

Netty也可以实现MQTT协议,他的内部封装了MQTT协议的相关对象。
MQTT消息包含固定头、可变头、载体
固定头
在这里插入图片描述
可变头
在这里插入图片描述

配置文件(appliction.yml)

配置文件配置了mqtt服务的端口、账号、密码,

server:port:8880netty:mqtt:port:1883#    mqtt账号username: admin
#mqtt密码password:123456# 日记配置logging:level:# 开启debug日记打印com.hyx: debug

jar包依赖

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.10</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.hyx</groupId><artifactId>superNetty</artifactId><version>0.0.1-SNAPSHOT</version><name>superNetty</name><description>superNetty</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--      netty包  --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency><!--   常用JSON工具包 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.80</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--        mqtt服务端--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies><build><plugins><!-- 打包成一个可执行jar --><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build></project>

代码结构

在这里插入图片描述
消息发送类

packagecom.hyx.supernetty.MQTTServer.callBack;importjava.util.ArrayList;importjava.util.List;importjava.util.Set;importjava.util.stream.Collectors;importio.netty.channel.ChannelId;importio.netty.handler.codec.mqtt.*;importlombok.RequiredArgsConstructor;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importio.netty.channel.Channel;importorg.springframework.stereotype.Component;importcom.hyx.supernetty.MQTTServer.config.MQTTServerProperties;importstaticcom.hyx.supernetty.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;importstaticcom.hyx.supernetty.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;importstaticcom.hyx.supernetty.MQTTServer.server.MQTTServer.*;/**
 * 大黄
 */@Component@RequiredArgsConstructorpublicclassBootNettyMqttMsgBack{privatestaticfinalLogger log =LoggerFactory.getLogger(BootNettyMqttMsgBack.class);privatefinalMQTTServerPropertiesMQTTserverProperties;/**
     *     确认连接请求
     * @param channel
     * @param mqttMessage
     */publicvoid connack (Channel channel,MqttMessage mqttMessage){MqttConnectMessage mqttConnectMessage =(MqttConnectMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();//    构建返回报文, 可变报头MqttConnAckVariableHeader mqttConnAckVariableHeaderBack =newMqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());//    构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack =newMqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(),MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(),0x02);//    构建CONNACK消息体MqttConnAckMessage connAck =newMqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);//log.info("back--"+connAck.toString());
        log.debug("设备上线,channelId:{}", channel.id());MQTTdeviceAdd(channel);
        channel.writeAndFlush(connAck);}publicvoid disconnack (Channel channel,MqttMessage mqttMessage){MqttConnectMessage mqttConnectMessage =(MqttConnectMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();//    构建返回报文, 可变报头MqttConnAckVariableHeader mqttConnAckVariableHeaderBack =newMqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());//    构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack =newMqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(),MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(),0x02);//    构建CONNACK消息体MqttConnAckMessage connAck =newMqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);//log.info("back--"+connAck.toString());
        channel.writeAndFlush(connAck);
        log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);}/**
     *     根据qos发布确认
     * @param channel
     * @param mqttMessage
     */publicvoid puback (Channel channel,MqttMessage mqttMessage){MqttPublishMessage mqttPublishMessage =(MqttPublishMessage) mqttMessage;MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();MqttQoS qos =  mqttFixedHeaderInfo.qosLevel();//注意:    readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置    读指针byte[] headBytes =newbyte[mqttPublishMessage.payload().readableBytes()];
        mqttPublishMessage.payload().readBytes(headBytes);String data =newString(headBytes);System.out.println("publish data-->"+data);//重置读取的指针
        mqttPublishMessage.payload().resetReaderIndex();switch(qos){case AT_MOST_ONCE://    至多一次//推送到订阅的客户端subscribSend(mqttMessage);break;case AT_LEAST_ONCE://    至少一次//    构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack =MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());//    构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack =newMqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(),MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(),0x02);//    构建PUBACK消息体MqttPubAckMessage pubAck =newMqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
                log.info("back--"+pubAck.toString());
                channel.writeAndFlush(pubAck);//推送到订阅的客户端subscribSend(mqttMessage);break;case EXACTLY_ONCE://    刚好一次//    构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack2 =newMqttFixedHeader(MqttMessageType.PUBREC,false,MqttQoS.AT_LEAST_ONCE,false,0x02);//    构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 =MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());MqttMessage mqttMessageBack =newMqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);//服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,// 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。//接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。int mqttMessageId=mqttPublishMessage.variableHeader().packetId();if(!mqttMessageIdMap.containsKey(mqttMessageId)){//不存在此消息,将此消息暂存
                    mqttMessageIdMap.put(mqttMessageId, mqttMessage);
                    log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");}else{//重复发送消息,直接返回
                    log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());return;}
                channel.writeAndFlush(mqttMessageBack);break;default:break;}}/**
     *     发布完成 qos2
     * @param channel
     * @param mqttMessage
     */publicvoid pubcomp (Channel channel,MqttMessage mqttMessage){MqttMessageIdVariableHeader messageIdVariableHeader =(MqttMessageIdVariableHeader) mqttMessage.variableHeader();//    构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack =newMqttFixedHeader(MqttMessageType.PUBCOMP,false,MqttQoS.AT_MOST_ONCE,false,0x02);//    构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack =MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());MqttMessage mqttMessageBack =newMqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);//log.info("back--"+mqttMessageBack.toString());
        channel.writeAndFlush(mqttMessageBack);}/**
     *     订阅确认
     * @param channel
     * @param mqttMessage
     */publicvoidsuback(Channel channel,MqttMessage mqttMessage){MqttSubscribeMessage mqttSubscribeMessage =(MqttSubscribeMessage) mqttMessage;MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();//    构建返回报文, 可变报头MqttMessageIdVariableHeader variableHeaderBack =MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());//log.info(topics.toString());List<Integer> grantedQoSLevels =newArrayList<>(topics.size());for(int i =0; i < topics.size(); i++){
            grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());}//    构建返回报文    有效负载MqttSubAckPayload payloadBack =newMqttSubAckPayload(grantedQoSLevels);//    构建返回报文    固定报头MqttFixedHeader mqttFixedHeaderBack =newMqttFixedHeader(MqttMessageType.SUBACK,false,MqttQoS.AT_MOST_ONCE,false,2+topics.size());//    构建返回报文    订阅确认MqttSubAckMessage subAck =newMqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
        channel.writeAndFlush(subAck);}/**
     *     取消订阅确认
     * @param channel
     * @param mqttMessage
     */publicvoidunsuback(Channel channel,MqttMessage mqttMessage){MqttMessageIdVariableHeader messageIdVariableHeader =(MqttMessageIdVariableHeader) mqttMessage.variableHeader();//    构建返回报文    可变报头MqttMessageIdVariableHeader variableHeaderBack =MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());//    构建返回报文    固定报头MqttFixedHeader mqttFixedHeaderBack =newMqttFixedHeader(MqttMessageType.UNSUBACK,false,MqttQoS.AT_MOST_ONCE,false,2);//    构建返回报文    取消订阅确认MqttUnsubAckMessage unSubAck =newMqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
        channel.writeAndFlush(unSubAck);}/**
     *     心跳响应
     * @param channel
     * @param mqttMessage
     */publicvoid pingresp (Channel channel,MqttMessage mqttMessage){//    心跳响应报文    11010000 00000000  固定报文MqttFixedHeader fixedHeader =newMqttFixedHeader(MqttMessageType.PINGRESP,false,MqttQoS.AT_MOST_ONCE,false,0);MqttMessage mqttMessageBack =newMqttMessage(fixedHeader);
        channel.writeAndFlush(mqttMessageBack);}/**
     * 订阅推送
     */publicvoidsubscribSend(MqttMessage mqttMessage){MqttPublishMessage mqttPublishMessage =(MqttPublishMessage) mqttMessage;Object obj=mqttMessage.variableHeader();MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;String topicName=variableHeader.topicName();int packetId=variableHeader.packetId();//固定消息头 注意此处的消息类型PUBLISH mqtt协议MqttFixedHeaderFixedHeader=newMqttFixedHeader(MqttMessageType.PUBLISH,false,MqttQoS.AT_LEAST_ONCE,false,0);//可变消息头MqttPublishVariableHeader mqttPublishVariableHeader=newMqttPublishVariableHeader(topicName,packetId);//推送消息体MqttPublishMessage mqttPublishMessageResult=newMqttPublishMessage(FixedHeader,mqttPublishVariableHeader, mqttPublishMessage.content());
        log.info("推送地址————》"+topicName);if(subscribeMap.containsKey(topicName)){List<ChannelId> channelList=subscribeMap.get(topicName);for(int i =0; i < channelList.size(); i++){//订阅次此topic的Mqtt客户端搜到此消息,Channel channelSub=MQTTdeviceChannelGroup.find(channelList.get(i));//writeAndFlush会将ByteBuf的引用释放refCnt会减去1,使用retain加1
                mqttPublishMessageResult.retain();
                channelSub.writeAndFlush(mqttPublishMessageResult);}
            mqttPublishMessageResult.release();}}/**
     * 用户鉴权
     */publicbooleanauthentication(MqttConnectPayload payload){String username=MQTTserverProperties.getUsername();String password=MQTTserverProperties.getPassword();//无账号或者无密码通过if(stringEmptyCheck(password)||stringEmptyCheck(username)){returntrue;}else{//消息中账号密码为空if(payload.passwordInBytes()==null||payload.userName()==null){returnfalse;}String passwordAuthen=newString(payload.passwordInBytes());String usernameAuthen=payload.userName();if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){returntrue;}else{returnfalse;}}}//判断字符字符为空privatebooleanstringEmptyCheck(String str){if(str==null||"".equals(str)){returntrue;}else{returnfalse;}}}

初始化消息通道的handler

packagecom.hyx.supernetty.MQTTServer.handler;/**
 * @author HuangYaoXuan
 * @date 2023/4/21 15:46
 */importcom.hyx.supernetty.MQTTServer.callBack.BootNettyMqttMsgBack;importio.netty.channel.*;importio.netty.handler.codec.mqtt.*;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.io.IOException;importjava.util.ArrayList;importjava.util.List;importstaticcom.hyx.supernetty.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;importstaticcom.hyx.supernetty.MQTTServer.server.MQTTServer.*;/**
 * 消息处理,单例启动
 *
 * @author qiding
 */@Slf4j@[email protected]@RequiredArgsConstructorpublicclassMQTTMessageHandlerextendsChannelInboundHandlerAdapter{@AutowiredprivateBootNettyMqttMsgBackBootNettyMqttMsgBack;@OverridepublicvoidchannelRead(ChannelHandlerContext ctx,Object msg)throwsException{if(null!= msg){MqttMessage mqttMessage =(MqttMessage) msg;
            log.info("info--"+mqttMessage.toString());MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();Channel channel = ctx.channel();if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){//用户鉴权(配置文件中配置账号和密码,如果没有默认)boolean authentag=BootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());if(!authentag){return;}//    在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接if(MQTTdeviceChannelGroup.contains(channel)){//移除次设备channel和topicBootNettyMqttMsgBack.disconnack(channel,mqttMessage);}//    to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息BootNettyMqttMsgBack.connack(channel, mqttMessage);}//对于没有鉴权的设备,请求不处理if(!MQTTdeviceChannelGroup.contains(channel)){
                log.warn(channel.id()+"无鉴权操作");return;}switch(mqttFixedHeader.messageType()){case PUBLISH://    客户端发布消息//    PUBACK报文是对QoS 1等级的PUBLISH报文的响应BootNettyMqttMsgBack.puback(channel, mqttMessage);break;// PUBREL    Qos2级别消息,客户端返回case PUBREL://    PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应//服务端收到pubrel之后,正式将消息投递给上层应用层。MqttMessageIdVariableHeaderVariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader();if(mqttMessageIdMap.containsKey(VariableHeader.messageId())){
                        log.warn("移除消息缓存-->消息id"+VariableHeader.messageId());BootNettyMqttMsgBack.subscribSend(mqttMessageIdMap.get(VariableHeader.messageId()));BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
                        mqttMessageIdMap.remove(VariableHeader.messageId());}else{//后续多次收到REL消息,制作comp响应BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);}break;case SUBSCRIBE://    客户端订阅主题//    客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。//    为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。//    SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端//     to doBootNettyMqttMsgBack.suback(channel, mqttMessage);MqttSubscribePayloadSubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;for(int i =0; i <SubscribePayload.topicSubscriptions().size(); i++){String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);if(!channelIds.contains(channel.id())){
                                channelIds.add(channel.id());}else{
                                log.warn(channel.id()+"重复订阅");}
                            subscribeMap.put(topicname, channelIds);}else{List<ChannelId> channelIds=newArrayList<>();
                            channelIds.add(channel.id());
                            subscribeMap.put(topicname,channelIds);}
                        log.info(channel.id()+"订阅地址————》"+topicname);}break;case UNSUBSCRIBE://    客户端取消订阅//    客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题//    to doBootNettyMqttMsgBack.unsuback(channel, mqttMessage);ObjectUnsubscribe=mqttMessage.payload();MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;int len=unsubscribePayload.topics().size();for(int i =0; i < len; i++){String topicname=unsubscribePayload.topics().get(i);boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);
                            channelIds.remove(channel.id());
                            subscribeMap.put(topicname,channelIds);}else{
                           log.error("不存在订阅地址——>"+topicname);}
                        log.info(channel.id()+"取消订阅地址————》"+topicname);}break;case PINGREQ://    客户端发起心跳//    客户端发送PINGREQ报文给服务端的//    在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着//    请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开BootNettyMqttMsgBack.pingresp(channel, mqttMessage);break;case DISCONNECT://    客户端主动断开连接
                    log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);//    DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0//    to dobreak;default:break;}}else{return;}}/**
     *     从客户端收到新的数据、读取完成时调用
     */@OverridepublicvoidchannelReadComplete(ChannelHandlerContext ctx)throwsIOException{}/**
     *     客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
     */@OverridepublicvoidchannelRegistered(ChannelHandlerContext ctx)throwsException{super.channelRegistered(ctx);}/**
     *     客户端与服务端 断连时执行 channelInactive方法之后执行
     */@OverridepublicvoidchannelUnregistered(ChannelHandlerContext ctx)throwsException{super.channelUnregistered(ctx);}/**
     *     当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     */@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause)throwsException{super.exceptionCaught(ctx, cause);
        ctx.close();}@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx){
        log.debug("\n");}@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{super.channelActive(ctx);}/**
     *     服务端 当读超时时 会调用这个方法
     */@OverridepublicvoiduserEventTriggered(ChannelHandlerContext ctx,Object evt)throwsException{super.userEventTriggered(ctx, evt);
        ctx.close();}@OverridepublicvoidchannelWritabilityChanged(ChannelHandlerContext ctx)throwsException{super.channelWritabilityChanged(ctx);}}

配置类

importlombok.Data;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Configuration;/**
 * 读取YML中的服务配置
 *
 * /**
 *  * @author HuangYaoXuan
 *  * @date 2023/4/21 15:44
 *
 */@Configuration@ConfigurationProperties(prefix =MQTTServerProperties.MQTTPREFIX)@DatapublicclassMQTTServerProperties{publicstaticfinalString MQTTPREFIX ="netty.mqtt";/**
     * 服务器端口
     */privateInteger port;/**
     * mqtt服务器用户名
     */privateString username;/**
     * mqtt服务器密码
     */privateString password;}

消息处理类

/**
 * @author HuangYaoXuan
 * @date 2023/4/21 15:46
 */importcom.hyx.supernetty.MQTTServer.callBack.BootNettyMqttMsgBack;importio.netty.channel.*;importio.netty.handler.codec.mqtt.*;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.io.IOException;importjava.util.ArrayList;importjava.util.List;importstaticcom.hyx.supernetty.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;importstaticcom.hyx.supernetty.MQTTServer.server.MQTTServer.MQTTdeviceChannelGroup;importstaticcom.hyx.supernetty.MQTTServer.server.MQTTServer.subscribeMap;/**
 * 消息处理,单例启动
 *
 * @author qiding
 */@Slf4j@[email protected]@RequiredArgsConstructorpublicclassMQTTMessageHandlerextendsChannelInboundHandlerAdapter{@AutowiredprivateBootNettyMqttMsgBackBootNettyMqttMsgBack;@OverridepublicvoidchannelRead(ChannelHandlerContext ctx,Object msg)throwsException{if(null!= msg){MqttMessage mqttMessage =(MqttMessage) msg;
            log.info("info--"+mqttMessage.toString());MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();Channel channel = ctx.channel();if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){//用户鉴权(配置文件中配置账号和密码,如果没有默认通过)boolean authentag=BootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());if(!authentag){return;}//    在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接if(MQTTdeviceChannelGroup.contains(channel)){//移除次设备channel和topicBootNettyMqttMsgBack.disconnack(channel,mqttMessage);}//    to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息BootNettyMqttMsgBack.connack(channel, mqttMessage);}//对于没有鉴权的设备,拒绝发表和订阅等一系列操作if(!MQTTdeviceChannelGroup.contains(channel)){
                log.warn(channel.id()+"无鉴权操作");return;}switch(mqttFixedHeader.messageType()){case PUBLISH://    客户端发布消息//    PUBACK报文是对QoS 1等级的PUBLISH报文的响应BootNettyMqttMsgBack.puback(channel, mqttMessage);break;case PUBREL://    发布释放//    PUBREL报文是对PUBREC报文的响应//    to do 回应报文BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);break;case SUBSCRIBE://    客户端订阅主题//    客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。//    为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。//    SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端//     to doBootNettyMqttMsgBack.suback(channel, mqttMessage);MqttSubscribePayloadSubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;for(int i =0; i <SubscribePayload.topicSubscriptions().size(); i++){String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);if(!channelIds.contains(channel.id())){
                                channelIds.add(channel.id());}else{
                                log.warn(channel.id()+"重复订阅");}
                            subscribeMap.put(topicname, channelIds);}else{List<ChannelId> channelIds=newArrayList<>();
                            channelIds.add(channel.id());
                            subscribeMap.put(topicname,channelIds);}
                        log.info(channel.id()+"订阅地址————》"+topicname);}break;case UNSUBSCRIBE://    客户端取消订阅//    客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题//    to doBootNettyMqttMsgBack.unsuback(channel, mqttMessage);ObjectUnsubscribe=mqttMessage.payload();MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;int len=unsubscribePayload.topics().size();for(int i =0; i < len; i++){String topicname=unsubscribePayload.topics().get(i);boolean tag=subscribeMap.containsKey(topicname);if(tag){List<ChannelId> channelIds=subscribeMap.get(topicname);
                            channelIds.remove(channel.id());
                            subscribeMap.put(topicname,channelIds);}else{
                           log.error("不存在订阅地址——>"+topicname);}
                        log.info(channel.id()+"取消订阅地址————》"+topicname);}break;case PINGREQ://    客户端发起心跳//    客户端发送PINGREQ报文给服务端的//    在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着//    请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开BootNettyMqttMsgBack.pingresp(channel, mqttMessage);break;case DISCONNECT://    客户端主动断开连接
                    log.debug("设备下线,channelId:{}", channel.id());MQTTdeviceRemove(channel);//    DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0//    to dobreak;default:break;}}}/**
     *     从客户端收到新的数据、读取完成时调用
     */@OverridepublicvoidchannelReadComplete(ChannelHandlerContext ctx)throwsIOException{}/**
     *     客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
     */@OverridepublicvoidchannelRegistered(ChannelHandlerContext ctx)throwsException{super.channelRegistered(ctx);}/**
     *     客户端与服务端 断连时执行 channelInactive方法之后执行
     */@OverridepublicvoidchannelUnregistered(ChannelHandlerContext ctx)throwsException{super.channelUnregistered(ctx);}/**
     *     当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     */@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause)throwsException{super.exceptionCaught(ctx, cause);
        ctx.close();}@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx){
        log.debug("\n");}@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{super.channelActive(ctx);}/**
     *     服务端 当读超时时 会调用这个方法
     */@OverridepublicvoiduserEventTriggered(ChannelHandlerContext ctx,Object evt)throwsException{super.userEventTriggered(ctx, evt);
        ctx.close();}@OverridepublicvoidchannelWritabilityChanged(ChannelHandlerContext ctx)throwsException{super.channelWritabilityChanged(ctx);}}

netty主程序

importcom.hyx.supernetty.MQTTServer.channel.MqttChannelInit;importcom.hyx.supernetty.MQTTServer.config.MQTTServerProperties;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelId;importio.netty.channel.ChannelOption;importio.netty.channel.EventLoopGroup;importio.netty.channel.group.ChannelGroup;importio.netty.channel.group.DefaultChannelGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.util.concurrent.GlobalEventExecutor;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;importjavax.annotation.PreDestroy;importjava.net.InetSocketAddress;importjava.util.List;importjava.util.Map;importjava.util.concurrent.ConcurrentHashMap;/**
 * 启动 Server
 *
 */@Component@Slf4j@RequiredArgsConstructorpublicclassMQTTServerimplementsIMQTTServer{privatefinalMqttChannelInit mqttChannelInit;privatefinalMQTTServerPropertiesMQTTserverProperties;//保存接入的MQTT设备channelpublicstaticChannelGroupMQTTdeviceChannelGroup;//保存订阅地址和chanelid,当推送数据时,会向此订阅地址的多个channel发送数据publicstaticMap<String,List<ChannelId>> subscribeMap =newConcurrentHashMap();//保存设备名称和通道编号,向设备发送消息可以通过名称找到通道publicstaticConcurrentHashMap<String,ChannelId>MQTTdeviceMap=newConcurrentHashMap<>();//存放Qos2消息等级的消息ID,这里可使用redis之类的工具做缓存,为了简化配置,使用map暂存publicstaticConcurrentHashMap<Integer,MqttMessage> mqttMessageIdMap=newConcurrentHashMap();privateEventLoopGroup bossGroup;privateEventLoopGroup workerGroup;@Overridepublicvoidstart(){
        log.info("初始化 Mqttserver ...");
        bossGroup =newNioEventLoopGroup();
        workerGroup =newNioEventLoopGroup();MQTTdeviceChannelGroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);this.MqttServer();}/**
     * 初始化
     */privatevoidMqttServer(){try{newServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress(newInetSocketAddress(MQTTserverProperties.getPort()))// 配置 编码器、解码器、业务处理.childHandler(mqttChannelInit)// tcp缓冲区.option(ChannelOption.SO_BACKLOG,128)// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true.childOption(ChannelOption.TCP_NODELAY,true)// 保持长连接.childOption(ChannelOption.SO_KEEPALIVE,true)// 绑定端口,开始接收进来的连接.bind().sync();
            log.info("MQTT服务启动成功!开始监听端口:{}",MQTTserverProperties.getPort());}catch(Exception e){
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}}/**
     * 销毁
     */@PreDestroy@Overridepublicvoiddestroy(){
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();}}
importjavax.annotation.PreDestroy;/**
 * @author HuangYaoXuan
 * @date 2023/4/21 15:53
 */publicinterfaceIMQTTServer{/**
     * 主启动程序,初始化参数
     *
     * @throws Exception 初始化异常
     */voidstart()throwsException;/**
     * 优雅的结束服务器
     *
     * @throws InterruptedException 提前中断异常
     */@PreDestroyvoiddestroy()throwsInterruptedException;}

最后,启动服务

importcom.hyx.supernetty.MQTTServer.server.MQTTServer;importcom.hyx.supernetty.TcpServer.server.TcpServer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;/**
 * @author HuangYaoXuan
 * @date 2023/4/21 15:58
 */@Componentpublicclass startSrver {@AutowiredprivateTcpServer tcpServer;@AutowiredprivateMQTTServerMQTTServer;@PostConstructpublicvoidstartNetty(){newThread(()->{try{MQTTServer.start();}catch(Exception e){
                e.printStackTrace();}}).start();}}
标签: spring boot java 后端

本文转载自: https://blog.csdn.net/qq_39560017/article/details/130424780
版权归原作者 狂热的苹果汁 所有, 如有侵权,请联系我们删除。

“springboot+Netty搭建MQTT协议的服务端”的评论:

还没有评论