目录
Netty官网:Netty: Home
前面我们介绍了Netty的基本用法以及基本知识,但是在我们的实际开发中要用到SpringBoot,下面我们来看看SpringBoot的整合与简单的文件传输吧
一 Netty+SpringBoot环境搭建
🌈🌈依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.65.Final</version></dependency><!-- 修复 ApacheLog4j2 远程代码执行漏洞 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.0</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.17.0</version></dependency>
🌈🌈yaml配置
server:
port:8080
netty:
host:127.0.0.1
port:7397
下面我们分别搭建服务端与客户端
1.1 服务端
packagecom.shu;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.Channel;importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelOption;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.nio.NioServerSocketChannel;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.stereotype.Component;importjava.net.InetSocketAddress;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 10:27
* @version: 1.0
*/@Component("nettyServer")publicclassNettyServer{privateLogger logger =LoggerFactory.getLogger(NettyServer.class);privatefinalEventLoopGroup parentGroup =newNioEventLoopGroup();privatefinalEventLoopGroup childGroup =newNioEventLoopGroup();privateChannel channel;/**
* 绑定端口
* @param address
* @return
*/publicChannelFuturebind(InetSocketAddress address){ChannelFuture channelFuture =null;try{ServerBootstrap b =newServerBootstrap();
b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)//非阻塞模式.option(ChannelOption.SO_BACKLOG,128).childHandler(newMyChannelInitializer());
channelFuture = b.bind(address).syncUninterruptibly();
channel = channelFuture.channel();}catch(Exception e){
logger.error(e.getMessage());}finally{if(null!= channelFuture && channelFuture.isSuccess()){
logger.info("netty server start done.");}else{
logger.error("netty server start error.");}}return channelFuture;}/**
* 销毁
*/publicvoiddestroy(){if(null== channel)return;
channel.close();
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();}/**
* 获取通道
* @return
*/publicChannelgetChannel(){return channel;}}
packagecom.shu;importio.netty.channel.ChannelInitializer;importio.netty.channel.socket.SocketChannel;importio.netty.handler.codec.LineBasedFrameDecoder;importio.netty.handler.codec.string.StringDecoder;importio.netty.handler.codec.string.StringEncoder;importio.netty.handler.logging.LogLevel;importio.netty.handler.logging.LoggingHandler;importjava.nio.charset.Charset;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 10:31
* @version: 1.0
*/publicclassMyChannelInitializerextendsChannelInitializer<SocketChannel>{/**
* 初始化channel
* @param channel
* @throws Exception
*/@OverrideprotectedvoidinitChannel(SocketChannel channel)throwsException{// 日志打印
channel.pipeline().addLast(newLoggingHandler(LogLevel.INFO));// 基于换行符号
channel.pipeline().addLast(newLineBasedFrameDecoder(1024));// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(newStringDecoder(Charset.forName("GBK")));// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(newStringEncoder(Charset.forName("GBK")));// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(newMyServerHandler());}}
packagecom.shu;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapter;importio.netty.channel.socket.SocketChannel;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.text.SimpleDateFormat;importjava.util.Date;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 10:33
* @version: 1.0
*/publicclassMyServerHandlerextendsChannelInboundHandlerAdapter{privateLogger logger =LoggerFactory.getLogger(MyServerHandler.class);/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{SocketChannel channel =(SocketChannel) ctx.channel();//通知客户端链接建立成功String str ="通知客户端链接建立成功"+" "+newDate()+" "+ channel.localAddress().getHostString()+"\r\n";
ctx.writeAndFlush(str);}/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx)throwsException{
logger.info("客户端断开链接{}", ctx.channel().localAddress().toString());}@OverridepublicvoidchannelRead(ChannelHandlerContext ctx,Object msg)throwsException{//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
logger.info(newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(newDate())+" 服务端接收到消息:"+ msg);//通知客户端链消息发送成功String str ="服务端收到:"+newDate()+" "+ msg +"\r\n";
ctx.writeAndFlush(str);}/**
* 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
*/@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause)throwsException{
ctx.close();
logger.info("异常信息:\r\n"+ cause.getMessage());}}
packagecom.shu;importio.netty.channel.ChannelFuture;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.CommandLineRunner;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.context.annotation.ComponentScan;importjava.net.InetSocketAddress;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 10:36
* @version: 1.0
*/@SpringBootApplication@ComponentScan("com.shu")publicclassNettyApplicationimplementsCommandLineRunner{@Value("${netty.host}")privateString host;@Value("${netty.port}")privateint port;@AutowiredprivateNettyServer nettyServer;publicstaticvoidmain(String[] args){SpringApplication.run(NettyApplication.class, args);}@Overridepublicvoidrun(String... args)throwsException{InetSocketAddress address =newInetSocketAddress(host, port);ChannelFuture channelFuture = nettyServer.bind(address);Runtime.getRuntime().addShutdownHook(newThread(()-> nettyServer.destroy()));
channelFuture.channel().closeFuture().syncUninterruptibly();}}
1.2 客户端
importio.netty.bootstrap.Bootstrap;importio.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioSocketChannel;importio.netty.handler.codec.LineBasedFrameDecoder;importio.netty.handler.codec.string.StringDecoder;importio.netty.handler.codec.string.StringEncoder;importio.netty.handler.logging.LogLevel;importio.netty.handler.logging.LoggingHandler;importjava.nio.charset.Charset;importjava.text.SimpleDateFormat;importjava.util.Date;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 10:41
* @version: 1.0
*/publicclassApiTest{publicstaticvoidmain(String[] args){EventLoopGroup workerGroup =newNioEventLoopGroup();try{Bootstrap b =newBootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.AUTO_READ,true);
b.handler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannel channel)throwsException{// 日志打印
channel.pipeline().addLast(newLoggingHandler(LogLevel.INFO));// 基于换行符号
channel.pipeline().addLast(newLineBasedFrameDecoder(1024));// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(newStringDecoder(Charset.forName("GBK")));// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(newStringEncoder(Charset.forName("GBK")));// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(newChannelInboundHandlerAdapter(){@OverridepublicvoidchannelRead(ChannelHandlerContext ctx,Object msg)throwsException{//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}System.out.println(newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(newDate())+" 客户端接收到消息:"+ msg);}});}});ChannelFuture f = b.connect("127.0.0.1",7397).sync();System.out.println(" client start done");//向服务端发送信息
f.channel().writeAndFlush("你好,SpringBoot启动的netty服务端,“我的结尾是一个换行符,用于传输半包粘包处理”\r\n");
f.channel().writeAndFlush("你好,SpringBoot启动的netty服务端,“我的结尾是一个换行符,用于传输半包粘包处理”\r\n");
f.channel().writeAndFlush("你好,SpringBoot启动的netty服务端,“我的结尾是一个换行符,用于传输半包粘包处理”\r\n");
f.channel().writeAndFlush("你好,SpringBoot启动的netty服务端,“我的结尾是一个换行符,用于传输半包粘包处理”\r\n");
f.channel().writeAndFlush("你好,SpringBoot启动的netty服务端,“我的结尾是一个换行符,用于传输半包粘包处理”\r\n");
f.channel().closeFuture().syncUninterruptibly();}catch(InterruptedException e){
e.printStackTrace();}finally{
workerGroup.shutdownGracefully();}}}
1.3 测试
我们仔细来观察一下这个日志打印,并注意一下这些字节的意思
- 注册
客户端
服务端
- 客户端建立连接
- 服务端连接建立,通知客户端连接建立
看到这里我就很疑惑他到底是如何把我们的文件转换成16进制的?
通过代码我们知道我们传递的文字:通知客户端链接建立成功 Mon May 08 20:30:04 CST 2023 127.0.0.1 但是他是如何转换成16进制的,下面我们来具体分析一下?
前提知识?
- GBK的中文编码是双字节来表示的,英文编码是用ASC||码表示的,既用单字节表示。但GBK编码表中也有英文字符的双字节表示形式,所以英文字母可以有2中GBK表示方式。为区分中文,将其最高位都定成1。英文单字节最高位都为0。当用GBK解码时,若高字节最高位为0,则用ASC||码表解码;若高字节最高位为1,则用GBK编码表解码
- 至于UTF-8编码则是用以解决国际上字符的一种多字节编。码,它对英文使用8位(即一个字节),中文使用24位(三个字节)来编码。
分析
这里我们采用的是GBK编码,那我们咋看懂这些字节表示的啥,这里可能还需要一些基本知识进制的转成,1字节8bit ,有了基础知识我们再来分析,如上面说中文GBK两个字节表示一个中文,那么通这个中文对应的两个字节就是cd a8,那咋验证?
参考网站:GB2312简体中文编码表 - 常用参考表对照表 - 脚本之家在线工具
我们可以发现真是CDA8代表汉字通,这也解释我心中的疑惑,这里这是汉字,下面我们看一下字母
字母分析?
当用GBK解码时,若高字节最高位为0,则用ASC||码表解码;若高字节最高位为1,则用GBK编码表解码?
那啥是高字节与低字节?
存放最低的8位有效位的字节被称为最低有效位字节或低位字节,而存放最高的8位有效位的字节被称为最高有效位字节或高位字节。
高位字节 低位字节
↓------------------------------↓ ↓-----------------------------↓
15 14 13 12 11 10 9. 8. 7. 6. 5. 4. 3. 2. 1. 0.
参考链接:ASCII码一览表,ASCII码对照表
我们可以通过Ascll码表可以发现空格对应16进制的20
而M的十六进制:4D
相信进过上面的理解,应该这一看懂这段报文的理解,这也为了自己看懂协议有了会很好的铺垫,下面我们继续看服务端与客户端的连接过程
- 客户端收到消息,建立链接,可以进行消息的传递了
- 客户端写消息
- 服务端收消息
二 Netty实现简单的文件传输
文件传输在我们的实际开发中中非常常见,下面我们来个简单的案例
2.1 实体类
主要必须实现序列化
packagecom.shu.file01;importlombok.Data;importjava.io.File;importjava.io.Serializable;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 20:28
* @version: 1.0
*/@DatapublicclassFileResponseimplementsSerializable{privatelong length;privateFile file;publicFileResponse(){}publicFileResponse(long length,File file){this.length = length;this.file = file;}publiclonggetLength(){return length;}publicFilegetFile(){return file;}}
packagecom.shu.file01;importlombok.Data;importjava.io.Serializable;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 20:27
* @version: 1.0
*/@DatapublicclassFileRequestimplementsSerializable{privateString fileName;publicFileRequest(){}publicFileRequest(String fileName){this.fileName = fileName;}publicStringgetFileName(){return fileName;}}
2.2 服务端
packagecom.shu.file01;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.handler.codec.serialization.ClassResolvers;importio.netty.handler.codec.serialization.ObjectDecoder;importio.netty.handler.codec.serialization.ObjectEncoder;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.stereotype.Component;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 19:57
* @version: 1.0
*/@Component("fileServer")publicclassFileServer{privateLogger logger =LoggerFactory.getLogger(FileServer.class);/**
* 绑定端口
*
* @param port
* @return
*/publicvoidbind(int port){EventLoopGroup bossGroup =newNioEventLoopGroup();EventLoopGroup workerGroup =newNioEventLoopGroup();try{ServerBootstrap bootstrap =newServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannel ch)throwsException{
ch.pipeline().addLast(newObjectEncoder()).addLast(newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.weakCachingConcurrentResolver(null))).addLast(newFileServerHandler());}});ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();System.out.println("服务端启动成功,端口:"+ port);}catch(InterruptedException e){thrownewRuntimeException(e);}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();}}}
packagecom.shu.file01;importio.netty.channel.*;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.io.File;importjava.io.RandomAccessFile;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 20:19
* @version: 1.0
*/publicclassFileServerHandlerextendsChannelInboundHandlerAdapter{privatestaticfinalString FILE_PATH ="D:\\coreconfig.txt";privateLogger logger =LoggerFactory.getLogger(FileServerHandler.class);@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{super.channelActive(ctx);
logger.info("channelActive");}@OverridepublicvoidchannelRead(ChannelHandlerContext ctx,Object msg)throwsException{
logger.info("channelRead");if(msg instanceofFileRequest){FileRequest request =(FileRequest) msg;if(request.getFileName().equals(FILE_PATH)){File file =newFile(FILE_PATH);if(file.exists()){RandomAccessFile raf =newRandomAccessFile(file,"r");long length = raf.length();FileResponse response =newFileResponse(length, file);
ctx.writeAndFlush(response);ChannelFuture sendFileFuture = ctx.writeAndFlush(newDefaultFileRegion(raf.getChannel(),0, length), ctx.newProgressivePromise());
sendFileFuture.addListener(newChannelProgressiveFutureListener(){@OverridepublicvoidoperationComplete(ChannelProgressiveFuture future)throwsException{System.out.println("File transfer completed.");
raf.close();}@OverridepublicvoidoperationProgressed(ChannelProgressiveFuture future,long progress,long total)throwsException{if(total <0){System.err.println("File transfer progress: "+ progress);}else{System.err.println("File transfer progress: "+ progress +" / "+ total);}}});}else{System.err.println("File not found: "+ FILE_PATH);}}else{System.err.println("Invalid file name: "+ request.getFileName());}}}}
2.3 客户端
packagecom.shu.file01;importio.netty.bootstrap.Bootstrap;importio.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioSocketChannel;importio.netty.handler.codec.serialization.ClassResolvers;importio.netty.handler.codec.serialization.ObjectDecoder;importio.netty.handler.codec.serialization.ObjectEncoder;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 20:42
* @version: 1.0
*/publicclassFileClient{privatestaticfinalint PORT =8080;privatestaticfinalString HOST ="localhost";privateLogger logger =LoggerFactory.getLogger(FileClient.class);privatefinalEventLoopGroup parentGroup =newNioEventLoopGroup();/**
* 连接服务端
*/publicvoidconnect(){EventLoopGroup group =newNioEventLoopGroup();try{Bootstrap bootstrap =newBootstrap().group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannel ch)throwsException{
ch.pipeline().addLast(newObjectEncoder()).addLast(newObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null))).addLast(newFileClientHandler());}});ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
future.channel().closeFuture().sync();}catch(Exception e){thrownewRuntimeException(e);}finally{
group.shutdownGracefully();}}}
packagecom.shu.file01;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapter;importjava.io.File;importjava.io.FileOutputStream;importjava.nio.channels.FileChannel;/**
* @description:
* @author: shu
* @createDate: 2023/5/7 20:47
* @version: 1.0
*/publicclassFileClientHandlerextendsChannelInboundHandlerAdapter{@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{FileRequest request =newFileRequest("D:\\coreconfig.txt");
ctx.writeAndFlush(request);}@OverridepublicvoidchannelRead(ChannelHandlerContext ctx,Object msg)throwsException{if(msg instanceofFileResponse){FileResponse response =(FileResponse) msg;File file = response.getFile();long fileLength = response.getLength();FileOutputStream fos =newFileOutputStream(file);FileChannel channel = fos.getChannel();// channel.transferFrom(channel, 0, fileLength);System.out.println("File "+ file.getName()+" received.");}else{System.err.println("Invalid response type: "+ msg.getClass());}}}
2.4 测试
这就是一个简单的测试吧,实际的开发中我们需要考虑很多,比如大文件的传输,断点续点,文件传输的加密等等系列问题,这个我正在研究中,以后再说
版权归原作者 长安不及十里 所有, 如有侵权,请联系我们删除。