0


【Netty】自定义解码器、编码器、编解码器(十五)

文章目录

前言

回顾Netty系列文章:

  • Netty 概述(一)
  • Netty 架构设计(二)
  • Netty Channel 概述(三)
  • Netty ChannelHandler(四)
  • ChannelPipeline源码分析(五)
  • 字节缓冲区 ByteBuf (六)(上)
  • 字节缓冲区 ByteBuf(七)(下)
  • Netty 如何实现零拷贝(八)
  • Netty 程序引导类(九)
  • Reactor 模型(十)
  • 工作原理详解(十一)
  • Netty 解码器(十二)
  • Netty 编码器(十三)
  • Netty 编解码器(十四)

我们今天继续来分析 Netty 的编解码器,这次我们要自己动手实现自定义的编码器、解码器和编解码器。
废话不多说,进入正文吧。

一、自定义基于换行的解码器

1.1 LineBasedFrameDecoder 类

LineBasedFrameDecoder 类是基于换行的,意味着只要在接收数据时遇到以换行符\n或者回车换行符\r\n结尾时,就表明数据已经接收完成可以被处理了。

LineBasedFrameDecoder 类继承自 ByteToMessageDecoder,并重写了 decode方法。

publicclassLineBasedFrameDecoderextendsByteToMessageDecoder{/** 帧的最大长度限制  */privatefinalint maxLength;/** 帧超长时是否抛出异常 */privatefinalboolean failFast;privatefinalboolean stripDelimiter;/** 如果超出长度则为True,表明需要丢弃输入的数据  */privateboolean discarding;privateint discardedBytes;/** 最后扫描的位置 */privateint offset;publicLineBasedFrameDecoder(finalint maxLength){this(maxLength,true,false);}publicLineBasedFrameDecoder(finalint maxLength,finalboolean stripDelimiter,finalboolean failFast){this.maxLength = maxLength;this.failFast = failFast;this.stripDelimiter = stripDelimiter;}@Overrideprotectedfinalvoiddecode(ChannelHandlerContext ctx,ByteBuf in,List<Object> out)throwsException{Object decoded =decode(ctx, in);if(decoded !=null){
            out.add(decoded);}}protectedObjectdecode(ChannelHandlerContext ctx,ByteBuf buffer)throwsException{finalint eol =findEndOfLine(buffer);if(!discarding){if(eol >=0){finalByteBuf frame;finalint length = eol - buffer.readerIndex();finalint delimLength = buffer.getByte(eol)=='\r'?2:1;if(length > maxLength){
                    buffer.readerIndex(eol + delimLength);fail(ctx, length);returnnull;}if(stripDelimiter){
                    frame = buffer.readRetainedSlice(length);
                    buffer.skipBytes(delimLength);}else{
                    frame = buffer.readRetainedSlice(length + delimLength);}return frame;}else{finalint length = buffer.readableBytes();if(length > maxLength){
                    discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    discarding =true;
                    offset =0;if(failFast){fail(ctx,"over "+ discardedBytes);}}returnnull;}}else{if(eol >=0){finalint length = discardedBytes + eol - buffer.readerIndex();finalint delimLength = buffer.getByte(eol)=='\r'?2:1;
                buffer.readerIndex(eol + delimLength);
                discardedBytes =0;
                discarding =false;if(!failFast){fail(ctx, length);}}else{
                discardedBytes += buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());// 跳过缓冲区中的所有内容,需要再次将offset 设置为0
                offset =0;}returnnull;}}privatevoidfail(finalChannelHandlerContext ctx,int length){fail(ctx,String.valueOf(length));}privatevoidfail(finalChannelHandlerContext ctx,String length){
        ctx.fireExceptionCaught(newTooLongFrameException("frame length ("+ length +") exceeds the allowed maximum ("+ maxLength +')'));}/**
     * 返回找到的行尾缓冲区的索引
     * 如果在缓冲区中未找到行尾,则返回 -1
     */privateintfindEndOfLine(finalByteBuf buffer){int totalLength = buffer.readableBytes();int i = buffer.forEachByte(buffer.readerIndex()+ offset, totalLength - offset,ByteProcessor.FIND_LF);if(i >=0){
            offset =0;// 判断是否是回车符if(i >0&& buffer.getByte(i -1)=='\r'){
                i--;}}else{
            offset = totalLength;}return i;}}

从上述代码可以看出,LineBasedFrameDecoder是通过查找回车换行符来找到数据结束的标志的。

1.2 定义解码器

定义了解码器MyLineBasedFrameDecoder,该解码器继承自LineBasedFrameDecoder,因此可以使用LineBasedFrameDecoder上的所有功能。
代码如下:

publicclassMyLineBasedFrameDecoderextendsLineBasedFrameDecoder{privatefinalstaticint MAX_LENGTH =1024;// 帧的最大长度publicMyLineBasedFrameDecoder(){super(MAX_LENGTH);}}

在上述代码中,通过MAX_LENGTH常量,来限制解码器帧的大小。超过该常量值的限制的话,则会抛出TooLongFrameException异常。

1.3 定义 ChannelHandler

ChannelHandler 定义如下:

publicclassMyLineBasedFrameDecoderServerHandlerextendsChannelInboundHandlerAdapter{@OverridepublicvoidchannelRead(ChannelHandlerContext ctx,Object msg){// 接收msg消息,此处已经无需解码了System.out.println("Client -> Server: "+ msg);}}

MyLineBasedFrameDecoderServerHandler业务非常简单,把收到的消息打印出来即可。

1.4 定义 ChannelInitializer

定义一个 ChannelInitializer 用于容纳解码器 MyLineBasedFrameDecoder 和 MyLineBasedFrameDecoderServerHandler,代码如下:

publicclassMyLineBasedFrameDecoderChannelInitializerextendsChannelInitializer<SocketChannel>{@OverrideprotectedvoidinitChannel(SocketChannel channel){// 基于换行符号
        channel.pipeline().addLast(newMyLineBasedFrameDecoder());// 解码转String
        channel.pipeline().addLast(newStringDecoder());// 自定义ChannelHandler
        channel.pipeline().addLast(newMyLineBasedFrameDecoderServerHandler());}}

注意添加到ChannelPipeline的ChannelHandler的顺序,MyLineBasedFrameDecoder 在前,MyLineBasedFrameDecoderServerHandler 在后,意味着数据先经过MyLineBasedFrameDecoder 解码,然后再交给MyLineBasedFrameDecoderServerHandler 处理。
StringDecoder实现将数据转换为字符串。

1.5 编写服务器

定义服务器 MyLineBasedFrameDecoderServer代码如下:

publicclassMyLineBasedFrameDecoderServer{publicstaticint DEFAULT_PORT =8023;publicstaticvoidmain(String[] args)throwsException{int port = DEFAULT_PORT;// 多线程事件循环器EventLoopGroup bossGroup =newNioEventLoopGroup(1);// bossEventLoopGroup workerGroup =newNioEventLoopGroup();// workertry{// 启动NIO服务的引导程序类ServerBootstrap b =newServerBootstrap();
            b.group(bossGroup, workerGroup)// 设置EventLoopGroup.channel(NioServerSocketChannel.class)// 指明新的Channel的类型.childHandler(newMyLineBasedFrameDecoderChannelInitializer())// 指定ChannelHandler.option(ChannelOption.SO_BACKLOG,128)// 设置的ServerChannel的一些选项.childOption(ChannelOption.SO_KEEPALIVE,true);// 设置的ServerChannel的子Channel的选项// 绑定端口,开始接收进来的连接ChannelFuture f = b.bind(port).sync();System.out.println("MyLineBasedFrameDecoderServer已启动,端口:"+ port);// 等待服务器 socket 关闭 。// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();}finally{// 优雅的关闭
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();}}}

MyLineBasedFrameDecoderServer 中唯一需要注意的是,在 ServerBootstrap 中指定MyLineBasedFrameDecoderChannelInitializer,这样服务器就能应用咱们自定义的编码器和ChannelHandler了。

1.6 编写客户端

为了测试服务器,编写了一个简单的 TCP 客户端,代码如下:

publicclassTcpClient{publicstaticvoidmain(String[] args)throwsIOException{Socket socket =null;OutputStream out =null;try{
            socket =newSocket("localhost",8023);
            out = socket.getOutputStream();// 请求服务器  String lines ="床前明月光\r\n疑是地上霜\r\n举头望明月\r\n低头思故乡\r\n";byte[] outputBytes = lines.getBytes("UTF-8");
            out.write(outputBytes);
            out.flush();}finally{// 关闭连接  
            out.close();
            socket.close();}}}

上述客户端在启动后会发送一段文本,而后关闭连接。该文本每句用回车换行符\r\n结尾,这样服务器就能一句一句地解析了。

1.7 测试

先启动服务器,观察控制台,可以看到如下输出的内容:

MyLineBasedFrameDecoderServer已启动,端口:8023

然后启动客户端。启动完成之后,再次观察服务器的控制台,可以看到如下输出内容:

MyLineBasedFrameDecoderServer已启动,端口:8023Client->Server: 床前明月光
Client->Server: 疑是地上霜
Client->Server: 举头望明月
Client->Server: 低头思故乡

上述的输出内容说明,MyLineBasedFrameDecoderServerHandler接收到了 4 条数据。那么为啥客户端发送了 1 条数据,到这里就变成了 4 条了呢?这是因为在前面介绍的MyLineBasedFrameDecoderChannelInitializer中,MyLineBasedFrameDecoder先被添加到ChannelPipeline,然后才添加到MyLineBasedFrameDecoderServerHandler,意味着数据先经过解码,再交给MyLineBasedFrameDecoderServerHandler处理,而在数据解码过程中,MyLineBasedFrameDecoderServerHandler是按照换行解码的,而客户端所发送的数据里面又包含了 4 个回车换行符,因此,数据被解码为了 4 条。

二、自定义编码器

2.1 定义消息通信协议

消息通信协议是连接客户端和服务器的密语,只有熟知双方的通信协议,客户端和服务器才能正常识别消息的内容。常见的消息通信协议有 HTTP、MQTT、XMPP、STOMP、AMQP和 RTMP等。

下面展示了消息通信协议的内容格式:
类型名称字节序列取值范围备注消息头msgType00x00-0xff消息类型消息头len1-40-2147483647消息体长度消息体body变长0-消息体
从上述协议中可以看出,消息主要是由消息头和消息体组成,并说明如下:

  • msgType 表示消息的类型。在本节示例中,请求用EMGW_LOGIN_REQ(0x00),响应用EMGW_LOGIN_RES(0x01)表示。
  • len 表示消息体的长度。
  • body 表示消息体。

定义了如下MsgType枚举类型来表示消息类型:

publicenumMsgType{EMGW_LOGIN_REQ((byte)0x00),EMGW_LOGIN_RES((byte)0x01);privatebyte value;publicbytegetValue(){return value;}privateMsgType(byte value){this.value = value;}}

消息头类 MsgHeader定义如下:

publicclassMsgHeader{privatebyte msgType;// 消息类型privateint len;// 长度publicMsgHeader(){}publicMsgHeader(byte msgType,int len){this.msgType = msgType;this.len = len;}publicbytegetMsgType(){return msgType;}publicvoidsetMsgType(byte msgType){this.msgType = msgType;}publicintgetLen(){return len;}publicvoidsetLen(int len){this.len = len;}}

消息类 Msg 定义如下:

publicclassMsg{privateMsgHeader msgHeader =newMsgHeader();privateString body;publicMsgHeadergetMsgHeader(){return msgHeader;}publicvoidsetMsgHeader(MsgHeader msgHeader){this.msgHeader = msgHeader;}publicStringgetBody(){return body;}publicvoidsetBody(String body){this.body = body;}}

2.2 定义编码器

publicclassMyEncoderextendsMessageToByteEncoder<Msg>{@Overrideprotectedvoidencode(ChannelHandlerContext ctx,Msg msg,ByteBuf out)throwsException{if(msg ==null| msg.getMsgHeader()==null){thrownewException("The encode message is null");}// 获取消息头MsgHeader header = msg.getMsgHeader();// 获取消息体String body = msg.getBody();byte[] bodyBytes = body.getBytes(Charset.forName("utf-8"));// 计算消息体的长度int bodySize = bodyBytes.length;System.out.printf("MyEncoder header: %s, body: %s", header.getMsgType(), body);

        out.writeByte(MsgType.EMGW_LOGIN_RES.getValue());
        out.writeInt(bodySize);
        out.writeBytes(bodyBytes);}}

MyEncoder会将 Msg 消息转为 ByteBuf 类型。

2.3 定义解码器

publicclassMyDecoderextendsLengthFieldBasedFrameDecoder{privatestaticfinalint MAX_FRAME_LENGTH =1024*1024;privatestaticfinalint LENGTH_FIELD_LENGTH =4;privatestaticfinalint LENGTH_FIELD_OFFSET =1;privatestaticfinalint LENGTH_ADJUSTMENT =0;privatestaticfinalint INITIAL_BYTES_TO_STRIP =0;privatestaticfinalint HEADER_SIZE =5;privatebyte msgType;// 消息类型privateint len;// 长度publicMyDecoder(){super(MAX_FRAME_LENGTH,
                LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH,
                LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);}@OverrideprotectedMsgdecode(ChannelHandlerContext ctx,ByteBuf in2)throwsException{ByteBuf in =(ByteBuf)super.decode(ctx, in2);if(in ==null){returnnull;}// 校验头长度if(in.readableBytes()< HEADER_SIZE){returnnull;}

        msgType = in.readByte();
        len = in.readInt();// 校验消息体长度if(in.readableBytes()< len){returnnull;}ByteBuf buf = in.readBytes(len);byte[] req =newbyte[buf.readableBytes()];
        buf.readBytes(req);String body =newString(req,"UTF-8");// ByteBuf转为Msg类型Msg msg =newMsg();MsgHeader msgHeader =newMsgHeader(msgType, len);
        msg.setBody(body);
        msg.setMsgHeader(msgHeader);return msg;}}

MyDecoder集成自 Netty 内嵌的解码器LengthFieldBasedFrameDecoder。LengthFieldBasedFrameDecoder是一种基于灵活长度的解码器。在数据包中,加了一个长度字段,保存上层包的长度。解码时,会按照这个长度,进行上层 ByteBuf 应用包的提取。其中,初始化LengthFieldBasedFrameDecoder时,需要指定以下参数:

  • maxFrameLength:发送数据包最大的长度。
  • lengthFieldOffset:长度域偏移量,指的是长度域位于整个数据包字节数组中的下标。
  • lengthFieldLength:长度域的字节长度。
  • lengthAdjustment:长度域的偏移量矫正。
  • initialBytesToStrip:丢弃的初始字节数。丢弃处于有效数据前面的字节数量。

2.4 定义服务器 ChannelHandler

publicclassMyServerHandlerextendsSimpleChannelInboundHandler<Object>{@OverrideprotectedvoidchannelRead0(ChannelHandlerContext ctx,Object obj)throwsException{Channel incoming = ctx.channel();if(obj instanceofMsg){Msg msg =(Msg) obj;System.out.println("Client->Server:"+ incoming.remoteAddress()+ msg.getBody());
            incoming.write(obj);}}@OverridepublicvoidchannelReadComplete(ChannelHandlerContext ctx)throwsException{
        ctx.flush();}}

MyServerHandler逻辑比较简单,只是把收到的消息内容打印出来。

2.5 定义客户端 ChannelHandler

publicclassMyClientHandlerextendsSimpleChannelInboundHandler<Object>{@OverrideprotectedvoidchannelRead0(ChannelHandlerContext ctx,Object obj)throwsException{Channel incoming = ctx.channel();if(obj instanceofMsg){Msg msg =(Msg) obj;System.out.println("Server->Client:"+ incoming.remoteAddress()+ msg.getBody());}else{System.out.println("Server->Client:"+ incoming.remoteAddress()+ obj.toString());}}}

MyClientHandler逻辑比较简单,只是把收到的消息内容打印出来。

2.6定义服务器的主程序

publicclassMyServer{privateint port;publicMyServer(int port){this.port = port;}publicvoidrun()throwsException{EventLoopGroup bossGroup =newNioEventLoopGroup();EventLoopGroup workerGroup =newNioEventLoopGroup();try{ServerBootstrap b =newServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(newChannelInitializer<SocketChannel>(){@OverridepublicvoidinitChannel(SocketChannel ch)throwsException{
                            ch.pipeline().addLast("decoder",newMyDecoder());
                            ch.pipeline().addLast("encoder",newMyEncoder());
                            ch.pipeline().addLast(newMyServerHandler());}}).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true);ChannelFuture f = b.bind(port).sync();System.out.println("Server start listen at "+ port);

            f.channel().closeFuture().sync();}finally{
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();}}publicstaticvoidmain(String[] args)throwsException{int port;if(args.length >0){
            port =Integer.parseInt(args[0]);}else{
            port =8082;}newMyServer(port).run();}}

注意添加到ChannelPipeline的ChannelHandler的顺序,MyDecoder在前,MyEncoder在后,业务处理的MyServerHandler在最后。

2.7 定义客户端主程序

publicclassMyClient{privateString host;privateint port;publicMyClient(String host,int port){this.host = host;this.port = port;}publicvoidrun()throwsInterruptedException{EventLoopGroup workerGroup =newNioEventLoopGroup();try{Bootstrap b =newBootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE,true);
            b.handler(newChannelInitializer<SocketChannel>(){@OverridepublicvoidinitChannel(SocketChannel ch)throwsException{
                    ch.pipeline().addLast("decoder",newMyDecoder());
                    ch.pipeline().addLast("encoder",newMyEncoder());
                    ch.pipeline().addLast(newMyClientHandler());}});// 启动客户端ChannelFuture f = b.connect(host, port).sync();while(true){// 发送消息给服务器Msg msg =newMsg();MsgHeader msgHeader =newMsgHeader();
                msgHeader.setMsgType(MsgType.EMGW_LOGIN_REQ.getValue());String body ="床前明月光,疑是地上霜。";byte[] bodyBytes = body.getBytes(Charset.forName("utf-8"));int bodySize = bodyBytes.length;
                msgHeader.setLen(bodySize);
                msg.setMsgHeader(msgHeader);
                msg.setBody(body);

                f.channel().writeAndFlush(msg);Thread.sleep(2000);}}finally{
            workerGroup.shutdownGracefully();}}publicstaticvoidmain(String[] args)throwsInterruptedException{newMyClient("localhost",8082).run();}}

注意添加到ChannelPipeline的ChannelHandler的顺序,MyDecoder在前,MyEncoder在后,业务处理的MyClientHandler在最后。
上述的客户端程序,会每隔 2 秒给服务器发送一条消息。

2.8 测试

分别运行服务器和客户端程序。
客户端输出如下:
arduino复制代码MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
MyEncoder header: 0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。

服务端输出如下:
arduino复制代码Server start listen at 8082
Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。
MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。
MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。
MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。
MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。
MyEncoder header: 1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:62927床前明月光,疑是地上霜。

三、自定义编解码器

前面我们实现了编码器 MyEncoder 和 解码器 MyDecoder。这些代码无须做任何改动。

3.1 自定义编解码器

使用CombinedChannelDuplexHandler 类对编码器 MyEncoder 和 解码器 MyDecoder进行组合。代码如下:

publicclassMyCodecextendsCombinedChannelDuplexHandler<MyDecoder,MyEncoder>{publicMyCodec(){super(newMyDecoder(),newMyEncoder());}}

3.2 使用编解码器

分别修改 MyServer 和 MyClient 类,添加编解码器,修改代码如下:

// 添加编解码器
ch.pipeline().addLast("codec",newMyCodec());

上述代码将原来的 MyEncoder 和 MyDecoder从ChannelPipeline中剔除掉了,取而代之是MyEncoder。

3.3 测试

分别运行服务器和客户端。
客户端输出如下:

arduino复制代码MyEncoder header:0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
MyEncoder header:0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
MyEncoder header:0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
MyEncoder header:0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。
MyEncoder header:0, body: 床前明月光,疑是地上霜。Server->Client:localhost/127.0.0.1:8082床前明月光,疑是地上霜。

服务端输出如下:

Server start listen at 8082Client->Server:/127.0.0.1:56181床前明月光,疑是地上霜。
MyEncoder header:1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:56181床前明月光,疑是地上霜。
MyEncoder header:1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:56181床前明月光,疑是地上霜。
MyEncoder header:1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:56181床前明月光,疑是地上霜。
MyEncoder header:1, body: 床前明月光,疑是地上霜。Client->Server:/127.0.0.1:56181床前明月光,疑是地上霜。
MyEncoder header:1, body: 床前明月光,疑是地上霜。

总结

以上就是关于一些自定义的编码器、解码器、编解码器的示例应用,我们下节继续深入 Netty 源码。

标签: java 前端 算法

本文转载自: https://blog.csdn.net/u011397981/article/details/130906350
版权归原作者 逆流°只是风景-bjhxcc 所有, 如有侵权,请联系我们删除。

“【Netty】自定义解码器、编码器、编解码器(十五)”的评论:

还没有评论