五、TransportChannelHandler解析
TransportChannelHandler 继承了 ChannelInboundHandlerAdapter ,ChannelInboundHandlerAdapter 实现了 Netty 的ChannelInboundHandler,在 Netty 框架中会使用 工作链模式 来对每个 ChannelInboundHandler 的实现类的 channelRead 方法进行链式调用。
工作链模式/责任链模式:责任链模式通过将多个处理请求的对象组成一条链,使请求在链上传递,直到有一个对象处理它为止。每个处理对象都负责判断自己能否处理该请求,如果可以则进行处理,否则将请求传递给下一个处理对象。这样,请求发送者无需知道具体的处理对象,只需将请求发送到责任链上即可。
责任链模式包含以下角色:
- 抽象处理者(Handler):定义一个处理请求的接口,并持有下一个处理者的引用。
- 具体处理者(Concrete Handler):实现抽象处理者的接口,在处理请求前判断自己是否能够处理该请求,如果可以则进行处理,否则将请求传递给下一个处理者。
通过责任链模式,我们可以动态地组合处理对象,灵活地配置处理流程,这种解耦使得系统更加灵活和可扩展
publicvoidchannelRead(ChannelHandlerContext ctx,Object request)throwsException{// 如果读取到的 request 是 RequestMessage,将此消息的处理交给 TransportRequestHandlerif(request instanceofRequestMessage){
requestHandler.handle((RequestMessage) request);// 如果读取到的 request 是 responseMessage,将此消息的处理交给 TransportResponseHandler}elseif(request instanceofResponseMessage){
responseHandler.handle((ResponseMessage) request);}else{
ctx.fireChannelRead(request);}}
从上面的代码中可以看到有比较关键的两类:Handler 和 Message
其中 Handler 具体为 TransportRequestHandler 与TransportResponseHandler ,他们都继承自抽象类 MessageHandler,MessageHandler 中定义了一些子类需要实现的方法
publicabstractclassMessageHandler<TextendsMessage>{/** 用于对接收到的单个小心进行处理*/publicabstractvoidhandle(T message)throwsException;/** 当 channel 激活时调用 */publicabstractvoidchannelActive();/** 当捕获到 channel 发生异常时调用 */publicabstractvoidexceptionCaught(Throwable cause);/** 当 channel 非激活时调用 */publicabstractvoidchannelInactive();}
六、Message解析
而Message具体为 RequestMessage 和 ResponseMessage 这两个接口,他们都派生自接口 Message,Message 接口即成了 Encodable,先对 Encodable 的定义进行查看
publicinterfaceEncodable{// 用于返回转换的对象数量intencodedLength();// 可以转换到一个 ByteBuf 中,多个对象将被存储到预先分配的 ByteBufvoidencode(ByteBuf buf);}
接下来再对 Message 接口进行查看
publicinterfaceMessageextendsEncodable{/** 返回消息的类型 */Typetype();/** 返回消息中可选的内容体*/ManagedBufferbody();/** 用于判断消息的主体是否包含在消息的同一帧中 */booleanisBodyInFrame();/**......*/}
最终的消息实现类都直接或间接的实现了 RequestMessage 和 ResponseMessage 接口
其中 RequestMessage 的具体实现有五种:
- ChunkFetchRequest : 请求获取流的单个块的序列
- RpcRequest:一种需要远程的RPC服务端处理并需要向客户端回复的RPC请求信息类型
- OneWayMessage:一种需要远程的RPC服务端处理但不需要向客户端回复的RPC请求信息类型
- StreamRequest:向远程的服务发起请求,以获取流式数据
- UploadStream:在帧外发送的一种带有数据的RPC消息,可以按照流式数据获取
ResponseMessage 无需对 OneWayMessage 和 UploadStream 进行响应,所以总共有3类实现,每类有成功和失败两种,总共六种实现
- ChunkFetchSuccess:处理 ChunkFetchRequest 成功后返回的消息
- ChunkFetchFailure:处理 ChunkFetchRequest 失败后返回的消息
- RpcResponse : 处理 RpcRequest 成功后返回的消息
- RpcFailure:处理 RpcRequest 失败后返回的消息
- StreamResponse : 处理 StreamRequest 成功后返回的消息
- StreamFailure:处理 StreamRequest 失败后返回的消息
在Message接口中可以看到对于body方法的返回是 ManagedBuffer,ManagedBuffer 提供了由字节构成数据的不可变视图,类似关系型数据库的视图,不存储数据,也不是数据的实际来源,ManagedBuffer 抽象类的定义如下
publicabstractclassManagedBuffer{/** 返回数据的字节数*/publicabstractlongsize();/** 将数据按照NIO的 ByteBuffer 类型返回*/publicabstractByteBuffernioByteBuffer()throwsIOException;/** 将数据按照InputStream返回*/publicabstractInputStreamcreateInputStream()throwsIOException;/** 当有新的使用者使用此视图时,增加引用此视图的引用数*/publicabstractManagedBufferretain();/** 当有使用者不再使用此视图时,减少引用此视图的引用数*/publicabstractManagedBufferrelease();/** 将缓冲区的数据转换为Netty的对象,用来将数据写到外部。此方法返回的数据类型要么是 io.netty.buffer.ByteBuf,要么是 io.netty.channel.FileRegion*/publicabstractObjectconvertToNetty()throwsIOException;}
ManagedBuffer 实际用于处理逻辑的具体实现类有三种,分别是NettyManagedBuffer、NioManagedBuffer 以及FileSegmentManagedBuffer,其中 NettyManagedBuffer 使用 io.netty.buffer.ByteBuf 作为缓冲,NioManagedBuffer 则使用了 java.nio.ByteBuffer 作为了缓冲,这两种实现都仅仅是将拿到的 buf 数据进行了 duplicate 或者 nioBuffer 转换,并没有涉及到很复杂的其他逻辑处理
FileSegmentManagedBuffer 的作用为获取一个文件中的一段,如下结合代码进行理解
publicfinalclassFileSegmentManagedBufferextendsManagedBuffer{// 也就是 TransportConfprivatefinalTransportConf conf;// 所要读取的文件privatefinalFile file;// 所要读取文件的偏移量privatefinallong offset;// 所要读取的长度privatefinallong length;/**......*//** NIO 方式读取文件*/@OverridepublicByteBuffernioByteBuffer()throwsIOException{FileChannel channel =null;try{// 使用 RandomAccessFile 获取 FileChannel
channel =newRandomAccessFile(file,"r").getChannel();// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.if(length < conf.memoryMapBytes()){ByteBuffer buf =ByteBuffer.allocate((int) length);
channel.position(offset);while(buf.remaining()!=0){if(channel.read(buf)==-1){thrownewIOException(String.format("Reached EOF before filling buffer\n"+"offset=%s\nfile=%s\nbuf.remaining=%s",
offset, file.getAbsoluteFile(), buf.remaining()));}}// 通过 ByteBuffer 和 FileChannel 的API将数据写入缓冲区 ByteBuffer 中
buf.flip();return buf;}else{return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);}}catch(IOException e){String errorMessage ="Error in reading "+this;try{if(channel !=null){long size = channel.size();
errorMessage ="Error in reading "+this+" (actual file length "+ size +")";}}catch(IOException ignored){// ignore}thrownewIOException(errorMessage, e);}finally{JavaUtils.closeQuietly(channel);}}/** 文件流方式读取文件*/@OverridepublicInputStreamcreateInputStream()throwsIOException{FileInputStream is =null;boolean shouldClose =true;try{
is =newFileInputStream(file);ByteStreams.skipFully(is, offset);InputStream r =newLimitedInputStream(is, length);
shouldClose =false;return r;}catch(IOException e){String errorMessage ="Error in reading "+this;if(is !=null){long size = file.length();
errorMessage ="Error in reading "+this+" (actual file length "+ size +")";}thrownewIOException(errorMessage, e);}finally{if(shouldClose){JavaUtils.closeQuietly(is);}}}/**......*//** 将数据转换为 Netty 对象*/@OverridepublicObjectconvertToNetty()throwsIOException{if(conf.lazyFileDescriptor()){returnnewDefaultFileRegion(file, offset, length);}else{FileChannel fileChannel =FileChannel.open(file.toPath(),StandardOpenOption.READ);returnnewDefaultFileRegion(fileChannel, offset, length);}}/**......*/}
七、RpcHandler解析
RpcHandler是一个抽象类,定义了一些RPC处理器的规范,TransportRequestHandler 实际上是把请求消息交给 RpcHandler 进行进一步处理
publicabstractclassRpcHandler{privatestaticfinalRpcResponseCallbackONE_WAY_CALLBACK=newOneWayRpcCallback();/** 用于接收单一的RPC消息,是抽象方法,具体逻辑需要子类实现,接收三个参数,其中 RpcResponseCallback 用于对请求处理结束后进行回调,无论处理结果是成功还是失败,RpcResponseCallback 都会被调用一次*/publicabstractvoidreceive(TransportClient client,ByteBuffer message,RpcResponseCallback callback);/** 用于接收流数据的处理,接收三个参数,其中 messageHeader 是包含有当前流数据的头信息(元数据),用于指向实际数据位置,RpcResponseCallback类似 receive,返回的是 StreamCallbackWithID 实际是个接口,继承于 StreamCallback,定义了处理流数据的方法,一般会在实际处理时通过匿名内部类的方式去实现这些方法*/publicStreamCallbackWithIDreceiveStream(TransportClient client,ByteBuffer messageHeader,RpcResponseCallback callback){thrownewUnsupportedOperationException();}// 获取 StreamManager,StreamManager可以从流中获取单个的块,因此它也包含着当前正在被 TransportClient 获取的流的状态publicabstractStreamManagergetStreamManager();// 对上面的 receive 进行重载,callback部分被替换为了 ONE_WAY_CALLBACK,类型是 OneWayRpcCallback,这个回调的逻辑中 onSuccess 和 onFailure 都只是打印日志,并没有针对客户端做回复处理publicvoidreceive(TransportClient client,ByteBuffer message){receive(client, message,ONE_WAY_CALLBACK);}// 当与给客户端相关联的 channel 处于活动状态时调用publicvoidchannelActive(TransportClient client){}// 当与给客户端相关联的 channel 处于非活动状态时调用publicvoidchannelInactive(TransportClient client){}// 当 channel 产生异常时调用publicvoidexceptionCaught(Throwable cause,TransportClient client){}privatestaticclassOneWayRpcCallbackimplementsRpcResponseCallback{privatestaticfinalLogger logger =LoggerFactory.getLogger(OneWayRpcCallback.class);@OverridepublicvoidonSuccess(ByteBuffer response){
logger.warn("Response provided for one-way RPC.");}@OverridepublicvoidonFailure(Throwable e){
logger.error("Error response provided for one-way RPC.", e);}}}
八、TransportRequestHandler解析
TransportRequestHandler 处理 5 种 RequestMessage的实现如下:
@Overridepublicvoidhandle(RequestMessage request){if(request instanceofChunkFetchRequest){processFetchRequest((ChunkFetchRequest) request);}elseif(request instanceofRpcRequest){processRpcRequest((RpcRequest) request);}elseif(request instanceofOneWayMessage){processOneWayMessage((OneWayMessage) request);}elseif(request instanceofStreamRequest){processStreamRequest((StreamRequest) request);}elseif(request instanceofUploadStream){processStreamUpload((UploadStream) request);}else{thrownewIllegalArgumentException("Unknown request type: "+ request);}}
- 处理块获取请求:processFetchRequest 方法用于处理 ChunkFetchRequest 类型的消息
privatevoidprocessFetchRequest(finalChunkFetchRequest req){if(logger.isTraceEnabled()){ logger.trace("Received req from {} to fetch block {}",getRemoteAddress(channel), req.streamChunkId);}// streamManager 是来源于上面的代码 this.streamManager = rpcHandler.getStreamManager();// 是通过调用 RpcHandler 的 getStreamManager 方法获取到的,long chunksBeingTransferred = streamManager.chunksBeingTransferred();if(chunksBeingTransferred >= maxChunksBeingTransferred){ logger.warn("The number of chunks being transferred {} is above {}, close the connection.", chunksBeingTransferred, maxChunksBeingTransferred); channel.close();return;}ManagedBuffer buf;try{// 第一步:校验客户端是否有权限从给定的流中读取 streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);// 第二步:获取单个的块(块被封装为 ManagedBuffer) buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);}catch(Exception e){ logger.error(String.format("Error opening block %s for request from %s", req.streamChunkId,getRemoteAddress(channel)), e);respond(newChunkFetchFailure(req.streamChunkId,Throwables.getStackTraceAsString(e)));return;}// 第三部:将 ManagedBuffer 和流的块Id封装为 ChunkFetchSuccess 后,返回给客户端 streamManager.chunkBeingSent(req.streamChunkId.streamId);respond(newChunkFetchSuccess(req.streamChunkId, buf)).addListener(future ->{ streamManager.chunkSent(req.streamChunkId.streamId);});}
- 处理RPC请求:processRpcRequest 方法用于处理 RpcRequest 类型的消息
privatevoidprocessRpcRequest(finalRpcRequest req){try{// 将RpcRequest 消息的内容体、发送消息的客户端及一个RpcResponseCallback 类型的匿名内部类作为参数传递给了RpcHandler 的receive 方法。这就是说 真正用于处理RpcRequest 消息的是RpcHandler,而非 TransportRequestHiandler,并且在 成功或失败时分别调用了 onSuccess 和 onFailure rpcHandler.receive(reverseClient, req.body().nioByteBuffer(),newRpcResponseCallback(){@OverridepublicvoidonSuccess(ByteBuffer response){respond(newRpcResponse(req.requestId,newNioManagedBuffer(response)));}@OverridepublicvoidonFailure(Throwable e){respond(newRpcFailure(req.requestId,Throwables.getStackTraceAsString(e)));}});}catch(Exception e){ logger.error("Error while invoking RpcHandler#receive() on RPC id "+ req.requestId, e);respond(newRpcFailure(req.requestId,Throwables.getStackTraceAsString(e)));}finally{ req.body().release();}}
- 处理流请求:processStreamRequest 方法用于处理StreamRequest 类型的消息
privatevoidprocessStreamRequest(finalStreamRequest req){if(logger.isTraceEnabled()){ logger.trace("Received req from {} to fetch stream {}",getRemoteAddress(channel), req.streamId);}long chunksBeingTransferred = streamManager.chunksBeingTransferred();if(chunksBeingTransferred >= maxChunksBeingTransferred){ logger.warn("The number of chunks being transferred {} is above {}, close the connection.", chunksBeingTransferred, maxChunksBeingTransferred); channel.close();return;}ManagedBuffer buf;try{// 第一步:调用StreamManager 的 openStream 方法将获取到的流数据封装 ManagedBuffer buf = streamManager.openStream(req.streamId);}catch(Exception e){ logger.error(String.format("Error opening stream %s for request from %s", req.streamId,getRemoteAddress(channel)), e);respond(newStreamFailure(req.streamId,Throwables.getStackTraceAsString(e)));return;}// 当成功或失败时调用respond 方法向客户端响应if(buf !=null){ streamManager.streamBeingSent(req.streamId);respond(newStreamResponse(req.streamId, buf.size(), buf)).addListener(future ->{ streamManager.streamSent(req.streamId);});}else{respond(newStreamFailure(req.streamId,String.format("Stream '%s' was not found.", req.streamId)));}}
- 处理无需回复的RPC请求:processOneWayMessage 方法用于处理 StreamRequest 类型的消息
privatevoidprocessOneWayMessage(OneWayMessage req){try{// 这里的实现和 processRpcRequest 相似,区别在于这里调用了 RpcHandler 中重载的带有 ONE_WAY_CALLBACK 的 receive 方法,也就是没有callback回调,也就是不会做出响应 rpcHandler.receive(reverseClient, req.body().nioByteBuffer());}catch(Exception e){ logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);}finally{ req.body().release();}}
- 处理上传流的请求(流处理):processStreamUpload 方法用于处理 UploadStream 类型的消息
privatevoidprocessStreamUpload(finalUploadStream req){assert(req.body()==null);try{// 第一步:先构建好回调的对象,并处理成功和失败的情况RpcResponseCallback callback =newRpcResponseCallback(){@OverridepublicvoidonSuccess(ByteBuffer response){respond(newRpcResponse(req.requestId,newNioManagedBuffer(response)));}@OverridepublicvoidonFailure(Throwable e){respond(newRpcFailure(req.requestId,Throwables.getStackTraceAsString(e)));}};// 第二步;构建一个帧解码器TransportFrameDecoder frameDecoder =(TransportFrameDecoder) channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);// 第三步:获取到请求的消息头(元数据)的 ByteBufferByteBuffer meta = req.meta.nioByteBuffer();// 第四步:根据消息头开始获取流数据StreamCallbackWithID streamHandler = rpcHandler.receiveStream(reverseClient, meta, callback);if(streamHandler ==null){thrownewNullPointerException("rpcHandler returned a null streamHandler");}// 第五步:这里使用streamhandler来处理了当前id和对应的buf数据,并且定义了处理数据,完成,失败等情况下的处理方式StreamCallbackWithID wrappedCallback =newStreamCallbackWithID(){@OverridepublicvoidonData(String streamId,ByteBuffer buf)throwsIOException{ streamHandler.onData(streamId, buf);}@OverridepublicvoidonComplete(String streamId)throwsIOException{try{ streamHandler.onComplete(streamId); callback.onSuccess(ByteBuffer.allocate(0));}catch(Exception ex){IOException ioExc =newIOException("Failure post-processing complete stream;"+" failing this rpc and leaving channel active", ex); callback.onFailure(ioExc); streamHandler.onFailure(streamId, ioExc);}}@OverridepublicvoidonFailure(String streamId,Throwable cause)throwsIOException{ callback.onFailure(newIOException("Destination failed while reading stream", cause)); streamHandler.onFailure(streamId, cause);}@OverridepublicStringgetID(){return streamHandler.getID();}};// 第六步:这里对有数据的流数据请求,通过拦截器的方式,使用解码器进行了解码if(req.bodyByteCount >0){StreamInterceptor<RequestMessage> interceptor =newStreamInterceptor<>(this, wrappedCallback.getID(), req.bodyByteCount, wrappedCallback); frameDecoder.setInterceptor(interceptor);}else{ wrappedCallback.onComplete(wrappedCallback.getID());}}catch(Exception e){ logger.error("Error while invoking RpcHandler#receive() on RPC id "+ req.requestId, e);respond(newRpcFailure(req.requestId,Throwables.getStackTraceAsString(e)));// We choose to totally fail the channel, rather than trying to recover as we do in other// cases. We don't know how many bytes of the stream the client has already sent for the// stream, it's not worth trying to recover. channel.pipeline().fireExceptionCaught(e);}finally{ req.meta.release();}}
可以看到上面几个方法最后都调用了 respond方法响应客户端,其代码实现如下
privateChannelFuturerespond(Encodable result){SocketAddress remoteAddress = channel.remoteAddress();// 实际调用了 Netty 中 Channel 的 writeAndFlush 方法进行了响应return channel.writeAndFlush(result).addListener(future ->{if(future.isSuccess()){
logger.trace("Sent result {} to client {}", result, remoteAddress);}else{
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
channel.close();}});}
九、TransportServerBootstrap解析
TransportServerBootstrap 的列表 bootstraps 出现在 TransportServer 的构造器中,接口 TransportServerBootstrap 定义了服务端引导程序的规范,服务端引导程序是当客户端和服务端建立连接后,在服务端持有的客户端管道上执行的引导程序
publicinterfaceTransportServerBootstrap{// TransportServerBootstrap 的实现类的 doBootstrap 将对服务端的 RpcHandler 进行代理,接收客户端的请求RpcHandlerdoBootstrap(Channel channel,RpcHandler rpcHandler);}
TransportServerBootstrap 有三个实现类,分别是 AuthServerBootstrap、EncryptionCheckerBootstrap 和 SaslServerBootstrap
- AuthServerBootstrap 使用的是 Spark 自己的协议来做验证,如果客户端不支持,那么就会变为使用 SaslServerBootstrap 进行认证交换和执行引导程序
- EncryptionCheckerBootstrap 通过将自身加入Netty的管道中实现引导
- SaslServerBootstrap 则是使用 SASL 进行认证交换
下面就以 SaslServerBootstrap 来分析具体是如何实现的
publicRpcHandlerdoBootstrap(Channel channel,RpcHandler rpcHandler){returnnewSaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);}
SaslServerBootstrap 的 doBootstrap 方法司机创建了 SaslRpcHandler,SaslRpcHandler 负责对管道进行 SASL 认证交换,SaslRpcHandler 本身也继承了 AbstractAuthRpcHandler 抽象类,AbstractAuthRpcHandler 继承了 RpcHandler。
AbstractAuthRpcHandler 是三种实现类的父抽象类,同时也是在这个类中对于不同情况下的验证加密情况进行判断
- 当处理的是一般带有回调的非流请求时,如果是验证过了,那么进行receive,没有的话,就调用三种实现下各自的 doAuthChallenge 方法进行验证和加密
- 当处理的是没有回调的一次性请求 或者 带有回调的流式数据请求 时,如果是验证过了,那么进行receive,没有的话,直接抛出异常
可参考类似 TransportRequestHandler 中的各种 Request 和 processXXX 方法来结合理解
下面对于 SaslRpcHandler 中的 doAuthChallenge 分析
@OverridepublicbooleandoAuthChallenge(TransportClient client,ByteBuffer message,RpcResponseCallback callback){// 如果 SASL 认证交换未完成或者还没有 saslServer时if(saslServer ==null||!saslServer.isComplete()){ByteBuf nettyBuf =Unpooled.wrappedBuffer(message);SaslMessage saslMessage;try{// 首先对发送过来的消息进行解码
saslMessage =SaslMessage.decode(nettyBuf);}finally{
nettyBuf.release();}if(saslServer ==null){// 当接收到客户端的第一条消息的时候就会做这个操作
client.setClientId(saslMessage.appId);// 创建一个 SparkSaslServer
saslServer =newSparkSaslServer(saslMessage.appId, secretKeyHolder,
conf.saslServerAlwaysEncrypt());}byte[] response;try{// 这里处理已解密的消息并且构建响应
response = saslServer.response(JavaUtils.bufferToArray(
saslMessage.body().nioByteBuffer()));}catch(IOException ioe){thrownewRuntimeException(ioe);}// 通过 RpcResponseCallback 的回调方法返回给客户端
callback.onSuccess(ByteBuffer.wrap(response));}// 如果 SALSL 认证交换已经是完成状态 if(saslServer.isComplete()){if(!SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))){
logger.debug("SASL authentication successful for channel {}", client);// SASL认证交换已完成complete(true);returntrue;}
logger.debug("Enabling encryption for channel {}", client);// 对管道进行加密SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());complete(false);returntrue;}returnfalse;}
TransportServerBootstrap 在整个过程中起到的是 引导、包装、代理的作用
十、TransportClient解析
TransportClient 作为客户端发送RPC请求,一种有5个方法用于发送请求
- fetchChunk : 从远端协商好的流中请求单个块
- stream : 使用流的 ID,从远端获取流数据
- sendRpc : 向服务端发送 RPC 的请求,通过 At least Once Delivery 原则保证请求不会丢失
- sendRpcSync : 向服务端发送异步的 RPC 的请求,并根据指定的超时时间等待响应
- send : 向服务端发送 RPC 的请求,但是并不期望能获取响应,因为不能保证投递的可靠性
这里对于 sendRpc 方法进行分析,fetchChunk 方法只是将 requestId 变为了 streamId 以及对应的 chunkIndex 来对当前流数据进行定位和绑定,实际处理流程大同小异
sendRpc 发送Rpc请求的方法的实现如下代码:
publiclongsendRpc(ByteBuffer message,RpcResponseCallback callback){if(logger.isTraceEnabled()){
logger.trace("Sending RPC to {}",getRemoteAddress(channel));}// 通过 requestId 方法可以知道,这里其实是使用了 UUID 生成了请求主键long requestId =requestId();// 添加 requestId 与 RpcResponseCallback 的引用之间的关系,通过下方的方法可以知道,这里先将更新最后一次请求的时间为当前系统时间,然后将 requestId 与 RpcResponseCallback 之间的映射加入到 outstandingRpcs 缓存中,outstandingRpcs 专门用于缓存发出的 RPC 请求信息
handler.addRpcRequest(requestId, callback);RpcChannelListener listener =newRpcChannelListener(requestId, callback);// 发送 RPC 请求,当发送成功或者失败的时候会回调 ChannelFutureListener 的 operationComplete 方法。// 如果发送成功,那么只会打印 requestId、远端地址及花费时间的日志// 如果发送失败,除了打印错误日志外,还要调用 TransportResponseHandler 的 removePrcRequest 方法,将此次请求从 outstandingRpcs 缓存中移除
channel.writeAndFlush(newRpcRequest(requestId,newNioManagedBuffer(message))).addListener(listener);return requestId;}
privatestaticlongrequestId(){returnMath.abs(UUID.randomUUID().getLeastSignificantBits());}
publicvoidaddRpcRequest(long requestId,RpcResponseCallback callback){updateTimeOfLastRequest();
outstandingRpcs.put(requestId, callback);}
当上面的请求发送成功后,客户端会等到接收服务端的响应,返回的消息会传递到 TransportChannelHandler 的 channelRead 方法,最后会交给 TransportResponseHandler 的 handle 方法来处理,TransportResponseHandler 的 handle 方法分别会处理 6 种 ResponseMessage,由于服务端使用 processRpcRequest 方法处理 RpcRequest 类型的消息后,返回给客户端的消息为 RpcRequest 或 RpcFailure,接下来对于 TransportResponseHandler 的handle方法进行如下分析:
elseif(message instanceofRpcResponse){RpcResponse resp =(RpcResponse) message;// 使用 RpcResponse 对应的 RpcRequest 的主键 requestId,从缓存中获取 RpcResponseCallbackRpcResponseCallback listener = outstandingRpcs.get(resp.requestId);if(listener ==null){
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
resp.requestId,getRemoteAddress(channel), resp.body().size());// 如果没有对应的callback,则直接将响应释放
resp.body().release();}else{// 如果有对应的callback,移除 outstandingRpcs 缓存中 requestId 和 RpcResponseCallback 的注册消息
outstandingRpcs.remove(resp.requestId);try{// 调用 callback 中的 onSuccess,处理成功响应后的具体逻辑,这里的逻辑会在各个使用 TransportClient 的 sendRpc 方法的场景中分别实现
listener.onSuccess(resp.body().nioByteBuffer());}finally{// 最终将响应的body释放
resp.body().release();}}}elseif(message instanceofRpcFailure){RpcFailure resp =(RpcFailure) message;// 如 RpcSuccess 处理RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);if(listener ==null){
logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
resp.requestId,getRemoteAddress(channel), resp.errorString);}else{
outstandingRpcs.remove(resp.requestId);// 这里和 RpcSuccess,唯一的区别是调用的为 onFailure
listener.onFailure(newRuntimeException(resp.errorString));}}
了解了TransportClient之后可以对之前的图进行进一步的丰富
版权归原作者 Neil-Wick 所有, 如有侵权,请联系我们删除。