0


Spark-RPC框架解析之组件介绍及流程再探

五、TransportChannelHandler解析

TransportChannelHandler 继承了 ChannelInboundHandlerAdapter ,ChannelInboundHandlerAdapter 实现了 Netty 的ChannelInboundHandler,在 Netty 框架中会使用 工作链模式 来对每个 ChannelInboundHandler 的实现类的 channelRead 方法进行链式调用。

工作链模式/责任链模式:责任链模式通过将多个处理请求的对象组成一条链,使请求在链上传递,直到有一个对象处理它为止。每个处理对象都负责判断自己能否处理该请求,如果可以则进行处理,否则将请求传递给下一个处理对象。这样,请求发送者无需知道具体的处理对象,只需将请求发送到责任链上即可。

责任链模式包含以下角色:

  • 抽象处理者(Handler):定义一个处理请求的接口,并持有下一个处理者的引用。
  • 具体处理者(Concrete Handler):实现抽象处理者的接口,在处理请求前判断自己是否能够处理该请求,如果可以则进行处理,否则将请求传递给下一个处理者。

通过责任链模式,我们可以动态地组合处理对象,灵活地配置处理流程,这种解耦使得系统更加灵活和可扩展

  1. publicvoidchannelRead(ChannelHandlerContext ctx,Object request)throwsException{// 如果读取到的 request 是 RequestMessage,将此消息的处理交给 TransportRequestHandlerif(request instanceofRequestMessage){
  2. requestHandler.handle((RequestMessage) request);// 如果读取到的 request 是 responseMessage,将此消息的处理交给 TransportResponseHandler}elseif(request instanceofResponseMessage){
  3. responseHandler.handle((ResponseMessage) request);}else{
  4. ctx.fireChannelRead(request);}}

从上面的代码中可以看到有比较关键的两类:Handler 和 Message

其中 Handler 具体为 TransportRequestHandler 与TransportResponseHandler ,他们都继承自抽象类 MessageHandler,MessageHandler 中定义了一些子类需要实现的方法

  1. publicabstractclassMessageHandler<TextendsMessage>{/** 用于对接收到的单个小心进行处理*/publicabstractvoidhandle(T message)throwsException;/** 当 channel 激活时调用 */publicabstractvoidchannelActive();/** 当捕获到 channel 发生异常时调用 */publicabstractvoidexceptionCaught(Throwable cause);/** 当 channel 非激活时调用 */publicabstractvoidchannelInactive();}
六、Message解析

而Message具体为 RequestMessage 和 ResponseMessage 这两个接口,他们都派生自接口 Message,Message 接口即成了 Encodable,先对 Encodable 的定义进行查看

  1. publicinterfaceEncodable{// 用于返回转换的对象数量intencodedLength();// 可以转换到一个 ByteBuf 中,多个对象将被存储到预先分配的 ByteBufvoidencode(ByteBuf buf);}

接下来再对 Message 接口进行查看

  1. publicinterfaceMessageextendsEncodable{/** 返回消息的类型 */Typetype();/** 返回消息中可选的内容体*/ManagedBufferbody();/** 用于判断消息的主体是否包含在消息的同一帧中 */booleanisBodyInFrame();/**......*/}

最终的消息实现类都直接或间接的实现了 RequestMessage 和 ResponseMessage 接口

其中 RequestMessage 的具体实现有五种:

  • ChunkFetchRequest : 请求获取流的单个块的序列
  • RpcRequest:一种需要远程的RPC服务端处理并需要向客户端回复的RPC请求信息类型
  • OneWayMessage:一种需要远程的RPC服务端处理但不需要向客户端回复的RPC请求信息类型
  • StreamRequest:向远程的服务发起请求,以获取流式数据
  • UploadStream:在帧外发送的一种带有数据的RPC消息,可以按照流式数据获取

ResponseMessage 无需对 OneWayMessage 和 UploadStream 进行响应,所以总共有3类实现,每类有成功和失败两种,总共六种实现

  • ChunkFetchSuccess:处理 ChunkFetchRequest 成功后返回的消息
  • ChunkFetchFailure:处理 ChunkFetchRequest 失败后返回的消息
  • RpcResponse : 处理 RpcRequest 成功后返回的消息
  • RpcFailure:处理 RpcRequest 失败后返回的消息
  • StreamResponse : 处理 StreamRequest 成功后返回的消息
  • StreamFailure:处理 StreamRequest 失败后返回的消息

在Message接口中可以看到对于body方法的返回是 ManagedBuffer,ManagedBuffer 提供了由字节构成数据的不可变视图,类似关系型数据库的视图,不存储数据,也不是数据的实际来源,ManagedBuffer 抽象类的定义如下

  1. publicabstractclassManagedBuffer{/** 返回数据的字节数*/publicabstractlongsize();/** 将数据按照NIO的 ByteBuffer 类型返回*/publicabstractByteBuffernioByteBuffer()throwsIOException;/** 将数据按照InputStream返回*/publicabstractInputStreamcreateInputStream()throwsIOException;/** 当有新的使用者使用此视图时,增加引用此视图的引用数*/publicabstractManagedBufferretain();/** 当有使用者不再使用此视图时,减少引用此视图的引用数*/publicabstractManagedBufferrelease();/** 将缓冲区的数据转换为Netty的对象,用来将数据写到外部。此方法返回的数据类型要么是 io.netty.buffer.ByteBuf,要么是 io.netty.channel.FileRegion*/publicabstractObjectconvertToNetty()throwsIOException;}

ManagedBuffer 实际用于处理逻辑的具体实现类有三种,分别是NettyManagedBuffer、NioManagedBuffer 以及FileSegmentManagedBuffer,其中 NettyManagedBuffer 使用 io.netty.buffer.ByteBuf 作为缓冲,NioManagedBuffer 则使用了 java.nio.ByteBuffer 作为了缓冲,这两种实现都仅仅是将拿到的 buf 数据进行了 duplicate 或者 nioBuffer 转换,并没有涉及到很复杂的其他逻辑处理

FileSegmentManagedBuffer 的作用为获取一个文件中的一段,如下结合代码进行理解

  1. publicfinalclassFileSegmentManagedBufferextendsManagedBuffer{// 也就是 TransportConfprivatefinalTransportConf conf;// 所要读取的文件privatefinalFile file;// 所要读取文件的偏移量privatefinallong offset;// 所要读取的长度privatefinallong length;/**......*//** NIO 方式读取文件*/@OverridepublicByteBuffernioByteBuffer()throwsIOException{FileChannel channel =null;try{// 使用 RandomAccessFile 获取 FileChannel
  2. channel =newRandomAccessFile(file,"r").getChannel();// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.if(length < conf.memoryMapBytes()){ByteBuffer buf =ByteBuffer.allocate((int) length);
  3. channel.position(offset);while(buf.remaining()!=0){if(channel.read(buf)==-1){thrownewIOException(String.format("Reached EOF before filling buffer\n"+"offset=%s\nfile=%s\nbuf.remaining=%s",
  4. offset, file.getAbsoluteFile(), buf.remaining()));}}// 通过 ByteBuffer 和 FileChannel 的API将数据写入缓冲区 ByteBuffer 中
  5. buf.flip();return buf;}else{return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);}}catch(IOException e){String errorMessage ="Error in reading "+this;try{if(channel !=null){long size = channel.size();
  6. errorMessage ="Error in reading "+this+" (actual file length "+ size +")";}}catch(IOException ignored){// ignore}thrownewIOException(errorMessage, e);}finally{JavaUtils.closeQuietly(channel);}}/** 文件流方式读取文件*/@OverridepublicInputStreamcreateInputStream()throwsIOException{FileInputStream is =null;boolean shouldClose =true;try{
  7. is =newFileInputStream(file);ByteStreams.skipFully(is, offset);InputStream r =newLimitedInputStream(is, length);
  8. shouldClose =false;return r;}catch(IOException e){String errorMessage ="Error in reading "+this;if(is !=null){long size = file.length();
  9. errorMessage ="Error in reading "+this+" (actual file length "+ size +")";}thrownewIOException(errorMessage, e);}finally{if(shouldClose){JavaUtils.closeQuietly(is);}}}/**......*//** 将数据转换为 Netty 对象*/@OverridepublicObjectconvertToNetty()throwsIOException{if(conf.lazyFileDescriptor()){returnnewDefaultFileRegion(file, offset, length);}else{FileChannel fileChannel =FileChannel.open(file.toPath(),StandardOpenOption.READ);returnnewDefaultFileRegion(fileChannel, offset, length);}}/**......*/}
七、RpcHandler解析

RpcHandler是一个抽象类,定义了一些RPC处理器的规范,TransportRequestHandler 实际上是把请求消息交给 RpcHandler 进行进一步处理

  1. publicabstractclassRpcHandler{privatestaticfinalRpcResponseCallbackONE_WAY_CALLBACK=newOneWayRpcCallback();/** 用于接收单一的RPC消息,是抽象方法,具体逻辑需要子类实现,接收三个参数,其中 RpcResponseCallback 用于对请求处理结束后进行回调,无论处理结果是成功还是失败,RpcResponseCallback 都会被调用一次*/publicabstractvoidreceive(TransportClient client,ByteBuffer message,RpcResponseCallback callback);/** 用于接收流数据的处理,接收三个参数,其中 messageHeader 是包含有当前流数据的头信息(元数据),用于指向实际数据位置,RpcResponseCallback类似 receive,返回的是 StreamCallbackWithID 实际是个接口,继承于 StreamCallback,定义了处理流数据的方法,一般会在实际处理时通过匿名内部类的方式去实现这些方法*/publicStreamCallbackWithIDreceiveStream(TransportClient client,ByteBuffer messageHeader,RpcResponseCallback callback){thrownewUnsupportedOperationException();}// 获取 StreamManager,StreamManager可以从流中获取单个的块,因此它也包含着当前正在被 TransportClient 获取的流的状态publicabstractStreamManagergetStreamManager();// 对上面的 receive 进行重载,callback部分被替换为了 ONE_WAY_CALLBACK,类型是 OneWayRpcCallback,这个回调的逻辑中 onSuccess 和 onFailure 都只是打印日志,并没有针对客户端做回复处理publicvoidreceive(TransportClient client,ByteBuffer message){receive(client, message,ONE_WAY_CALLBACK);}// 当与给客户端相关联的 channel 处于活动状态时调用publicvoidchannelActive(TransportClient client){}// 当与给客户端相关联的 channel 处于非活动状态时调用publicvoidchannelInactive(TransportClient client){}// 当 channel 产生异常时调用publicvoidexceptionCaught(Throwable cause,TransportClient client){}privatestaticclassOneWayRpcCallbackimplementsRpcResponseCallback{privatestaticfinalLogger logger =LoggerFactory.getLogger(OneWayRpcCallback.class);@OverridepublicvoidonSuccess(ByteBuffer response){
  2. logger.warn("Response provided for one-way RPC.");}@OverridepublicvoidonFailure(Throwable e){
  3. logger.error("Error response provided for one-way RPC.", e);}}}
八、TransportRequestHandler解析

TransportRequestHandler 处理 5 种 RequestMessage的实现如下:

  1. @Overridepublicvoidhandle(RequestMessage request){if(request instanceofChunkFetchRequest){processFetchRequest((ChunkFetchRequest) request);}elseif(request instanceofRpcRequest){processRpcRequest((RpcRequest) request);}elseif(request instanceofOneWayMessage){processOneWayMessage((OneWayMessage) request);}elseif(request instanceofStreamRequest){processStreamRequest((StreamRequest) request);}elseif(request instanceofUploadStream){processStreamUpload((UploadStream) request);}else{thrownewIllegalArgumentException("Unknown request type: "+ request);}}
  • 处理块获取请求:processFetchRequest 方法用于处理 ChunkFetchRequest 类型的消息privatevoidprocessFetchRequest(finalChunkFetchRequest req){if(logger.isTraceEnabled()){ logger.trace("Received req from {} to fetch block {}",getRemoteAddress(channel), req.streamChunkId);}// streamManager 是来源于上面的代码 this.streamManager = rpcHandler.getStreamManager();// 是通过调用 RpcHandler 的 getStreamManager 方法获取到的,long chunksBeingTransferred = streamManager.chunksBeingTransferred();if(chunksBeingTransferred >= maxChunksBeingTransferred){ logger.warn("The number of chunks being transferred {} is above {}, close the connection.", chunksBeingTransferred, maxChunksBeingTransferred); channel.close();return;}ManagedBuffer buf;try{// 第一步:校验客户端是否有权限从给定的流中读取 streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);// 第二步:获取单个的块(块被封装为 ManagedBuffer) buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);}catch(Exception e){ logger.error(String.format("Error opening block %s for request from %s", req.streamChunkId,getRemoteAddress(channel)), e);respond(newChunkFetchFailure(req.streamChunkId,Throwables.getStackTraceAsString(e)));return;}// 第三部:将 ManagedBuffer 和流的块Id封装为 ChunkFetchSuccess 后,返回给客户端 streamManager.chunkBeingSent(req.streamChunkId.streamId);respond(newChunkFetchSuccess(req.streamChunkId, buf)).addListener(future ->{ streamManager.chunkSent(req.streamChunkId.streamId);});}
  • 处理RPC请求:processRpcRequest 方法用于处理 RpcRequest 类型的消息privatevoidprocessRpcRequest(finalRpcRequest req){try{// 将RpcRequest 消息的内容体、发送消息的客户端及一个RpcResponseCallback 类型的匿名内部类作为参数传递给了RpcHandler 的receive 方法。这就是说 真正用于处理RpcRequest 消息的是RpcHandler,而非 TransportRequestHiandler,并且在 成功或失败时分别调用了 onSuccess 和 onFailure rpcHandler.receive(reverseClient, req.body().nioByteBuffer(),newRpcResponseCallback(){@OverridepublicvoidonSuccess(ByteBuffer response){respond(newRpcResponse(req.requestId,newNioManagedBuffer(response)));}@OverridepublicvoidonFailure(Throwable e){respond(newRpcFailure(req.requestId,Throwables.getStackTraceAsString(e)));}});}catch(Exception e){ logger.error("Error while invoking RpcHandler#receive() on RPC id "+ req.requestId, e);respond(newRpcFailure(req.requestId,Throwables.getStackTraceAsString(e)));}finally{ req.body().release();}}
  • 处理流请求:processStreamRequest 方法用于处理StreamRequest 类型的消息privatevoidprocessStreamRequest(finalStreamRequest req){if(logger.isTraceEnabled()){ logger.trace("Received req from {} to fetch stream {}",getRemoteAddress(channel), req.streamId);}long chunksBeingTransferred = streamManager.chunksBeingTransferred();if(chunksBeingTransferred >= maxChunksBeingTransferred){ logger.warn("The number of chunks being transferred {} is above {}, close the connection.", chunksBeingTransferred, maxChunksBeingTransferred); channel.close();return;}ManagedBuffer buf;try{// 第一步:调用StreamManager 的 openStream 方法将获取到的流数据封装 ManagedBuffer buf = streamManager.openStream(req.streamId);}catch(Exception e){ logger.error(String.format("Error opening stream %s for request from %s", req.streamId,getRemoteAddress(channel)), e);respond(newStreamFailure(req.streamId,Throwables.getStackTraceAsString(e)));return;}// 当成功或失败时调用respond 方法向客户端响应if(buf !=null){ streamManager.streamBeingSent(req.streamId);respond(newStreamResponse(req.streamId, buf.size(), buf)).addListener(future ->{ streamManager.streamSent(req.streamId);});}else{respond(newStreamFailure(req.streamId,String.format("Stream '%s' was not found.", req.streamId)));}}
  • 处理无需回复的RPC请求:processOneWayMessage 方法用于处理 StreamRequest 类型的消息privatevoidprocessOneWayMessage(OneWayMessage req){try{// 这里的实现和 processRpcRequest 相似,区别在于这里调用了 RpcHandler 中重载的带有 ONE_WAY_CALLBACK 的 receive 方法,也就是没有callback回调,也就是不会做出响应 rpcHandler.receive(reverseClient, req.body().nioByteBuffer());}catch(Exception e){ logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);}finally{ req.body().release();}}
  • 处理上传流的请求(流处理):processStreamUpload 方法用于处理 UploadStream 类型的消息privatevoidprocessStreamUpload(finalUploadStream req){assert(req.body()==null);try{// 第一步:先构建好回调的对象,并处理成功和失败的情况RpcResponseCallback callback =newRpcResponseCallback(){@OverridepublicvoidonSuccess(ByteBuffer response){respond(newRpcResponse(req.requestId,newNioManagedBuffer(response)));}@OverridepublicvoidonFailure(Throwable e){respond(newRpcFailure(req.requestId,Throwables.getStackTraceAsString(e)));}};// 第二步;构建一个帧解码器TransportFrameDecoder frameDecoder =(TransportFrameDecoder) channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);// 第三步:获取到请求的消息头(元数据)的 ByteBufferByteBuffer meta = req.meta.nioByteBuffer();// 第四步:根据消息头开始获取流数据StreamCallbackWithID streamHandler = rpcHandler.receiveStream(reverseClient, meta, callback);if(streamHandler ==null){thrownewNullPointerException("rpcHandler returned a null streamHandler");}// 第五步:这里使用streamhandler来处理了当前id和对应的buf数据,并且定义了处理数据,完成,失败等情况下的处理方式StreamCallbackWithID wrappedCallback =newStreamCallbackWithID(){@OverridepublicvoidonData(String streamId,ByteBuffer buf)throwsIOException{ streamHandler.onData(streamId, buf);}@OverridepublicvoidonComplete(String streamId)throwsIOException{try{ streamHandler.onComplete(streamId); callback.onSuccess(ByteBuffer.allocate(0));}catch(Exception ex){IOException ioExc =newIOException("Failure post-processing complete stream;"+" failing this rpc and leaving channel active", ex); callback.onFailure(ioExc); streamHandler.onFailure(streamId, ioExc);}}@OverridepublicvoidonFailure(String streamId,Throwable cause)throwsIOException{ callback.onFailure(newIOException("Destination failed while reading stream", cause)); streamHandler.onFailure(streamId, cause);}@OverridepublicStringgetID(){return streamHandler.getID();}};// 第六步:这里对有数据的流数据请求,通过拦截器的方式,使用解码器进行了解码if(req.bodyByteCount >0){StreamInterceptor<RequestMessage> interceptor =newStreamInterceptor<>(this, wrappedCallback.getID(), req.bodyByteCount, wrappedCallback); frameDecoder.setInterceptor(interceptor);}else{ wrappedCallback.onComplete(wrappedCallback.getID());}}catch(Exception e){ logger.error("Error while invoking RpcHandler#receive() on RPC id "+ req.requestId, e);respond(newRpcFailure(req.requestId,Throwables.getStackTraceAsString(e)));// We choose to totally fail the channel, rather than trying to recover as we do in other// cases. We don't know how many bytes of the stream the client has already sent for the// stream, it's not worth trying to recover. channel.pipeline().fireExceptionCaught(e);}finally{ req.meta.release();}}

可以看到上面几个方法最后都调用了 respond方法响应客户端,其代码实现如下

  1. privateChannelFuturerespond(Encodable result){SocketAddress remoteAddress = channel.remoteAddress();// 实际调用了 Netty 中 Channel 的 writeAndFlush 方法进行了响应return channel.writeAndFlush(result).addListener(future ->{if(future.isSuccess()){
  2. logger.trace("Sent result {} to client {}", result, remoteAddress);}else{
  3. logger.error(String.format("Error sending result %s to %s; closing connection",
  4. result, remoteAddress), future.cause());
  5. channel.close();}});}
九、TransportServerBootstrap解析

TransportServerBootstrap 的列表 bootstraps 出现在 TransportServer 的构造器中,接口 TransportServerBootstrap 定义了服务端引导程序的规范,服务端引导程序是当客户端和服务端建立连接后,在服务端持有的客户端管道上执行的引导程序

  1. publicinterfaceTransportServerBootstrap{// TransportServerBootstrap 的实现类的 doBootstrap 将对服务端的 RpcHandler 进行代理,接收客户端的请求RpcHandlerdoBootstrap(Channel channel,RpcHandler rpcHandler);}

TransportServerBootstrap 有三个实现类,分别是 AuthServerBootstrap、EncryptionCheckerBootstrap 和 SaslServerBootstrap

  • AuthServerBootstrap 使用的是 Spark 自己的协议来做验证,如果客户端不支持,那么就会变为使用 SaslServerBootstrap 进行认证交换和执行引导程序
  • EncryptionCheckerBootstrap 通过将自身加入Netty的管道中实现引导
  • SaslServerBootstrap 则是使用 SASL 进行认证交换

下面就以 SaslServerBootstrap 来分析具体是如何实现的

  1. publicRpcHandlerdoBootstrap(Channel channel,RpcHandler rpcHandler){returnnewSaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);}

SaslServerBootstrap 的 doBootstrap 方法司机创建了 SaslRpcHandler,SaslRpcHandler 负责对管道进行 SASL 认证交换,SaslRpcHandler 本身也继承了 AbstractAuthRpcHandler 抽象类,AbstractAuthRpcHandler 继承了 RpcHandler。

AbstractAuthRpcHandler 是三种实现类的父抽象类,同时也是在这个类中对于不同情况下的验证加密情况进行判断

  • 当处理的是一般带有回调的非流请求时,如果是验证过了,那么进行receive,没有的话,就调用三种实现下各自的 doAuthChallenge 方法进行验证和加密
  • 当处理的是没有回调的一次性请求 或者 带有回调的流式数据请求 时,如果是验证过了,那么进行receive,没有的话,直接抛出异常

可参考类似 TransportRequestHandler 中的各种 Request 和 processXXX 方法来结合理解

下面对于 SaslRpcHandler 中的 doAuthChallenge 分析

  1. @OverridepublicbooleandoAuthChallenge(TransportClient client,ByteBuffer message,RpcResponseCallback callback){// 如果 SASL 认证交换未完成或者还没有 saslServer时if(saslServer ==null||!saslServer.isComplete()){ByteBuf nettyBuf =Unpooled.wrappedBuffer(message);SaslMessage saslMessage;try{// 首先对发送过来的消息进行解码
  2. saslMessage =SaslMessage.decode(nettyBuf);}finally{
  3. nettyBuf.release();}if(saslServer ==null){// 当接收到客户端的第一条消息的时候就会做这个操作
  4. client.setClientId(saslMessage.appId);// 创建一个 SparkSaslServer
  5. saslServer =newSparkSaslServer(saslMessage.appId, secretKeyHolder,
  6. conf.saslServerAlwaysEncrypt());}byte[] response;try{// 这里处理已解密的消息并且构建响应
  7. response = saslServer.response(JavaUtils.bufferToArray(
  8. saslMessage.body().nioByteBuffer()));}catch(IOException ioe){thrownewRuntimeException(ioe);}// 通过 RpcResponseCallback 的回调方法返回给客户端
  9. callback.onSuccess(ByteBuffer.wrap(response));}// 如果 SALSL 认证交换已经是完成状态 if(saslServer.isComplete()){if(!SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))){
  10. logger.debug("SASL authentication successful for channel {}", client);// SASL认证交换已完成complete(true);returntrue;}
  11. logger.debug("Enabling encryption for channel {}", client);// 对管道进行加密SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());complete(false);returntrue;}returnfalse;}

TransportServerBootstrap 在整个过程中起到的是 引导、包装、代理的作用

十、TransportClient解析

TransportClient 作为客户端发送RPC请求,一种有5个方法用于发送请求

  1. fetchChunk : 从远端协商好的流中请求单个块
  2. stream : 使用流的 ID,从远端获取流数据
  3. sendRpc : 向服务端发送 RPC 的请求,通过 At least Once Delivery 原则保证请求不会丢失
  4. sendRpcSync : 向服务端发送异步的 RPC 的请求,并根据指定的超时时间等待响应
  5. send : 向服务端发送 RPC 的请求,但是并不期望能获取响应,因为不能保证投递的可靠性

这里对于 sendRpc 方法进行分析,fetchChunk 方法只是将 requestId 变为了 streamId 以及对应的 chunkIndex 来对当前流数据进行定位和绑定,实际处理流程大同小异

sendRpc 发送Rpc请求的方法的实现如下代码:

  1. publiclongsendRpc(ByteBuffer message,RpcResponseCallback callback){if(logger.isTraceEnabled()){
  2. logger.trace("Sending RPC to {}",getRemoteAddress(channel));}// 通过 requestId 方法可以知道,这里其实是使用了 UUID 生成了请求主键long requestId =requestId();// 添加 requestId 与 RpcResponseCallback 的引用之间的关系,通过下方的方法可以知道,这里先将更新最后一次请求的时间为当前系统时间,然后将 requestId 与 RpcResponseCallback 之间的映射加入到 outstandingRpcs 缓存中,outstandingRpcs 专门用于缓存发出的 RPC 请求信息
  3. handler.addRpcRequest(requestId, callback);RpcChannelListener listener =newRpcChannelListener(requestId, callback);// 发送 RPC 请求,当发送成功或者失败的时候会回调 ChannelFutureListener 的 operationComplete 方法。// 如果发送成功,那么只会打印 requestId、远端地址及花费时间的日志// 如果发送失败,除了打印错误日志外,还要调用 TransportResponseHandler 的 removePrcRequest 方法,将此次请求从 outstandingRpcs 缓存中移除
  4. channel.writeAndFlush(newRpcRequest(requestId,newNioManagedBuffer(message))).addListener(listener);return requestId;}
  1. privatestaticlongrequestId(){returnMath.abs(UUID.randomUUID().getLeastSignificantBits());}
  1. publicvoidaddRpcRequest(long requestId,RpcResponseCallback callback){updateTimeOfLastRequest();
  2. outstandingRpcs.put(requestId, callback);}

当上面的请求发送成功后,客户端会等到接收服务端的响应,返回的消息会传递到 TransportChannelHandler 的 channelRead 方法,最后会交给 TransportResponseHandler 的 handle 方法来处理,TransportResponseHandler 的 handle 方法分别会处理 6 种 ResponseMessage,由于服务端使用 processRpcRequest 方法处理 RpcRequest 类型的消息后,返回给客户端的消息为 RpcRequest 或 RpcFailure,接下来对于 TransportResponseHandler 的handle方法进行如下分析:

  1. elseif(message instanceofRpcResponse){RpcResponse resp =(RpcResponse) message;// 使用 RpcResponse 对应的 RpcRequest 的主键 requestId,从缓存中获取 RpcResponseCallbackRpcResponseCallback listener = outstandingRpcs.get(resp.requestId);if(listener ==null){
  2. logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
  3. resp.requestId,getRemoteAddress(channel), resp.body().size());// 如果没有对应的callback,则直接将响应释放
  4. resp.body().release();}else{// 如果有对应的callback,移除 outstandingRpcs 缓存中 requestId 和 RpcResponseCallback 的注册消息
  5. outstandingRpcs.remove(resp.requestId);try{// 调用 callback 中的 onSuccess,处理成功响应后的具体逻辑,这里的逻辑会在各个使用 TransportClient 的 sendRpc 方法的场景中分别实现
  6. listener.onSuccess(resp.body().nioByteBuffer());}finally{// 最终将响应的body释放
  7. resp.body().release();}}}elseif(message instanceofRpcFailure){RpcFailure resp =(RpcFailure) message;// 如 RpcSuccess 处理RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);if(listener ==null){
  8. logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
  9. resp.requestId,getRemoteAddress(channel), resp.errorString);}else{
  10. outstandingRpcs.remove(resp.requestId);// 这里和 RpcSuccess,唯一的区别是调用的为 onFailure
  11. listener.onFailure(newRuntimeException(resp.errorString));}}

了解了TransportClient之后可以对之前的图进行进一步的丰富

管道处理请求和响应V3

标签: spark rpc java

本文转载自: https://blog.csdn.net/weixin_37990260/article/details/140214390
版权归原作者 Neil-Wick 所有, 如有侵权,请联系我们删除。

“Spark-RPC框架解析之组件介绍及流程再探”的评论:

还没有评论