0


【Spark源码分析】Spark的RPC通信二-初稿

Spark的RPC通信二-初稿

Spark RPC的传输层

传输层主要还是借助netty框架进行实现。

TransportContext

包含创建

TransportServer

TransportClientFactory

和使用

TransportChannelHandler

设置 Netty Channel 管道的上下文。

TransportClient

提供两种通信协议:control-plane RPCs 和data-plane的 “chunk fetching”。RPC 的处理在

TransportContext

的范围之外进行(即由用户提供的处理程序执行),它负责设置流,这些流可以使用零拷贝 IO 以块为单位通过数据平面进行流式传输。对消息的处理由

RpcHandler

处理。

TransportServer

TransportClientFactory

都会为每个通道创建一个

TransportChannelHandler

。由于每个

TransportChannelHandler

都包含一个

TransportClient

,因此服务器进程可以通过现有通道向客户端发送消息。

#mermaid-svg-RZKgMJJbCOL4j4rf {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-RZKgMJJbCOL4j4rf .error-icon{fill:#552222;}#mermaid-svg-RZKgMJJbCOL4j4rf .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-RZKgMJJbCOL4j4rf .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-RZKgMJJbCOL4j4rf .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-RZKgMJJbCOL4j4rf .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-RZKgMJJbCOL4j4rf .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-RZKgMJJbCOL4j4rf .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-RZKgMJJbCOL4j4rf .marker{fill:#333333;stroke:#333333;}#mermaid-svg-RZKgMJJbCOL4j4rf .marker.cross{stroke:#333333;}#mermaid-svg-RZKgMJJbCOL4j4rf svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-RZKgMJJbCOL4j4rf g.classGroup text{fill:#9370DB;fill:#131300;stroke:none;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:10px;}#mermaid-svg-RZKgMJJbCOL4j4rf g.classGroup text .title{font-weight:bolder;}#mermaid-svg-RZKgMJJbCOL4j4rf .nodeLabel,#mermaid-svg-RZKgMJJbCOL4j4rf .edgeLabel{color:#131300;}#mermaid-svg-RZKgMJJbCOL4j4rf .edgeLabel .label rect{fill:#ECECFF;}#mermaid-svg-RZKgMJJbCOL4j4rf .label text{fill:#131300;}#mermaid-svg-RZKgMJJbCOL4j4rf .edgeLabel .label span{background:#ECECFF;}#mermaid-svg-RZKgMJJbCOL4j4rf .classTitle{font-weight:bolder;}#mermaid-svg-RZKgMJJbCOL4j4rf .node rect,#mermaid-svg-RZKgMJJbCOL4j4rf .node circle,#mermaid-svg-RZKgMJJbCOL4j4rf .node ellipse,#mermaid-svg-RZKgMJJbCOL4j4rf .node polygon,#mermaid-svg-RZKgMJJbCOL4j4rf .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-RZKgMJJbCOL4j4rf .divider{stroke:#9370DB;stroke:1;}#mermaid-svg-RZKgMJJbCOL4j4rf g.clickable{cursor:pointer;}#mermaid-svg-RZKgMJJbCOL4j4rf g.classGroup rect{fill:#ECECFF;stroke:#9370DB;}#mermaid-svg-RZKgMJJbCOL4j4rf g.classGroup line{stroke:#9370DB;stroke-width:1;}#mermaid-svg-RZKgMJJbCOL4j4rf .classLabel .box{stroke:none;stroke-width:0;fill:#ECECFF;opacity:0.5;}#mermaid-svg-RZKgMJJbCOL4j4rf .classLabel .label{fill:#9370DB;font-size:10px;}#mermaid-svg-RZKgMJJbCOL4j4rf .relation{stroke:#333333;stroke-width:1;fill:none;}#mermaid-svg-RZKgMJJbCOL4j4rf .dashed-line{stroke-dasharray:3;}#mermaid-svg-RZKgMJJbCOL4j4rf #compositionStart,#mermaid-svg-RZKgMJJbCOL4j4rf .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-RZKgMJJbCOL4j4rf #compositionEnd,#mermaid-svg-RZKgMJJbCOL4j4rf .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-RZKgMJJbCOL4j4rf #dependencyStart,#mermaid-svg-RZKgMJJbCOL4j4rf .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-RZKgMJJbCOL4j4rf #dependencyStart,#mermaid-svg-RZKgMJJbCOL4j4rf .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-RZKgMJJbCOL4j4rf #extensionStart,#mermaid-svg-RZKgMJJbCOL4j4rf .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-RZKgMJJbCOL4j4rf #extensionEnd,#mermaid-svg-RZKgMJJbCOL4j4rf .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-RZKgMJJbCOL4j4rf #aggregationStart,#mermaid-svg-RZKgMJJbCOL4j4rf .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-RZKgMJJbCOL4j4rf #aggregationEnd,#mermaid-svg-RZKgMJJbCOL4j4rf .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-RZKgMJJbCOL4j4rf .edgeTerminals{font-size:11px;}#mermaid-svg-RZKgMJJbCOL4j4rf :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
TransportContext

-TransportConf conf

-RpcHandler rpcHandler

-boolean closeIdleConnections

-boolean isClientOnly

-MessageEncoder ENCODER

-MessageEncoder DECODER

-EventLoopGroup chunkFetchWorkers

+TransportClientFactory createClientFactory()

+TransportServer createServer()

+TransportChannelHandler createChannelHandler()

+TransportChannelHandler initializePipeline()

ClientPool

TransportClient[] clients

Object[] lock

TransportClientFactory

-TransportContext context

-TransportConf conf

-List clientBootstraps

-ConcurrentHashMap connectionPool

-int numConnectionsPerPeer

-final Class socketChannelClass

-EventLoopGroup workerGroup

-PooledByteBufAllocator pooledAllocator

-NettyMemoryMetrics metrics

«interface»

TransportClientBootstrap

void doBootstrap(TransportClient client, Channel channel)

«interface»

TransportServerBootstrap

RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler)

«abstract»

MessageHandler

abstract void handle(T message)

abstract void channelActive()

abstract void exceptionCaught(Throwable cause)

abstract void channelInactive()

«interface»

Message

«abstract»

RpcHandler

TransportRequestHandler

RpcHandler rpcHandler

StreamManager streamManager

TransportResponseHandler

TransportClient

TransportServer

TransportChannelHandler

传输上下文

TransportContext
TransportContext

的核心成员与核心方法

  • TransportConf conf:传输的配置信息
  • RpcHandler rpcHandler:对接收的RPC消息进行处理
  • EventLoopGroup chunkFetchWorkers:处理 ChunkFetchRequest 的独立线程池。这有助于控制通过底层通道将 ChunkFetchRequest 信息写回客户端时阻塞的 TransportServer 工作线程的最大数量。
  • createClientFactory():初始化 ClientFactory,在返回新客户端之前运行给定的 TransportClientBootstraps。Bootstraps 将同步执行,并且必须成功运行才能创建客户端。
  • createServer():创建传输服务端TransportServer的实例
  • initializePipeline():对TransportClientTransportRequestHandlerTransportResponseHandler进行初始化,然后在用其构造TransportChannelHandler对象。借助Netty的API对管道进行配置。
TransportContext

createClientFactory

方法创建传输客户端工厂

TransportClientFactory

的实例。在构造

TransportClientFactory

的实例时,还会传递客户端引导程序

TransportClientBootstrap

的列表。

TransportClientFactory

内部维护每个Socket地址的连接池。通过调用

TransportContext

createServer

方法创建传输服务端

TransportServer

的实例。

核心类TransportClientFactory

用于使用

createClient

方法 创建

TransportClients

的工厂。该工厂负责维护与其他主机的连接池,并为同一远程主机返回相同的

TransportClient

。它还为所有

TransportClients

共享一个工作线程池。只要有可能,就会重复使用

TransportClients

。在完成创建新的

TransportClient

之前,将运行所有给定的

TransportClientBootstraps

TransportClientFactory

的核心成员和核心方法

  • 静态内部类ClientPool:一种简单的数据结构,用于跟踪两个对等节点之间的客户端连接池,保障其可以复用,由于线程不安全,所以增加了客户端对应的锁。privatestaticclassClientPool{TransportClient[] clients;Object[] locks;ClientPool(int size){ clients =newTransportClient[size]; locks =newObject[size];for(int i =0; i < size; i++){ locks[i]=newObject();}}}
  • TransportContext context:TransportContext 的实例对象
  • TransportConf conf:链接配置信息的实例对象
  • List<TransportClientBootstrap> clientBootstraps:客户端的引导程序,主要是客户端在建立连接的时候,进行一些初始化的准备操作。
  • ConcurrentHashMap<SocketAddress, ClientPool> connectionPool:维护了连接地址上的客户端连接池的映射表。
  • createClient(String remoteHost, int remotePort):- 首先根据远程地址,确认客户端连接池connectionPool中是否存在关于这个地址的客户端池clientPool,如果没有就新建一个客户端池放入连接池中。- 检查通道是否超时和客户端是否存活,如果客户端失活,则需要重建一个客户端。创建客户端的在createClient(InetSocketAddress address)方法中。publicTransportClientcreateClient(String remoteHost,int remotePort)throwsIOException,InterruptedException{// 此处使用未解析地址,以避免每次创建客户端时都进行 DNS 解析。finalInetSocketAddress unresolvedAddress =InetSocketAddress.createUnresolved(remoteHost, remotePort);// 如果clientPool不存在,则新建.ClientPool clientPool = connectionPool.get(unresolvedAddress);if(clientPool ==null){ connectionPool.putIfAbsent(unresolvedAddress,newClientPool(numConnectionsPerPeer)); clientPool = connectionPool.get(unresolvedAddress);}int clientIndex = rand.nextInt(numConnectionsPerPeer);TransportClient cachedClient = clientPool.clients[clientIndex];if(cachedClient !=null&& cachedClient.isActive()){// 更新处理程序的最后使用时间,确保通道不会超时TransportChannelHandler handler = cachedClient.getChannel().pipeline().get(TransportChannelHandler.class);synchronized(handler){ handler.getResponseHandler().updateTimeOfLastRequest();}// 然后检查客户端是否还活着,以防在代码更新之前超时。if(cachedClient.isActive()){ logger.trace("Returning cached connection to {}: {}", cachedClient.getSocketAddress(), cachedClient);return cachedClient;}}// 如果我们到达这里,就没有打开现有连接,尝试创建一个新连接。finallong preResolveHost =System.nanoTime();finalInetSocketAddress resolvedAddress =newInetSocketAddress(remoteHost, remotePort);finallong hostResolveTimeMs =(System.nanoTime()- preResolveHost)/1000000;if(hostResolveTimeMs >2000){ logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);}else{ logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);}// 多个线程可能会竞相在这里创建新连接。通过同步原语只保留其中一个处于活动状态。synchronized(clientPool.locks[clientIndex]){ cachedClient = clientPool.clients[clientIndex];if(cachedClient !=null){if(cachedClient.isActive()){ logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);return cachedClient;}else{ logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);}} clientPool.clients[clientIndex]=createClient(resolvedAddress);return clientPool.clients[clientIndex];}}
  • createClient(InetSocketAddress address):- 通过Netty的根引导程序进行初始化配置- 通过回调函数初始化bootstrap的Pipeline,设置好客户端引用和管道引用。- 遍历客户端引导程序集clientBootstraps,执行其初始化的内容privateTransportClientcreateClient(InetSocketAddress address)throwsIOException,InterruptedException{ logger.debug("Creating new connection to {}", address);// netty的连接创建的根引导程序Bootstrap bootstrap =newBootstrap(); bootstrap.group(workerGroup).channel(socketChannelClass)// 禁用纳格尔算法,因为我们不想让数据包等待.option(ChannelOption.TCP_NODELAY,true).option(ChannelOption.SO_KEEPALIVE,true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()).option(ChannelOption.ALLOCATOR, pooledAllocator);if(conf.receiveBuf()>0){ bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());}if(conf.sendBuf()>0){ bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());}finalAtomicReference<TransportClient> clientRef =newAtomicReference<>();finalAtomicReference<Channel> channelRef =newAtomicReference<>();// 通过回调函数初始化bootstrap的Pipeline bootstrap.handler(newChannelInitializer<SocketChannel>(){@OverridepublicvoidinitChannel(SocketChannel ch){TransportChannelHandler clientHandler = context.initializePipeline(ch); clientRef.set(clientHandler.getClient()); channelRef.set(ch);}});// 连接远程服务器long preConnect =System.nanoTime();ChannelFuture cf = bootstrap.connect(address);if(!cf.await(conf.connectionTimeoutMs())){thrownewIOException(String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));}elseif(cf.cause()!=null){thrownewIOException(String.format("Failed to connect to %s", address), cf.cause());}TransportClient client = clientRef.get();Channel channel = channelRef.get();assert client !=null:"Channel future completed successfully with null client";// 在将客户端标记为成功之前,同步执行任何客户端引导。long preBootstrap =System.nanoTime(); logger.debug("Connection to {} successful, running bootstraps...", address);try{// 遍历客户端引导程序集clientBootstraps,执行其初始化的内容for(TransportClientBootstrap clientBootstrap : clientBootstraps){ clientBootstrap.doBootstrap(client, channel);}}catch(Exception e){// catch non-RuntimeExceptions too as bootstrap may be written in Scalalong bootstrapTimeMs =(System.nanoTime()- preBootstrap)/1000000; logger.error("Exception while bootstrapping client after "+ bootstrapTimeMs +" ms", e); client.close();throwThrowables.propagate(e);}long postBootstrap =System.nanoTime(); logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)", address,(postBootstrap - preConnect)/1000000,(postBootstrap - preBootstrap)/1000000);return client;}

TransportClient

用于向server端发送rpc请求和从server 端获取流的chunk块,旨在高效传输大量数据,这些数据被分成大小从几百 KB 到几 MB 不等的数据块。

典型流程

// 打开远程文件
client.sendRPC(newOpenFile("/foo"))--> returns StreamId=100// 获取远程文件的chunk
client.fetchChunk(streamId =100, chunkIndex =0, callback)
client.fetchChunk(streamId =100, chunkIndex =1, callback)// 关闭远程文件
client.sendRPC(newCloseStream(100))

用于获取预协商数据流中连续数据块的客户端,处理的是从数据流(即数据平面)中获取数据块的过程,但数据流的实际设置是在传输层范围之外完成的。提供 "sendRPC "方便方法是为了在客户端和服务器之间进行控制平面通信,以执行此设置。使用

TransportClientFactory

构建一个

TransportClient

实例。单个

TransportClient

可用于多个流,但任何给定的流都必须仅限于单个客户端,以避免响应顺序混乱。注意:该类用于向服务器发出请求,而

TransportResponseHandler

则负责处理来自服务器的响应。并发性:线程安全,可由多个线程调用。

TransportServer

服务器,提供高效的底层流媒体服务。

消息的处理

消息处理类

MessageHandler

处理来自

Netty

的请求或响应信息。一个

MessageHandler

实例只与一个

Netty

通道相关联(尽管同一通道上可能有多个客户端)。以下是其定义的抽象方法。

  • abstract void handle(T message):对接收的单条信息的处理。
  • abstract void channelActive():当该消息处理程序所在的频道处于活动状态时调用。
  • abstract void exceptionCaught(Throwable cause):当通道上出现异常时调用。
  • abstract void channelInactive():当此 MessageHandler 所处的通道处于非活动状态时调用。
MessageHandler

有两个继承类

TransportRequestHandler

TransportResponseHandler

分别用来进行Server端处理Client的请求信息和Client端处理Server的响应信息。

TransportRequestHandler

handle(RequestMessage request)

方法

publicvoidhandle(RequestMessage request){if(request instanceofRpcRequest){// 处理RPC请求,依赖RpcHandler的receive()方法processRpcRequest((RpcRequest) request);}elseif(request instanceofOneWayMessage){// 处理无需回复的RPC请求,依赖RpcHandler的receive()方法processOneWayMessage((OneWayMessage) request);}elseif(request instanceofStreamRequest){// 处理流请求,依赖StreamManager的openStream()方法获取流数据并封装成ManagedBufferprocessStreamRequest((StreamRequest) request);}else{// 未知请求抛异常thrownewIllegalArgumentException("Unknown request type: "+ request);}}
TransportResponseHandler

handle(ResponseMessage message)

方法

**在client端发送消息时,根据发送消息的类型调用

TransportResponseHandler

中的方法注册回调函数**,回调函数和请求信息放入相应的缓存中。

TransportResponseHandler

收到server端的响应消息时,再**调用主要的工作方法

handle()

,根据响应消息类型从对应缓存中取出回调函数并调用**。

@Overridepublicvoidhandle(ResponseMessage message)throwsException{if(message instanceofChunkFetchSuccess){ChunkFetchSuccess resp =(ChunkFetchSuccess) message;ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);if(listener ==null){
        logger.warn("Ignoring response for block {} from {} since it is not outstanding",
          resp.streamChunkId,getRemoteAddress(channel));
        resp.body().release();}else{
        outstandingFetches.remove(resp.streamChunkId);
        listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
        resp.body().release();}}elseif(message instanceofChunkFetchFailure){ChunkFetchFailure resp =(ChunkFetchFailure) message;ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);if(listener ==null){
        logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
          resp.streamChunkId,getRemoteAddress(channel), resp.errorString);}else{
        outstandingFetches.remove(resp.streamChunkId);
        listener.onFailure(resp.streamChunkId.chunkIndex,newChunkFetchFailureException("Failure while fetching "+ resp.streamChunkId +": "+ resp.errorString));}}elseif(message instanceofRpcResponse){RpcResponse resp =(RpcResponse) message;RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);if(listener ==null){
        logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
          resp.requestId,getRemoteAddress(channel), resp.body().size());}else{
        outstandingRpcs.remove(resp.requestId);try{
          listener.onSuccess(resp.body().nioByteBuffer());}finally{
          resp.body().release();}}}elseif(message instanceofRpcFailure){RpcFailure resp =(RpcFailure) message;RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);if(listener ==null){
        logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
          resp.requestId,getRemoteAddress(channel), resp.errorString);}else{
        outstandingRpcs.remove(resp.requestId);
        listener.onFailure(newRuntimeException(resp.errorString));}}elseif(message instanceofStreamResponse){StreamResponse resp =(StreamResponse) message;Pair<String,StreamCallback> entry = streamCallbacks.poll();if(entry !=null){StreamCallback callback = entry.getValue();if(resp.byteCount >0){StreamInterceptor interceptor =newStreamInterceptor(this, resp.streamId, resp.byteCount,
            callback);try{TransportFrameDecoder frameDecoder =(TransportFrameDecoder)
              channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
            frameDecoder.setInterceptor(interceptor);
            streamActive =true;}catch(Exception e){
            logger.error("Error installing stream handler.", e);deactivateStream();}}else{try{
            callback.onComplete(resp.streamId);}catch(Exception e){
            logger.warn("Error in stream handler onComplete().", e);}}}else{
        logger.error("Could not find callback for StreamResponse.");}}elseif(message instanceofStreamFailure){StreamFailure resp =(StreamFailure) message;Pair<String,StreamCallback> entry = streamCallbacks.poll();if(entry !=null){StreamCallback callback = entry.getValue();try{
          callback.onFailure(resp.streamId,newRuntimeException(resp.error));}catch(IOException ioe){
          logger.warn("Error in stream failure handler.", ioe);}}else{
        logger.warn("Stream failure with unknown callback: {}", resp.error);}}else{thrownewIllegalStateException("Unknown response type: "+ message.type());}}

消息的分类

MessageHandler

用来处理的消息都是继承或实现自Message接口的。

#mermaid-svg-utrongvlch8I48n8 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-utrongvlch8I48n8 .error-icon{fill:#552222;}#mermaid-svg-utrongvlch8I48n8 .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-utrongvlch8I48n8 .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-utrongvlch8I48n8 .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-utrongvlch8I48n8 .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-utrongvlch8I48n8 .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-utrongvlch8I48n8 .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-utrongvlch8I48n8 .marker{fill:#333333;stroke:#333333;}#mermaid-svg-utrongvlch8I48n8 .marker.cross{stroke:#333333;}#mermaid-svg-utrongvlch8I48n8 svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-utrongvlch8I48n8 g.classGroup text{fill:#9370DB;fill:#131300;stroke:none;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:10px;}#mermaid-svg-utrongvlch8I48n8 g.classGroup text .title{font-weight:bolder;}#mermaid-svg-utrongvlch8I48n8 .nodeLabel,#mermaid-svg-utrongvlch8I48n8 .edgeLabel{color:#131300;}#mermaid-svg-utrongvlch8I48n8 .edgeLabel .label rect{fill:#ECECFF;}#mermaid-svg-utrongvlch8I48n8 .label text{fill:#131300;}#mermaid-svg-utrongvlch8I48n8 .edgeLabel .label span{background:#ECECFF;}#mermaid-svg-utrongvlch8I48n8 .classTitle{font-weight:bolder;}#mermaid-svg-utrongvlch8I48n8 .node rect,#mermaid-svg-utrongvlch8I48n8 .node circle,#mermaid-svg-utrongvlch8I48n8 .node ellipse,#mermaid-svg-utrongvlch8I48n8 .node polygon,#mermaid-svg-utrongvlch8I48n8 .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-utrongvlch8I48n8 .divider{stroke:#9370DB;stroke:1;}#mermaid-svg-utrongvlch8I48n8 g.clickable{cursor:pointer;}#mermaid-svg-utrongvlch8I48n8 g.classGroup rect{fill:#ECECFF;stroke:#9370DB;}#mermaid-svg-utrongvlch8I48n8 g.classGroup line{stroke:#9370DB;stroke-width:1;}#mermaid-svg-utrongvlch8I48n8 .classLabel .box{stroke:none;stroke-width:0;fill:#ECECFF;opacity:0.5;}#mermaid-svg-utrongvlch8I48n8 .classLabel .label{fill:#9370DB;font-size:10px;}#mermaid-svg-utrongvlch8I48n8 .relation{stroke:#333333;stroke-width:1;fill:none;}#mermaid-svg-utrongvlch8I48n8 .dashed-line{stroke-dasharray:3;}#mermaid-svg-utrongvlch8I48n8 #compositionStart,#mermaid-svg-utrongvlch8I48n8 .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-utrongvlch8I48n8 #compositionEnd,#mermaid-svg-utrongvlch8I48n8 .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-utrongvlch8I48n8 #dependencyStart,#mermaid-svg-utrongvlch8I48n8 .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-utrongvlch8I48n8 #dependencyStart,#mermaid-svg-utrongvlch8I48n8 .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-utrongvlch8I48n8 #extensionStart,#mermaid-svg-utrongvlch8I48n8 .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-utrongvlch8I48n8 #extensionEnd,#mermaid-svg-utrongvlch8I48n8 .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-utrongvlch8I48n8 #aggregationStart,#mermaid-svg-utrongvlch8I48n8 .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-utrongvlch8I48n8 #aggregationEnd,#mermaid-svg-utrongvlch8I48n8 .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-utrongvlch8I48n8 .edgeTerminals{font-size:11px;}#mermaid-svg-utrongvlch8I48n8 :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
«interface»

Message

«interface»

RequestMessage

«interface»

ResponseMessage

«abstract»

AbstractMessage

«abstract»

AbstractResponseMessage

MessageHandler

abstract void handle(T message)

ChunkFetchRequest

OneWayMessage

RpcRequest

StreamRequest

ChunkFetchFailure

RpcFailure

StreamFailure

ChunkFetchSuccess

RpcResponse

StreamResponse

根据上面的类图可以看出,主要分类

  • AbstractMessage:抽象类,用于在单独的缓冲区中保存正文。其他消息类基本都继承该类。
  • RequestMessage:定义了从客户端到服务端的消息接口- ChunkFetchRequest:请求获取数据流中单个数据块的序列。这将对应一个响应信息(成功或失败)。- RpcRequest:由远程服务端 org.apache.spark.network.server.RpcHandler 处理的通用 RPC。这将对应一个响应信息(成功或失败)。- OneWayMessage:由远程服务端 org.apache.spark.network.server.RpcHandler 处理。不需要进行回复客户端。- StreamRequest:请求从远端流式传输数据。数据流 ID 是一个任意字符串,需要两个端点协商后才能流式传输数据
  • ResponseMessage:定义了从服务端到客户端的消息接口- AbstractResponseMessage:响应信息的抽象类。 - ChunkFetchSuccess:处理ChunkFetchRequest成功后返回的消息。- RpcResponse:处理RpcRequest成功后返回的消息。- StreamResponse:处理StreamRequest成功后返回的消息。- ChunkFetchFailure:处理ChunkFetchRequest失败后返回的消息。- RpcFailure:处理RpcRequest失败后返回的消息。- StreamFailure:处理StreamRequest失败后返回的消息。

client端请求和响应的流程

#mermaid-svg-tgJqsrIdtskNjh35 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-tgJqsrIdtskNjh35 .error-icon{fill:#552222;}#mermaid-svg-tgJqsrIdtskNjh35 .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-tgJqsrIdtskNjh35 .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-tgJqsrIdtskNjh35 .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-tgJqsrIdtskNjh35 .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-tgJqsrIdtskNjh35 .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-tgJqsrIdtskNjh35 .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-tgJqsrIdtskNjh35 .marker{fill:#333333;stroke:#333333;}#mermaid-svg-tgJqsrIdtskNjh35 .marker.cross{stroke:#333333;}#mermaid-svg-tgJqsrIdtskNjh35 svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-tgJqsrIdtskNjh35 .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-tgJqsrIdtskNjh35 .cluster-label text{fill:#333;}#mermaid-svg-tgJqsrIdtskNjh35 .cluster-label span{color:#333;}#mermaid-svg-tgJqsrIdtskNjh35 .label text,#mermaid-svg-tgJqsrIdtskNjh35 span{fill:#333;color:#333;}#mermaid-svg-tgJqsrIdtskNjh35 .node rect,#mermaid-svg-tgJqsrIdtskNjh35 .node circle,#mermaid-svg-tgJqsrIdtskNjh35 .node ellipse,#mermaid-svg-tgJqsrIdtskNjh35 .node polygon,#mermaid-svg-tgJqsrIdtskNjh35 .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-tgJqsrIdtskNjh35 .node .label{text-align:center;}#mermaid-svg-tgJqsrIdtskNjh35 .node.clickable{cursor:pointer;}#mermaid-svg-tgJqsrIdtskNjh35 .arrowheadPath{fill:#333333;}#mermaid-svg-tgJqsrIdtskNjh35 .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-tgJqsrIdtskNjh35 .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-tgJqsrIdtskNjh35 .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-tgJqsrIdtskNjh35 .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-tgJqsrIdtskNjh35 .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-tgJqsrIdtskNjh35 .cluster text{fill:#333;}#mermaid-svg-tgJqsrIdtskNjh35 .cluster span{color:#333;}#mermaid-svg-tgJqsrIdtskNjh35 div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-tgJqsrIdtskNjh35 :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
传输层

1.addRpcRequest或addFetchRequest

2.WriteAndFlush

IdleStateHandler

MessageDecoder

TransportFrameDecoder

TransportResponseHandler

TransportChannelHandler

Netty

MessageEncoder

TransportClient

Request

Response

server端处理请求和响应的流程

#mermaid-svg-f1APuzfNj2LgPlLA {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-f1APuzfNj2LgPlLA .error-icon{fill:#552222;}#mermaid-svg-f1APuzfNj2LgPlLA .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-f1APuzfNj2LgPlLA .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-f1APuzfNj2LgPlLA .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-f1APuzfNj2LgPlLA .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-f1APuzfNj2LgPlLA .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-f1APuzfNj2LgPlLA .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-f1APuzfNj2LgPlLA .marker{fill:#333333;stroke:#333333;}#mermaid-svg-f1APuzfNj2LgPlLA .marker.cross{stroke:#333333;}#mermaid-svg-f1APuzfNj2LgPlLA svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-f1APuzfNj2LgPlLA .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-f1APuzfNj2LgPlLA .cluster-label text{fill:#333;}#mermaid-svg-f1APuzfNj2LgPlLA .cluster-label span{color:#333;}#mermaid-svg-f1APuzfNj2LgPlLA .label text,#mermaid-svg-f1APuzfNj2LgPlLA span{fill:#333;color:#333;}#mermaid-svg-f1APuzfNj2LgPlLA .node rect,#mermaid-svg-f1APuzfNj2LgPlLA .node circle,#mermaid-svg-f1APuzfNj2LgPlLA .node ellipse,#mermaid-svg-f1APuzfNj2LgPlLA .node polygon,#mermaid-svg-f1APuzfNj2LgPlLA .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-f1APuzfNj2LgPlLA .node .label{text-align:center;}#mermaid-svg-f1APuzfNj2LgPlLA .node.clickable{cursor:pointer;}#mermaid-svg-f1APuzfNj2LgPlLA .arrowheadPath{fill:#333333;}#mermaid-svg-f1APuzfNj2LgPlLA .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-f1APuzfNj2LgPlLA .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-f1APuzfNj2LgPlLA .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-f1APuzfNj2LgPlLA .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-f1APuzfNj2LgPlLA .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-f1APuzfNj2LgPlLA .cluster text{fill:#333;}#mermaid-svg-f1APuzfNj2LgPlLA .cluster span{color:#333;}#mermaid-svg-f1APuzfNj2LgPlLA div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-f1APuzfNj2LgPlLA :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
传输层

IdleStateHandler

MessageDecoder

TransportFrameDecoder

TransportRequestHandler

TransportChannelHandler

StreamManager

RpcHandler

Netty

MessageEncoder

Request

Response

标签: spark rpc

本文转载自: https://blog.csdn.net/weixin_43820556/article/details/135157921
版权归原作者 顧棟 所有, 如有侵权,请联系我们删除。

“【Spark源码分析】Spark的RPC通信二-初稿”的评论:

还没有评论