第一章、Spring Boot 整合 Netty 实现TCP通信(服务端)
第一章、Spring Boot 整合 Netty 实现TCP通信(客户端)
第二章、Spring Boot 整合 Netty 实现TCP通信实践
第三章、Spring Boot 整合 Netty 实现WebSocket通信
第四章、Spring Boot 整合 Netty 实现WebSocket通信实践
1、POM引入包
<!-- Dependencies of the socket module. No <version> elements are declared here; versions are presumably managed by a parent POM / BOM (TODO confirm). -->
<dependencies>
<!-- cdkj framework utility package; presumably the source of the com.cdkjframework.util.* helpers used by the Java code (verify). -->
<dependency>
<groupId>com.cdkjframework</groupId>
<artifactId>cdkj-util</artifactId>
</dependency>
<!-- Netty aggregate artifact. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<!-- Excludes the Log4j-to-SLF4J bridge from this dependency's transitive graph. -->
<!-- NOTE(review): confirm netty-all actually pulls in log4j-to-slf4j; otherwise this exclusion is a no-op. -->
<exclusions>
<exclusion>
<artifactId>log4j-to-slf4j</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
版本控制及cdkj-util基础包请访问开源工具包
common: 运用 cdkj Framework 框架,只需简单配置即可快速搭建 Spring Boot + Spring Cloud 微服务项目,也可以单独搭建 Spring Boot 项目。
2、资源目录下新建 META-INF\spring
新建文件 org.springframework.boot.autoconfigure.AutoConfiguration.imports 内容
com.cdkjframework.socket.config.SocketAutoConfiguration
3、启动注解
在启动类上添加 @EnableAutoSocket 注解以启用 Socket 自动配置,注解 EnableAutoSocket 定义如下:
package com.cdkjframework.socket.annotation;
import com.cdkjframework.socket.config.SocketMarkerConfiguration;
import org.springframework.context.annotation.Import;
import java.lang.annotation.*;
/**
 * Opt-in annotation for the socket module.
 * <p>
 * Annotating a type with it imports {@link SocketMarkerConfiguration} into the
 * application context.
 *
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket.annotation
 * @ClassName: EnableAutoSocket
 * @Description: Annotation that enables the socket support
 * @Author: xiaLin
 * @Date: 2023/7/18 9:20
 * @Version: 1.0
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({SocketMarkerConfiguration.class})
public @interface EnableAutoSocket {
}
4、Socket标记配置
package com.cdkjframework.socket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Marker configuration for the socket module.
 * <p>
 * Its only job is to register a {@link Marker} bean; the marker carries no state
 * and no behavior.
 *
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket.config
 * @ClassName: SocketMarkerConfiguration
 * @Description: Socket marker configuration
 * @Author: xiaLin
 * @Date: 2023/7/18 9:21
 * @Version: 1.0
 */
@Configuration(proxyBeanMethods = false)
public class SocketMarkerConfiguration {
/**
 * Registers the marker bean.
 *
 * @return a new, empty {@link Marker}
 */
@Bean
public Marker socketMarker() {
return new Marker();
}
/**
 * Empty marker type; only its presence as a bean is meaningful.
 */
public static class Marker {
}
}
5、配置读取
package com.cdkjframework.socket.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Socket settings bound from properties under the {@code spring.custom.socket} prefix.
 * <p>
 * Accessors are generated by Lombok's {@code @Data}.
 * <p>
 * NOTE(review): the class is annotated with both {@code @Component} and
 * {@code @Configuration}; if it is additionally registered through
 * {@code @EnableConfigurationProperties} and also picked up by component scanning,
 * two beans of this type may exist — verify against the application's scan packages.
 *
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket.config
 * @ClassName: SocketConfig
 * @Description: Socket configuration
 * @Author: xiaLin
 * @Version: 1.0
 */
@Data
@Component
@Configuration
@ConfigurationProperties(prefix = "spring.custom.socket")
public class SocketConfig {
/**
 * Port numbers; bound from {@code spring.custom.socket.port}.
 */
private List<Integer> port;
}
6、Socket自动配置
package com.cdkjframework.socket.config;
import com.cdkjframework.socket.NettySocketServer;
import com.cdkjframework.socket.listener.SocketListener;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.web.reactive.function.client.WebClientAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
/**
 * Auto-configuration that creates the {@link NettySocketServer} bean.
 * <p>
 * It is conditional on a {@link SocketMarkerConfiguration.Marker} bean being present
 * and is ordered after {@code WebClientAutoConfiguration}. Both collaborators are
 * injected through the Lombok-generated constructor.
 * <p>
 * NOTE(review): {@link SocketListener} is a mandatory constructor dependency here, so
 * the application presumably must provide an implementation bean — verify.
 *
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket.config
 * @ClassName: SocketAutoConfiguration
 * @Description: Socket auto-configuration
 * @Author: xiaLin
 * @Date: 2023/7/18 9:26
 * @Version: 1.0
 */
@Lazy(false)
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(SocketConfig.class)
@AutoConfigureAfter({WebClientAutoConfiguration.class})
@ConditionalOnBean(SocketMarkerConfiguration.Marker.class)
public class SocketAutoConfiguration {
/**
 * Socket configuration properties.
 */
private final SocketConfig customConfig;
/**
 * Business-side listener handed to the server.
 */
private final SocketListener socketListener;
/**
 * Creates the Netty server bean unless one is already defined; {@code start} is
 * declared as the bean's init method.
 *
 * @return the server built from the injected configuration and listener
 */
@Bean(initMethod = "start")
@ConditionalOnMissingBean
public NettySocketServer nettySocketServer() {
return new NettySocketServer(customConfig, socketListener);
}
}
7、Netty服务端初始化
主业务初始化服务端配置及监听端口等
package com.cdkjframework.socket;
import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.socket.config.SocketConfig;
import com.cdkjframework.socket.handler.NettyChannelInitializer;
import com.cdkjframework.socket.listener.SocketListener;
import com.cdkjframework.util.log.LogUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Netty TCP server bootstrap: binds every configured port and installs
 * {@link NettyChannelInitializer} on each accepted connection.
 *
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket
 * @ClassName: NettySocketServer
 * @Author: frank
 * @Version: 1.0
 * @Description: Netty server initialization
 */
public class NettySocketServer {

  /**
   * Logger.
   */
  private final LogUtils logUtils = LogUtils.getLogger(NettySocketServer.class);

  /**
   * Socket configuration (source of the port list).
   */
  private final SocketConfig customConfig;

  /**
   * Business-side listener passed on to the channel initializer.
   */
  private final SocketListener socketListener;

  /**
   * Event loop group of the listening (server) channels; kept as a field so that
   * {@link #shutdown()} can release it.
   */
  private volatile EventLoopGroup bossGroup;

  /**
   * Event loop group of the accepted (child) channels; kept as a field so that
   * {@link #shutdown()} can release it.
   */
  private volatile EventLoopGroup workerGroup;

  /**
   * Constructor.
   *
   * @param customConfig   socket configuration
   * @param socketListener business-side listener
   */
  public NettySocketServer(SocketConfig customConfig, SocketListener socketListener) {
    this.customConfig = customConfig;
    this.socketListener = socketListener;
  }

  /**
   * Starts the server and binds all configured ports.
   * <p>
   * With no port configured the server is not started and an error is logged.
   * If a port cannot be bound, the event loop groups are shut down and the failure is
   * rethrown so that startup still fails fast.
   *
   * @throws IllegalStateException if a configured port cannot be bound
   */
  public void start() {
    List<Integer> portList = customConfig.getPort();
    if (portList == null || portList.isEmpty()) {
      logUtils.error("未配置监听端口,Netty 服务端未启动!");
      return;
    }

    bossGroup = new NioEventLoopGroup();
    workerGroup = new NioEventLoopGroup();

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup);
    serverBootstrap.channel(NioServerSocketChannel.class);
    // Keep long-lived connections alive.
    serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    serverBootstrap.option(ChannelOption.SO_BACKLOG, IntegerConsts.BYTE_LENGTH);
    serverBootstrap.childHandler(new NettyChannelInitializer(socketListener));

    try {
      // Listen on every configured port.
      for (Integer port : portList) {
        // await() (unlike sync()) does not rethrow, so the outcome can be inspected and logged.
        ChannelFuture bindFuture = serverBootstrap.bind(port).await();
        if (!bindFuture.isSuccess()) {
          logUtils.error("监听端口[" + port + "]失败!");
          // Do not leave event loop threads behind when startup fails.
          shutdown();
          throw new IllegalStateException("监听端口[" + port + "]失败!", bindFuture.cause());
        }
        logUtils.info("监听端口[" + port + "]成功!");
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag for the caller; ports bound so far stay bound.
      Thread.currentThread().interrupt();
      logUtils.error(e);
    }
  }

  /**
   * Gracefully shuts down both event loop groups; safe to call when the server was
   * never started.
   * <p>
   * NOTE(review): Spring's {@code @Bean} destroy-method inference is expected to pick up
   * this public no-arg {@code shutdown} method on context close — confirm.
   */
  public void shutdown() {
    EventLoopGroup boss = bossGroup;
    EventLoopGroup worker = workerGroup;
    if (boss != null) {
      boss.shutdownGracefully();
    }
    if (worker != null) {
      worker.shutdownGracefully();
    }
  }
}
8、监听接口
SocketListener需要业务端自行实现
package com.cdkjframework.socket.listener;
/**
 * Callback interface through which the socket handlers report channel events.
 *
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket.listener
 * @ClassName: SocketListener
 * @Author: frank
 * @Version: 1.0
 * @Description: Listener interface
 */
public interface SocketListener {
/**
 * Called with the bytes received on a channel.
 *
 * @param channelId channel ID
 * @param bytes     message content as raw bytes
 */
void onMessage(String channelId, byte[] bytes);
/**
 * Heartbeat callback for a channel.
 *
 * @param channelId channel ID
 */
void heartbeat(String channelId);
/**
 * Called when a channel is disconnected. Implementations should tolerate repeated
 * calls for the same channel.
 *
 * @param channelId channel ID
 */
void onDisconnect(String channelId);
}
9、Netty通道初始化器
package com.cdkjframework.socket.handler;
import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.socket.listener.SocketListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
 * Builds the handler pipeline of every accepted channel.
 * <p>
 * NOTE(review): the class carries {@code @Component} although it is instantiated with
 * {@code new} by the server — confirm whether the stereotype is actually needed.
 *
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket.handler
 * @ClassName: NettyChannelInitializer
 * @Author: frank
 * @Version: 1.0
 * @Description: Netty channel initializer
 */
@Component
@ChannelHandler.Sharable
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {

  /**
   * Business-side listener handed to the handlers.
   */
  private final SocketListener socketListener;

  /**
   * Constructor.
   *
   * @param socketListener business-side listener
   */
  public NettyChannelInitializer(SocketListener socketListener) {
    this.socketListener = socketListener;
  }

  /**
   * Initializes the channel pipeline.
   * <p>
   * {@link NettyChannelHandler} consumes inbound buffers without forwarding them, so it
   * must be the last inbound handler; placed first it would hide all reads from the
   * idle detection and the logging handler.
   *
   * @param socketChannel the accepted channel
   * @throws Exception on initialization failure
   */
  @Override
  protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    // Idle detection: only the reader-idle timeout is enabled (5 minutes).
    // NOTE(review): NettyHeartbeatHandler counts lost heartbeats on ALL_IDLE, which this
    // configuration (all-idle time 0) never emits — confirm the intended timeouts.
    pipeline.addLast(new IdleStateHandler(IntegerConsts.FIVE, IntegerConsts.ZERO, IntegerConsts.ZERO, TimeUnit.MINUTES));
    // Traffic logging.
    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
    // Reacts to the idle events fired by IdleStateHandler, hence placed after it.
    pipeline.addLast(new NettyHeartbeatHandler(socketListener));
    // Business logic: terminal consumer of inbound data.
    pipeline.addLast(new NettyChannelHandler(socketListener));
  }
}
10、Netty通道处理器
package com.cdkjframework.socket.handler;
import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.socket.NettySocketUtils;
import com.cdkjframework.socket.listener.SocketListener;
import com.cdkjframework.util.log.LogUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.stereotype.Component;
/**
 * Inbound handler that tracks connected channels in {@link NettySocketUtils} and
 * forwards received bytes and disconnects to the {@link SocketListener}.
 *
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket.handler
 * @ClassName: NettyChannelHandler
 * @Author: frank
 * @Version: 1.0
 * @Description: Netty channel handler
 */
@Component
@ChannelHandler.Sharable
public class NettyChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {

  /**
   * Logger.
   */
  private static final LogUtils LOG_UTILS = LogUtils.getLogger(NettyChannelHandler.class);

  /**
   * Business-side listener; may be {@code null}, in which case events are not forwarded.
   */
  private final SocketListener socketListener;

  /**
   * Constructor.
   *
   * @param socketListener business-side listener
   */
  public NettyChannelHandler(SocketListener socketListener) {
    this.socketListener = socketListener;
  }

  /**
   * Registers the channel as a client and initializes its heartbeat counter.
   *
   * @param ctx channel handler context
   * @throws Exception on failure
   */
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    // String concatenation instead of toString(): tolerates a null remote address.
    LOG_UTILS.info("RemoteAddress : " + channel.remoteAddress() + " add !");
    NettySocketUtils.getClients().add(channel);
    NettySocketUtils.onlineChannelsHeart.put(channel.id().asLongText(), IntegerConsts.ONE);
  }

  /**
   * Unregisters the channel when this handler leaves the pipeline.
   *
   * @param ctx channel handler context
   * @throws Exception on failure
   */
  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    release(ctx.channel());
  }

  /**
   * Unregisters the channel when the connection becomes inactive.
   *
   * @param ctx channel handler context
   * @throws Exception on failure
   */
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    LOG_UTILS.info("RemoteAddress : " + channel.remoteAddress() + " remove !");
    release(channel);
    // Let handlers further down the pipeline observe the event as well.
    ctx.fireChannelInactive();
  }

  /**
   * Copies the readable bytes of the buffer and hands them to the listener.
   *
   * @param ctx channel handler context
   * @param buf received buffer
   * @throws Exception on failure
   */
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
    if (socketListener == null) {
      return;
    }
    byte[] bytes = new byte[buf.readableBytes()];
    // Absolute read: leaves the reader index of the buffer untouched.
    buf.getBytes(buf.readerIndex(), bytes);
    socketListener.onMessage(ctx.channel().id().asLongText(), bytes);
  }

  /**
   * Logs the failure, unregisters the channel and closes it if it is still active.
   *
   * @param ctx   channel handler context
   * @param cause the failure
   * @throws Exception on failure
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    Channel channel = ctx.channel();
    // Pass the throwable itself so the stack trace is not lost.
    LOG_UTILS.error(cause, "异常处理 - 通道ID:" + channel.id().asLongText());
    release(channel);
    if (channel.isActive()) {
      ctx.close();
    }
  }

  /**
   * Removes the channel from the shared registries and notifies the listener.
   * <p>
   * A single disconnect reaches this method from several callbacks (exception,
   * inactive, handler removed); the listener is notified only by the call that actually
   * removes the heartbeat entry, i.e. once per connection.
   *
   * @param channel the channel being released
   */
  private void release(Channel channel) {
    String channelId = channel.id().asLongText();
    NettySocketUtils.getClients().remove(channel);
    boolean wasOnline = NettySocketUtils.onlineChannelsHeart.remove(channelId) != null;
    if (wasOnline && socketListener != null) {
      socketListener.onDisconnect(channelId);
    }
  }
}
11、Netty心跳处理器
package com.cdkjframework.socket.handler;
import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.socket.NettySocketUtils;
import com.cdkjframework.socket.listener.SocketListener;
import com.cdkjframework.util.log.LogUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
/**
 * Reacts to {@link IdleStateEvent}s and keeps the per-channel lost-heartbeat counter
 * stored in {@link NettySocketUtils#onlineChannelsHeart}.
 *
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket.handler
 * @ClassName: NettyHeartbeatHandler
 * @Description: Netty heartbeat handler
 * @Author: xiaLin
 * @Version: 1.0
 */
public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {

  /**
   * Business-side listener; may be {@code null}.
   */
  private final SocketListener listener;

  /**
   * Logger.
   */
  private final LogUtils logUtils = LogUtils.getLogger(NettyHeartbeatHandler.class);

  /**
   * Constructor.
   *
   * @param listener business-side listener
   */
  public NettyHeartbeatHandler(SocketListener listener) {
    this.listener = listener;
  }

  /**
   * Handles idle events; every other user event is passed on unchanged.
   *
   * @param ctx channel handler context
   * @param evt the event
   * @throws Exception on failure
   */
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (!(evt instanceof IdleStateEvent)) {
      // Not ours: do not swallow events meant for other handlers.
      super.userEventTriggered(ctx, evt);
      return;
    }
    Channel channel = ctx.channel();
    String channelId = channel.id().asLongText();
    // Single lookup: avoids a containsKey/get race that could unbox null.
    Integer missed = NettySocketUtils.onlineChannelsHeart.get(channelId);
    if (missed == null) {
      // Unknown channel. close() is asynchronous; blocking on its future here would
      // stall (or be rejected on) the event loop thread.
      channel.close();
      return;
    }
    switch (((IdleStateEvent) evt).state()) {
      // Neither read nor write for the configured time.
      // NOTE(review): NettyChannelInitializer configures IdleStateHandler with an
      // all-idle time of 0, so this branch looks unreachable as wired — confirm.
      case ALL_IDLE:
        onAllIdle(channel, channelId, missed);
        break;
      // Read idle.
      case READER_IDLE:
        logUtils.info("通道【%s】进入读空闲!", channelId);
        break;
      // Write idle.
      case WRITER_IDLE:
        logUtils.info("通道【%s】进入写空闲!", channelId);
        break;
      default:
        break;
    }
  }

  /**
   * Records one more lost heartbeat; from the third loss on, either notifies the
   * listener or, when there is none, closes the channel.
   *
   * @param channel   the idle channel
   * @param channelId its ID
   * @param missed    number of heartbeats recorded as lost so far
   */
  private void onAllIdle(Channel channel, String channelId, Integer missed) {
    Integer counter = missed + IntegerConsts.ONE;
    NettySocketUtils.onlineChannelsHeart.replace(channelId, counter);
    logUtils.info("通道【" + channelId + "】丢失了第 " + counter + " 个心跳包");
    if (counter < IntegerConsts.THREE) {
      return;
    }
    logUtils.info("已与通道【%s】断开连接,地址:%s", channelId, channel.remoteAddress());
    if (listener == null) {
      // Three consecutive losses and nobody to decide: drop the connection.
      channel.close();
    } else {
      listener.heartbeat(channelId);
    }
  }
}
12、工具类
package com.cdkjframework.socket;
import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.util.log.LogUtils;
import com.cdkjframework.util.tool.HexUtils;
import com.cdkjframework.util.tool.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
/**
 * Static registry of connected channels plus helpers for sending data to a channel
 * identified by its long-text ID.
 *
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.socket
 * @ClassName: NettySocketUtils
 * @Author: frank
 * @Version: 1.0
 * @Description: Utility class
 */
public class NettySocketUtils {

  /**
   * Logger.
   */
  private static final LogUtils logUtils = LogUtils.getLogger(NettySocketUtils.class);

  /**
   * Lock that serializes all send operations.
   */
  private static final ReentrantLock Lock = new ReentrantLock();

  /**
   * Logs failed writes, so that a send error does not go unnoticed.
   */
  private static final ChannelFutureListener LOG_FAILURE = new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (!future.isSuccess()) {
        logUtils.error(future.cause(), "发送失败!");
      }
    }
  };

  /**
   * Logs both the success and the failure of a write.
   */
  private static final ChannelFutureListener LOG_RESULT = new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (future.isSuccess()) {
        logUtils.info("发送成功!");
      } else {
        logUtils.error(future.cause(), "发送失败!");
      }
    }
  };

  /**
   * Connected clients.
   */
  private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

  /**
   * Number of lost heartbeats per channel ID. A concurrent map, because it is
   * modified from the channel handlers as well as from the sending threads.
   */
  public static Map<String, Integer> onlineChannelsHeart = new ConcurrentHashMap<>();

  /**
   * @return the group of connected clients
   */
  public static ChannelGroup getClients() {
    return clients;
  }

  /**
   * @param clients replacement for the group of connected clients
   */
  public static void setClients(ChannelGroup clients) {
    NettySocketUtils.clients = clients;
  }

  /**
   * Sends a text message followed by the platform line separator, UTF-8 encoded; the
   * write is submitted as a task to the channel's event loop. Resets the channel's
   * lost-heartbeat counter.
   *
   * @param channelId target channel ID
   * @param message   message content; blank messages are ignored
   */
  public static void sendRunnableMessage(String channelId, String message) {
    dispatch(channelId, StringUtils.isNullAndSpaceOrEmpty(message), true, channel -> {
      logUtils.info("channelId:" + channelId);
      final ByteBuf buf = Unpooled.copiedBuffer(message + System.lineSeparator(), CharsetUtil.UTF_8);
      channel.eventLoop().execute(() -> channel.writeAndFlush(buf).addListener(LOG_FAILURE));
    });
  }

  /**
   * Sends the bytes obtained by decoding {@code message} with
   * {@code HexUtils.hexToByteArray} and logs the outcome of the write. Resets the
   * channel's lost-heartbeat counter.
   *
   * @param channelId target channel ID
   * @param message   message content, passed to the hex decoder; blank messages are ignored
   */
  public static void sendMessageString(String channelId, String message) {
    dispatch(channelId, StringUtils.isNullAndSpaceOrEmpty(message), true, channel ->
        channel.writeAndFlush(Unpooled.wrappedBuffer(HexUtils.hexToByteArray(message)))
            .addListener(LOG_RESULT));
  }

  /**
   * Sends a text message, UTF-8 encoded. Resets the channel's lost-heartbeat counter.
   *
   * @param channelId target channel ID
   * @param message   message content; blank messages are ignored
   */
  public static void sendMessage(String channelId, String message) {
    dispatch(channelId, StringUtils.isNullAndSpaceOrEmpty(message), true, channel -> {
      logUtils.info("channelId:" + channelId);
      channel.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)).addListener(LOG_FAILURE);
    });
  }

  /**
   * Sends a copy of the given bytes. Does not touch the lost-heartbeat counter.
   *
   * @param channelId target channel ID
   * @param message   message content; {@code null} or empty arrays are ignored
   */
  public static void sendFourGBraceletByteMessage(String channelId, byte[] message) {
    dispatch(channelId, message == null || message.length == 0, false, channel -> {
      logUtils.info("channelId:" + channelId);
      channel.writeAndFlush(Unpooled.copiedBuffer(message, IntegerConsts.ZERO, message.length))
          .addListener(LOG_FAILURE);
    });
  }

  /**
   * Sends a copy of the given bytes; the write is submitted as a task to the channel's
   * event loop. Does not touch the lost-heartbeat counter.
   *
   * @param channelId target channel ID
   * @param message   message content; {@code null} or empty arrays are ignored
   */
  public static void sendMessage(String channelId, byte[] message) {
    dispatch(channelId, message == null || message.length == 0, false, channel -> {
      logUtils.info("channelId:" + channelId);
      final ByteBuf buf = Unpooled.copiedBuffer(message);
      channel.eventLoop().execute(() -> channel.writeAndFlush(buf).addListener(LOG_FAILURE));
    });
  }

  /**
   * Tells whether an open channel with the given ID is registered.
   *
   * @param channelId channel ID
   * @return {@code true} if {@link #findChannel(String)} finds an open channel
   */
  public static boolean isOpen(String channelId) {
    return findChannel(channelId) != null;
  }

  /**
   * Looks up a channel by its long-text ID. A channel that is found but no longer open
   * is removed from the client group.
   *
   * @param channelId channel ID
   * @return the open channel, or {@code null} if none is registered
   */
  public synchronized static Channel findChannel(String channelId) {
    Channel channel = clients.stream()
        .filter(f -> f.id().asLongText().equals(channelId))
        .findFirst()
        .orElse(null);
    if (channel == null) {
      return null;
    }
    // Drop stale entries on the fly.
    if (!channel.isOpen()) {
      clients.remove(channel);
      return null;
    }
    return channel;
  }

  /**
   * Shared send skeleton: under the send lock, resolves the channel, applies the common
   * guards and runs the actual write.
   *
   * @param channelId      target channel ID
   * @param emptyPayload   whether the caller's payload is empty (nothing is sent then)
   * @param resetHeartbeat whether to reset the channel's lost-heartbeat counter
   * @param writer         performs the write on the resolved channel
   */
  private static void dispatch(String channelId, boolean emptyPayload, boolean resetHeartbeat,
                               Consumer<Channel> writer) {
    Lock.lock();
    try {
      // Looked up even for empty payloads: the lookup also prunes closed channels.
      Channel channel = findChannel(channelId);
      if (emptyPayload || channel == null || !channel.isOpen()) {
        return;
      }
      if (resetHeartbeat) {
        onlineChannelsHeart.replace(channelId, IntegerConsts.ZERO);
      }
      writer.accept(channel);
    } finally {
      Lock.unlock();
    }
  }
}
13、总结
以上内容为Spring Boot整合Netty实现TCP通信封装,在后面文章会详细说明如何去实践完成和硬件通信
版权归原作者 维基框架 所有, 如有侵权,请联系我们删除。