0


Spring Boot 整合 Netty 现实TCP通信

第一章、Spring Boot 整合 Netty 现实TCP通信(服务端)

第一章、Spring Boot 整合 Netty 现实TCP通信(客户端)

第二章、Spring Boot 整合 Netty 现实TCP通信实践

第三章、Spring Boot 整合 Netty 现实WebSocket通信

第四章、Spring Boot 整合 Netty 现实WebSocket通信实践

1、POM引入包

    <dependencies>
        <dependency>
            <groupId>com.cdkjframework</groupId>
            <artifactId>cdkj-util</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>log4j-to-slf4j</artifactId>
                    <groupId>org.apache.logging.log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

版本控制及cdkj-util基础包请访问开源工具包

common: 运用cdkj Framework 框架只需要简单,速搭建 Spring Boot + Spring Cloud 微服务项目,也可以单独搭建 Spring Boot 项目。

2、资源目录下新建 META-INF\spring

新建文件 org.springframework.boot.autoconfigure.AutoConfiguration.imports 内容

com.cdkjframework.socket.config.SocketAutoConfiguration

3、启动类

EnableAutoSocket
package com.cdkjframework.socket.annotation;

import com.cdkjframework.socket.config.SocketMarkerConfiguration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.swagger.annotation
 * @ClassName: EnableAutoSocket
 * @Description: socket 启动注解
 * @Author: xiaLin
 * @Date: 2023/7/18 9:20
 * @Version: 1.0
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({SocketMarkerConfiguration.class})
public @interface EnableAutoSocket {
}

4、Socket标记配置

package com.cdkjframework.socket.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.swagger.config
 * @ClassName: SwaggerMarkerConfiguration
 * @Description: Socket标记配置
 * @Author: xiaLin
 * @Date: 2023/7/18 9:21
 * @Version: 1.0
 */
@Configuration(proxyBeanMethods = false)
public class SocketMarkerConfiguration {

  @Bean
  public Marker socketMarker() {
    return new Marker();
  }

  public static class Marker {

  }
}

5、配置读取

package com.cdkjframework.socket.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.web.socket.config
 * @ClassName: SocketConfig
 * @Description: socket配置
 * @Author: xiaLin
 * @Version: 1.0
 */
@Data
@Component
@Configuration
@ConfigurationProperties(prefix = "spring.custom.socket")
public class SocketConfig {

    /**
     * 端口号
     */
    private List<Integer> port;
}

6、Socket自动配置

package com.cdkjframework.socket.config;

import com.cdkjframework.socket.NettySocketServer;
import com.cdkjframework.socket.listener.SocketListener;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.web.reactive.function.client.WebClientAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket.config
 * @ClassName: SocketAutoConfiguration
 * @Description: Socket自动配置
 * @Author: xiaLin
 * @Date: 2023/7/18 9:26
 * @Version: 1.0
 */
@Lazy(false)
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(SocketConfig.class)
@AutoConfigureAfter({WebClientAutoConfiguration.class})
@ConditionalOnBean(SocketMarkerConfiguration.Marker.class)
public class SocketAutoConfiguration {

    /**
     * 读取配置
     */
    private final SocketConfig customConfig;

    /**
     * 网状通道处理器
     */
    private final SocketListener socketListener;

    /**
     * 创建 bean
     *
     * @return 返回结果
     */
    @Bean(initMethod = "start")
    @ConditionalOnMissingBean
    public NettySocketServer nettySocketServer() {
        return new NettySocketServer(customConfig, socketListener);
    }
}

7、Netty服务端初始化

主业务初始化服务端配置及监听端口等

package com.cdkjframework.socket;

import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.socket.config.SocketConfig;
import com.cdkjframework.socket.handler.NettyChannelInitializer;
import com.cdkjframework.socket.listener.SocketListener;
import com.cdkjframework.util.log.LogUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @ProjectName: socket-algorithm
 * @Package: com.lesmarthome.socket.netty
 * @ClassName: NettySocketServer
 * @Author: frank
 * @Version: 1.0
 * @Description: Netty服务端初始化
 */
public class NettySocketServer {

    /**
     * 日志
     */
    private LogUtils logUtils = LogUtils.getLogger(NettySocketServer.class);

    /**
     * 读取配置
     */
    private final SocketConfig customConfig;

    /**
     * 网状通道处理器
     */
    private final SocketListener socketListener;

    /**
     * 构建函数
     */
    public NettySocketServer(SocketConfig customConfig, SocketListener socketListener) {
        this.customConfig = customConfig;
        this.socketListener = socketListener;
    }

    /**
     * 线程启动
     */
    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            // 保持长连接
            final boolean value = true;
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, value);
            serverBootstrap.option(ChannelOption.SO_BACKLOG, IntegerConsts.BYTE_LENGTH);
            serverBootstrap.childHandler(new NettyChannelInitializer(socketListener));
            // 服务器的逻辑
//            serverBootstrap.childHandler(new NettyChannelHandler(socketListener));
            // 监听多个端口
            List<Integer> portList = customConfig.getPort();
            for (Integer port :
                    portList) {
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                // 给cf 注册监听器,监控我们关心的事件
                channelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (channelFuture.isSuccess()) {
                            logUtils.info("监听端口[" + port + "]成功!");
                        } else {
                            logUtils.error("监听端口[" + port + "]失败!");
                        }
                    }
                });
                // channelFuture.channel().closeFuture().sync(); 对关闭通道进行监听
            }
        } catch (InterruptedException e) {
            logUtils.error(e);
        } finally {
            // bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
        }
    }
}

8、监听接口

SocketListener需要业务端自行实现
package com.cdkjframework.socket.listener;

/**
 * @ProjectName: socket-algorithm
 * @Package: com.lesmarthome.socket.netty.listener
 * @ClassName: SocketListener
 * @Author: frank
 * @Version: 1.0
 * @Description: 监听接口
 */
public interface SocketListener {

    /**
     * 消息
     *
     * @param channelId 通话ID
     * @param bytes     消息内容字节数据
     */
    void onMessage(String channelId, byte[] bytes);

    /**
     * 心跳
     *
     * @param channelId 通话ID
     */
    void heartbeat(String channelId);

    /**
     * 断开连接
     *
     * @param channelId 通道ID
     */
    void onDisconnect(String channelId);
}

9、Netty通道初始化器

package com.cdkjframework.socket.handler;

import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.socket.listener.SocketListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @ProjectName: socket-algorithm
 * @Package: com.lesmarthome.socket.netty.handler
 * @ClassName: NettyChannelInitializer
 * @Author: frank
 * @Version: 1.0
 * @Description: Netty通道初始化器
 */
@Component
@ChannelHandler.Sharable
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * 网状通道处理器
     */
    private final SocketListener socketListener;

    /**
     * 构造函数
     */
    public NettyChannelInitializer(SocketListener socketListener) {
        this.socketListener = socketListener;
    }

    /**
     * 初始化通道
     *
     * @param socketChannel 插座通道
     * @throws Exception 异常信息
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // 服务器的逻辑
        pipeline.addLast(new NettyChannelHandler(socketListener));

        // 处理心跳
        pipeline.addLast(new IdleStateHandler(IntegerConsts.FIVE, IntegerConsts.ZERO, IntegerConsts.ZERO, TimeUnit.MINUTES));
        //处理日志
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));

        // 自定义 handler
        pipeline.addLast(new NettyHeartbeatHandler(socketListener));
    }
}

10、Netty通道处理器

package com.cdkjframework.socket.handler;

import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.socket.NettySocketUtils;
import com.cdkjframework.socket.listener.SocketListener;
import com.cdkjframework.util.log.LogUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.stereotype.Component;

/**
 * @ProjectName: socket-algorithm
 * @Package: com.lesmarthome.socket.netty.handler
 * @ClassName: NettyChannelHandler
 * @Author: frank
 * @Version: 1.0
 * @Description: Netty通道处理器
 */
@Component
@ChannelHandler.Sharable
public class NettyChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 日志
     */
    private static final LogUtils LOG_UTILS = LogUtils.getLogger(NettyChannelHandler.class);

    /**
         * socket 监听
         */
        private final SocketListener socketListener;

    /**
         * 构造函数
         *
         * @param socketListener 监听接口
         */
        public NettyChannelHandler(SocketListener socketListener) {
            this.socketListener = socketListener;
        }

    /**
     * 连接成功
     *
     * @param ctx 通道处理程序上下文
     * @throws Exception 异常信息
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        LOG_UTILS.info("RemoteAddress : " + channel.remoteAddress().toString() + " add !");
        NettySocketUtils.getClients().add(channel);
        NettySocketUtils.onlineChannelsHeart.put(ctx.channel().id().asLongText(), IntegerConsts.ONE);
    }

    /**
     * 断开成功
     *
     * @param ctx 通道处理程序上下文
     * @throws Exception 异常信息
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            NettySocketUtils.getClients().remove(channel);
            String channelId = channel.id().asLongText();
            NettySocketUtils.onlineChannelsHeart.remove(channelId);
            if (socketListener != null) {
                socketListener.onDisconnect(channelId);
            }
        }

    /**
     * 有客户端终止连接服务器会触发此函数
     *
     * @param ctx 通道进程
     * @throws Exception 异常信息
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            String channelId = channel.id().asLongText();
            LOG_UTILS.info("RemoteAddress : " + channel.remoteAddress().toString() + " remove !");
            NettySocketUtils.getClients().remove(channel);
            NettySocketUtils.onlineChannelsHeart.remove(channelId);
            if (socketListener != null) {
                socketListener.onDisconnect(channelId);
            }
        }

    /**
     * 读取数据
     *
     * @param ctx 通道处理程序上下文
     * @param buf 消息
     * @throws Exception 异常信息
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
        String channelId = ctx.channel().id().asLongText();
        if (socketListener == null) {
                    return;
                }
            byte[] bytes = new byte[buf.readableBytes()];
        buf.getBytes(buf.readerIndex(), bytes);
            socketListener.onMessage(channelId, bytes);
        }

    /**
     * 异常处理
     *
     * @param ctx   通道处理程序上下文
     * @param cause 异常信息
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Channel channel = ctx.channel();
            String channelId = channel.id().asLongText();
            LOG_UTILS.error("异常处理 - 通道ID:" + channelId + cause.getMessage());
            if (socketListener != null) {
                socketListener.onDisconnect(channelId);
            }
            if (channel.isActive()) {
                ctx.close();
                NettySocketUtils.getClients().remove(channel);
                NettySocketUtils.onlineChannelsHeart.remove(channelId);
            }
        }
}

11、Netty心跳处理器

package com.cdkjframework.socket.handler;

import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.socket.NettySocketUtils;
import com.cdkjframework.socket.listener.SocketListener;
import com.cdkjframework.util.log.LogUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * @ProjectName: com.lesmarthome.iot
 * @Package: com.lesmarthome.iot.netty.handler
 * @ClassName: TcpHeartbeatHandler
 * @Description: Netty心跳处理器
 * @Author: xiaLin
 * @Version: 1.0
 */
public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {

    /**
     * 监听数据
     */
    private final SocketListener listener;

    /**
     * 日志
     */
    private LogUtils logUtils = LogUtils.getLogger(NettyHeartbeatHandler.class);

    /**
     * 构造函数
     */
    public NettyHeartbeatHandler(SocketListener listener) {
        this.listener = listener;
    }

    /**
     * 用户事件已触发
     *
     * @param ctx 通道处理程序上下文
     * @param evt 事件
     * @throws Exception 异常信息
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            Channel channel = ctx.channel();
            String channelId = channel.id().asLongText();

            // 该通道非法
            if (!NettySocketUtils.onlineChannelsHeart.containsKey(channelId)) {
                channel.close().sync();
                return;
            }
            IdleStateEvent event = (IdleStateEvent) evt;

            switch (event.state()) {
                // 进入读写空闲
                case ALL_IDLE:
                                    // 空闲60s之后触发 (心跳包丢失)
                                    Integer counter = NettySocketUtils.onlineChannelsHeart.get(channelId) + IntegerConsts.ONE;
                                    // 重置心跳丢失次数
                                    NettySocketUtils.onlineChannelsHeart.replace(channelId, counter);
                                    logUtils.info("通道【" + channelId + "】丢失了第 " + counter + " 个心跳包");
                                    if (counter < IntegerConsts.THREE) {
                                        return;
                                    }
                                    // 通道关闭
                                    logUtils.info("已与通道【%s】断开连接,地址:%s", channelId, channel.remoteAddress());
                                    if (listener == null) {
                                        // 连续丢失3个心跳包 (断开连接)
                                        channel.close().sync();
                                    } else {
                                        listener.heartbeat(channelId);
                                    }
                                    break;
                // 进入读空闲...
                case READER_IDLE:
                    logUtils.info("通道【%s】进入读空闲!", channelId);
                    break;
                // 进入写空闲...
                case WRITER_IDLE:
                    logUtils.info("通道【%s】进入写空闲!", channelId);
                    break;
            }
        }
    }
}

12、工具类

package com.cdkjframework.socket;

import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.util.log.LogUtils;
import com.cdkjframework.util.tool.HexUtils;
import com.cdkjframework.util.tool.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ProjectName: socket-algorithm
 * @Package: com.lesmarthome.socket.netty
 * @ClassName: NettySocketUtils
 * @Author: frank
 * @Version: 1.0
 * @Description: 工具类
 */
public class NettySocketUtils {

    /**
     * 日志
     */
    private static final LogUtils logUtils = LogUtils.getLogger(NettySocketUtils.class);

    /**
     * 锁
     */
    private static final ReentrantLock Lock = new ReentrantLock();

    /**
     * 客户端集合
     */
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 记录每一个channel的心跳包丢失次数
     */
    public static HashMap<String, Integer> onlineChannelsHeart = new HashMap<>();

    public static ChannelGroup getClients() {
        return clients;
    }

    public static void setClients(ChannelGroup clients) {
        NettySocketUtils.clients = clients;
    }

    /**
     * 发送消息
     *
     * @param message 消息内容
     */
    public static void sendRunnableMessage(String channelId, String message) {
        Lock.lock();
        try {
            Channel channel = findChannel(channelId);
            if (StringUtils.isNullAndSpaceOrEmpty(message) || channel == null) {
                return;
            }
            if (channel.isOpen()) {
                NettySocketUtils.onlineChannelsHeart.replace(channelId, IntegerConsts.ZERO);
                logUtils.info("channelId:" + channelId);
                message = message + System.lineSeparator();
                final ByteBuf buf = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);
                channel.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        channel.writeAndFlush(buf);
                    }
                });
            }
        } finally {
            Lock.unlock();
        }
    }

    /**
     * 发送消息
     *
     * @param message 消息内容
     */
    public static void sendMessageString(String channelId, String message) {
        Lock.lock();
        try {
            Channel channel = findChannel(channelId);
            if (StringUtils.isNullAndSpaceOrEmpty(message) || channel == null) {
                return;
            }
            if (channel.isOpen()) {
                NettySocketUtils.onlineChannelsHeart.replace(channelId, IntegerConsts.ZERO);
                ChannelFuture future = channel.writeAndFlush(Unpooled.wrappedBuffer(HexUtils.hexToByteArray(message)));
                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        //写操作完成,并没有错误发生
                        if (future.isSuccess()) {
                            logUtils.info("发送成功!");
                        } else {
                            //记录错误
                            logUtils.error(future.cause(), "发送失败!");
                        }
                    }
                });
            }
        } finally {
            Lock.unlock();
        }
    }

    /**
     * 发送消息
     *
     * @param message 消息内容
     */
    public static void sendMessage(String channelId, String message) {
        Lock.lock();
        try {
            Channel channel = findChannel(channelId);
            if (StringUtils.isNullAndSpaceOrEmpty(message) || channel == null) {
                return;
            }
            if (channel.isOpen()) {
                NettySocketUtils.onlineChannelsHeart.replace(channelId, IntegerConsts.ZERO);
                logUtils.info("channelId:" + channelId);
                final ByteBuf buf = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);
                channel.writeAndFlush(buf);
            }
        } finally {
            Lock.unlock();
        }
    }

    /**
     * 发送消息
     *
     * @param message 消息内容
     */
    public static void sendFourGBraceletByteMessage(String channelId, byte[] message) {
        Lock.lock();
        try {
            Channel channel = findChannel(channelId);
            if (StringUtils.isNullAndSpaceOrEmpty(message) || channel == null) {
                return;
            }
            if (channel.isOpen()) {
                logUtils.info("channelId:" + channelId);
                final ByteBuf buf = Unpooled.copiedBuffer(message, IntegerConsts.ZERO, message.length);
                channel.writeAndFlush(buf);
            }
        } finally {
            Lock.unlock();
        }
    }

    /**
     * 发送消息
     *
     * @param message 消息内容
     */
    public static void sendMessage(String channelId, byte [] message) {
        Lock.lock();
        try {
            Channel channel = findChannel(channelId);
            if (StringUtils.isNullAndSpaceOrEmpty(message) || channel == null) {
                return;
            }
            if (channel.isOpen()) {
                logUtils.info("channelId:" + channelId);
                final ByteBuf buf = Unpooled.copiedBuffer(message);
                channel.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        channel.writeAndFlush(buf);
                    }
                });
            }
        } finally {
            Lock.unlock();
        }
    }

    /**
     * 是否打开
     *
     * @param channelId 通道ID
     * @return 返回布尔
     */
    public static boolean isOpen(String channelId) {
        Channel channel = findChannel(channelId);
        return channel != null;
    }

    /**
     * 获取通道
     *
     * @param channelId 通道ID
     * @return 返回通道
     */
    public synchronized static Channel findChannel(String channelId) {
        Optional<Channel> optional = clients.stream()
                .filter(f -> f.id().asLongText().equals(channelId))
                .findFirst();

        // 验证是否查询到结果
        if (!optional.isPresent()) {
            return null;
        }
        // 验证连接是否存在
        Channel channel = optional.get();
        if (!channel.isOpen()) {
            clients.remove(channel);
            return null;
        }
        // 返回结果
        return channel;
    }
}

13、总结

以前内容为Spring Boot整合Netty实现TCP通信封装,在后面文章会详细说明如果去实践完成和硬件通信


本文转载自: https://blog.csdn.net/JPST228/article/details/141326477
版权归原作者 维基框架 所有, 如有侵权,请联系我们删除。

“Spring Boot 整合 Netty 现实TCP通信”的评论:

还没有评论