0


【Spark源码分析】Spark的RPC通信一-初稿

Spark的RPC通信一-初稿

文章目录

Spark的RPC顶层设计

RpcEnv

中定义了RPC通信框架的启动、停止和关闭等抽象方法,表示RPC的顶层环境。唯一的子类

NettyRpcEnv

RpcEndpoints

需要向

RpcEnv

注册自己的名称,以便接收信息。然后,

RpcEnv

将处理从

RpcEndpointRef

或远程节点发送的信息,并将它们传送到相应的

RpcEndpoints

。对于

RpcEnv

捕捉到的未捕获异常,

RpcEnv

会使用

RpcCallContext.sendFailure

将异常发回给发送者,如果没有发送者或出现

NotSerializableException

,则记录异常。

RpcEnv

还提供了一些方法来检索给定名称或

uri

RpcEndpointRefs

#mermaid-svg-d0CZExuMoO08sEMx {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-d0CZExuMoO08sEMx .error-icon{fill:#552222;}#mermaid-svg-d0CZExuMoO08sEMx .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-d0CZExuMoO08sEMx .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-d0CZExuMoO08sEMx .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-d0CZExuMoO08sEMx .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-d0CZExuMoO08sEMx .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-d0CZExuMoO08sEMx .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-d0CZExuMoO08sEMx .marker{fill:#333333;stroke:#333333;}#mermaid-svg-d0CZExuMoO08sEMx .marker.cross{stroke:#333333;}#mermaid-svg-d0CZExuMoO08sEMx svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup text{fill:#9370DB;fill:#131300;stroke:none;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:10px;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup text .title{font-weight:bolder;}#mermaid-svg-d0CZExuMoO08sEMx .nodeLabel,#mermaid-svg-d0CZExuMoO08sEMx .edgeLabel{color:#131300;}#mermaid-svg-d0CZExuMoO08sEMx .edgeLabel .label rect{fill:#ECECFF;}#mermaid-svg-d0CZExuMoO08sEMx .label text{fill:#131300;}#mermaid-svg-d0CZExuMoO08sEMx .edgeLabel .label span{background:#ECECFF;}#mermaid-svg-d0CZExuMoO08sEMx .classTitle{font-weight:bolder;}#mermaid-svg-d0CZExuMoO08sEMx .node rect,#mermaid-svg-d0CZExuMoO08sEMx .node circle,#mermaid-svg-d0CZExuMoO08sEMx .node ellipse,#mermaid-svg-d0CZExuMoO08sEMx .node polygon,#mermaid-svg-d0CZExuMoO08sEMx .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-d0CZExuMoO08sEMx .divider{stroke:#9370DB;stroke:1;}#mermaid-svg-d0CZExuMoO08sEMx g.clickable{cursor:pointer;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup rect{fill:#ECECFF;stroke:#9370DB;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup line{stroke:#9370DB;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx .classLabel .box{stroke:none;stroke-width:0;fill:#ECECFF;opacity:0.5;}#mermaid-svg-d0CZExuMoO08sEMx .classLabel .label{fill:#9370DB;font-size:10px;}#mermaid-svg-d0CZExuMoO08sEMx .relation{stroke:#333333;stroke-width:1;fill:none;}#mermaid-svg-d0CZExuMoO08sEMx .dashed-line{stroke-dasharray:3;}#mermaid-svg-d0CZExuMoO08sEMx #compositionStart,#mermaid-svg-d0CZExuMoO08sEMx .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #compositionEnd,#mermaid-svg-d0CZExuMoO08sEMx .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #dependencyStart,#mermaid-svg-d0CZExuMoO08sEMx .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #dependencyStart,#mermaid-svg-d0CZExuMoO08sEMx .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #extensionStart,#mermaid-svg-d0CZExuMoO08sEMx .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #extensionEnd,#mermaid-svg-d0CZExuMoO08sEMx .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #aggregationStart,#mermaid-svg-d0CZExuMoO08sEMx .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #aggregationEnd,#mermaid-svg-d0CZExuMoO08sEMx .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx .edgeTerminals{font-size:11px;}#mermaid-svg-d0CZExuMoO08sEMx :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
继承

继承

继承

继承

继承

继承

继承

继承

继承

继承

继承

Association

Association

Dispatcher

-endpoints

-endpointRefs

-receivers

-stopped

-threadpool

registerRpcEndpoint(name: String, endpoint: RpcEndpoint)

getRpcEndpointRef(endpoint: RpcEndpoint)

removeRpcEndpointRef(endpoint: RpcEndpoint)

unregisterRpcEndpoint(name: String)

postToAll(message: InboxMessage)

postMessage(endpointName: String,message: InboxMessage,callbackIfStopped: (Exception) => Unit)

«trait»

RpcEndpoint

rpcEnv: RpcEnv

self()

receive()

receiveAndReply(context: RpcCallContext)

onError(cause: Throwable)

onConnected(remoteAddress: RpcAddress)

onDisconnected(remoteAddress: RpcAddress)

onNetworkError(cause: Throwable, remoteAddress: RpcAddress)

onStart()

onStop()

stop()

«trait»

ThreadSafeRpcEndpoint

«trait»

RpcEnvFactory

create(config: RpcEnvConfig)

NettyRpcEnvFactory

create(config: RpcEnvConfig)

«abstract»

RpcEndpointRef

«abstract»

RpcEnv

NettyRpcEnv

-dispatcher: Dispatcher

-streamManager:NettyStreamManager

-transportContext:TransportContext

-clientFactory:TransportClientFactory

startServer(bindAddress: String, port: Int)

setupEndpoint(name: String, endpoint: RpcEndpoint)

send(message: RequestMessage)

ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout)

postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage)

asyncSetupEndpointRefByURI(uri: String)

DummyMaster

ClientEndpoint

DriverEndpoint

Master

Worker

LocalEndpoint

其他

NettyRpcEndpointRef

RpcEnvFactory

中定义了创建

RpcEnv

的抽象方法,在

NettyRpcEnv

NettyRpcEnvFactory

中使用Netty对继承的方式进行了实现。

NettRpcEnv

中启动终端点方法

setEndpoint

中,会将

RpcEndpoint

RpcEndpointRef

相互以键值对的形式存储到

ConcurrentHashMap

中,最后在

RpcEnv

object

类中通过反射方式实现创建

RpcEnv

的实例的静态方法。

核心类

NettyRpcEnv
NettyRpcEnv

的核心成员和核心方法

  • transportConfTransportConf的实例对象,加载一些关于RPC的配置项
  • dispatcherDispatcher的实例对象,消息转发器,将RPC消息路由到要该对此消息处理的RpcEndpoint
  • streamManagerNettyStreamManager的实例对象,流的管理器,为NettyRpcEnv提供流式服务。
  • transportContextTransportContext的实例对象
  • clientFactory: 用于构造发送和接收响应的TransportClient
  • fileDownloadFactory: 用于文件下载的独立客户端工厂。这样可以避免使用与主 RPC 上下文相同的 RPC 处理程序,从而将这些客户端引起的事件与主 RPC 流量隔离开来。它还允许对某些属性进行不同的配置,例如每个对等节点的连接数。
  • serverTransportServer,提供高效的底层流媒体服务。
  • ConcurrentHashMap[RpcAddress, Outbox] outboxes:远程地址与Outbox的映射map。
  • startServer(bindAddress: String, port: Int)- 创建一个TransportServer- 向消息转发器中注册RpcEndpointVerifierRpcEndpointVerifier的注册名称为endpoint-verifier,用来校验RpcEndpoint是否存在的RpcEndpoint服务
  • send(message: RequestMessage): Unit- 发送消息时,将本地消息交于InBox,远程消息交于OutBox
  • ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout)- 若请求消息的接收者的地址与当前的NettyRpcEnv的地址相同,将消息交通过dispatcher.postLocalMessage(message, p)方法处理,p中是成功和失败的回调函数。- 若请求消息的的接收者的地址与当前的NettyRpcEnv的地址不同时,将消息通过postToOutbox(message.receiver, rpcMessage)方法处理,主要是将消息放入outbox,然后传输到远程地址上。- 在方法的最后设定了一个定时器,实现消息请求的超时机制。
  • postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage):将消息传到远程节点上 - 如果receiver.client不为空,那么消息将直接通过TransportClient发送到远端节点- 如果receiver.client为空,则获取远端结点地址对应的Outbox,若没有则新建一个- 如果NettyRpcEnv已经停止,移除该Outbox并停止,否则调用Outbox.send()发送消息。

核心类

RpcEndpoint
RpcEndpoint

是对能够处理RPC请求,给某一特定服务提供本地调用及跨节点调用的RPC组件的抽象,所有运行于RPC框架之上的实体都应该继承

RpcEndpoint

RPC 的

RpcEndpoint

,它定义了给定消息时要触发的函数。保证按调用顺序为

onStart

receive

onStop

RpcEndpoint

的生命周期为

constructor -> onStart -> receive* -> onStop

。receive 可以并发调用。如果希望接收是线程安全的,则需要请使用

ThreadSafeRpcEndpoint

。如果

RpcEndpoint

方法(

onError

除外)抛出任何错误,

onError

将被调用并说明原因。如果

onError

抛出错误,

RpcEnv

会将忽略。

ThreadSafeRpcEndpoint

是继承自

RpcEndpoint

的特质,需要

RpcEnv

以线程安全方式向其发送消息的特性。主要用于对消息的处理,必须是线程安全的场景。

ThreadSafeRpcEndpoint

对消息的处理都是串行的,即前一条消息处理完才能接着处理下一条消息。

核心类

RpcEndpointRef

远程

RpcEndpoint

的引用。

RpcEndpointRef

是线程安全的。用于消息发送方持有并发送消息。

核心成员

  • maxRetries最大尝试连接次数。可以通过spark.rpc.numRetries参数指定,默认3次
  • retryWaitMs每次尝试连接最大等待毫秒值。可以通过spark.rpc.retry.wait,默认3秒
  • defaultAskTimeoutRPC ask操作的超时时间。可以通过spark.rpc.askTimeout,默认120秒
  • address:远程RpcEndpoint引用的地址
  • name:远程RpcEndpoint引用的名称

核心方法

  • send():发送单向异步信息。只管发送,不管结果。
  • ask()系列:向远程的RpcEndpoint.receiveAndReply()方法发送消息,并带有超时机制的Future。该类方法只发送一次消息,从不重试。
  • askSync()系列:向相应的 RpcEndpoint.receiveAndReply 发送消息,并在指定超时内获取结果,如果失败则抛出异常。 这是一个阻塞操作,可能会耗费大量时间,因此不要在 RpcEndpoint 的消息循环中调用它。
NettyRpcEndpointRef

是其唯一的继承类。重写了

ask()

send()

方法,主要是消息封装成

RequestMessage

,然后通过

nettyEnv

ask

send

方法将消息发送出去。

客户端发送请求简单示例图

#mermaid-svg-8JNaie07WjKgUzlw {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-8JNaie07WjKgUzlw .error-icon{fill:#552222;}#mermaid-svg-8JNaie07WjKgUzlw .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-8JNaie07WjKgUzlw .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-8JNaie07WjKgUzlw .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-8JNaie07WjKgUzlw .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-8JNaie07WjKgUzlw .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-8JNaie07WjKgUzlw .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-8JNaie07WjKgUzlw .marker{fill:#333333;stroke:#333333;}#mermaid-svg-8JNaie07WjKgUzlw .marker.cross{stroke:#333333;}#mermaid-svg-8JNaie07WjKgUzlw svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-8JNaie07WjKgUzlw .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-8JNaie07WjKgUzlw .cluster-label text{fill:#333;}#mermaid-svg-8JNaie07WjKgUzlw .cluster-label span{color:#333;}#mermaid-svg-8JNaie07WjKgUzlw .label text,#mermaid-svg-8JNaie07WjKgUzlw span{fill:#333;color:#333;}#mermaid-svg-8JNaie07WjKgUzlw .node rect,#mermaid-svg-8JNaie07WjKgUzlw .node circle,#mermaid-svg-8JNaie07WjKgUzlw .node ellipse,#mermaid-svg-8JNaie07WjKgUzlw .node polygon,#mermaid-svg-8JNaie07WjKgUzlw .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-8JNaie07WjKgUzlw .node .label{text-align:center;}#mermaid-svg-8JNaie07WjKgUzlw .node.clickable{cursor:pointer;}#mermaid-svg-8JNaie07WjKgUzlw .arrowheadPath{fill:#333333;}#mermaid-svg-8JNaie07WjKgUzlw .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-8JNaie07WjKgUzlw .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-8JNaie07WjKgUzlw .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-8JNaie07WjKgUzlw .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-8JNaie07WjKgUzlw .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-8JNaie07WjKgUzlw .cluster text{fill:#333;}#mermaid-svg-8JNaie07WjKgUzlw .cluster span{color:#333;}#mermaid-svg-8JNaie07WjKgUzlw div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-8JNaie07WjKgUzlw :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
Client

Server

RpcEnv

outboxs

outbox

messages

RpcEnv

5

3

4

1

2

Dispatcher

receivers

threadpool

EndpointData

messages

1.1

1.2

1.2

1.3

EndpointData

MessageLoop

InboxMessage

Inbox

RpcEndpoint

NettyRpcEndpointRef

InboxMessage

EndpointData

MessageLoop

NettyRpcEndPointRef

outbox

outbox

TransportClient

OutboxMessage

OutboxMessage

RpcEndPoint

Dispatcher

  1. 若是向本地节点的RpcEndpoint发送消息 1. 通过调用NettyRpcEndpointRefsend()ask()方法向本地节点的RpcEndpoint发送消息。由于是在同一节点,所以直接调用DispatcherpostLocalMessage()postOneWayMessage()方法将消息放入EndpointData内部Inboxmessages中。2. InboxMessage放入后Inbox后,Inbox所属的endPointData就会放入receivers一旦receivers中有数据,原本阻塞的MessageLoop就可以取到数据,3. MessageLoop将调用inbox.process()方法消息的处理。对不同的消息类型调用endpoint的不同回调函数,即完成了消息的处理。
  2. 通过调用NettyRpcEndpointRefsend()ask()方法向远端节点的RpcEndpoint发送消息。消息将首先被封装为OutboxMessage,然后放入到远端RpcEndpoint的地址所对应的Outboxmessages中。
  3. 每个OutboxdrainOutbox()方法通过循环,不断从messages列表中取得OutboxMessage,并通过TransportClient发送,底层依赖Netty
  4. TransportClient和远端NettyRpcEnvTransportServer建立了连接后,请求消息首先经过Netty管道的处理,由TransportChannelHandler将消息分发给TransportRequestHandler,最终会调用NettyRpcHandlerStreamManager处理。如果是RPC消息则会调用NettyRpcHandler.receive()方法,之后与第一步所述一致,调用DispatcherpostRemoteMessage()或``postOneWayMessage()`方法。
  5. 如果TransportRequestHandler处理的是RpcRequest,那么server端的TransportRequestHandler处理消息时还会对client端进行响应,依赖Netty将响应消息发送给client端。client端接收到消息时由TransportChannelHandler将消息分发给TransportResponseHandler处理。

Spark RPC消息的发送与接收实现

**

OutboxMessage

在客户端使用,是对外发送消息的封装。

InboxMessage

在服务端使用,是对接收消息的封装**。

#mermaid-svg-lBRXp7roQgEctFdG {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-lBRXp7roQgEctFdG .error-icon{fill:#552222;}#mermaid-svg-lBRXp7roQgEctFdG .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-lBRXp7roQgEctFdG .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-lBRXp7roQgEctFdG .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-lBRXp7roQgEctFdG .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-lBRXp7roQgEctFdG .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-lBRXp7roQgEctFdG .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-lBRXp7roQgEctFdG .marker{fill:#333333;stroke:#333333;}#mermaid-svg-lBRXp7roQgEctFdG .marker.cross{stroke:#333333;}#mermaid-svg-lBRXp7roQgEctFdG svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup text{fill:#9370DB;fill:#131300;stroke:none;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:10px;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup text .title{font-weight:bolder;}#mermaid-svg-lBRXp7roQgEctFdG .nodeLabel,#mermaid-svg-lBRXp7roQgEctFdG .edgeLabel{color:#131300;}#mermaid-svg-lBRXp7roQgEctFdG .edgeLabel .label rect{fill:#ECECFF;}#mermaid-svg-lBRXp7roQgEctFdG .label text{fill:#131300;}#mermaid-svg-lBRXp7roQgEctFdG .edgeLabel .label span{background:#ECECFF;}#mermaid-svg-lBRXp7roQgEctFdG .classTitle{font-weight:bolder;}#mermaid-svg-lBRXp7roQgEctFdG .node rect,#mermaid-svg-lBRXp7roQgEctFdG .node circle,#mermaid-svg-lBRXp7roQgEctFdG .node ellipse,#mermaid-svg-lBRXp7roQgEctFdG .node polygon,#mermaid-svg-lBRXp7roQgEctFdG .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-lBRXp7roQgEctFdG .divider{stroke:#9370DB;stroke:1;}#mermaid-svg-lBRXp7roQgEctFdG g.clickable{cursor:pointer;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup rect{fill:#ECECFF;stroke:#9370DB;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup line{stroke:#9370DB;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG .classLabel .box{stroke:none;stroke-width:0;fill:#ECECFF;opacity:0.5;}#mermaid-svg-lBRXp7roQgEctFdG .classLabel .label{fill:#9370DB;font-size:10px;}#mermaid-svg-lBRXp7roQgEctFdG .relation{stroke:#333333;stroke-width:1;fill:none;}#mermaid-svg-lBRXp7roQgEctFdG .dashed-line{stroke-dasharray:3;}#mermaid-svg-lBRXp7roQgEctFdG #compositionStart,#mermaid-svg-lBRXp7roQgEctFdG .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #compositionEnd,#mermaid-svg-lBRXp7roQgEctFdG .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #dependencyStart,#mermaid-svg-lBRXp7roQgEctFdG .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #dependencyStart,#mermaid-svg-lBRXp7roQgEctFdG .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #extensionStart,#mermaid-svg-lBRXp7roQgEctFdG .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #extensionEnd,#mermaid-svg-lBRXp7roQgEctFdG .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #aggregationStart,#mermaid-svg-lBRXp7roQgEctFdG .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #aggregationEnd,#mermaid-svg-lBRXp7roQgEctFdG .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG .edgeTerminals{font-size:11px;}#mermaid-svg-lBRXp7roQgEctFdG :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
继承

继承

继承

继承

继承

继承

继承

继承

继承

聚合

聚合

Dispatcher

-endpoints

-endpointRefs

-receivers

-stopped

-threadpool

registerRpcEndpoint(name: String, endpoint: RpcEndpoint)

getRpcEndpointRef(endpoint: RpcEndpoint)

removeRpcEndpointRef(endpoint: RpcEndpoint)

unregisterRpcEndpoint(name: String)

postToAll(message: InboxMessage)

postMessage(endpointName: String,message: InboxMessage,callbackIfStopped: (Exception) => Unit)

EndpointData

MessageLoop

Inbox

#messages

-stopped

-enableConcurrent

-numActiveThreads

process(dispatcher: Dispatcher)

post(message: InboxMessage)

stop()

Outbox

-messages

-client

-connectFuture

-stopped

-draining

send(message: OutboxMessage)

-drainOutbox()

-launchConnectTask()

-handleNetworkFailure(e: Throwable)

-closeClient()

stop()

RpcMessage

OnStart

OnStop

RemoteProcessConnected

RemoteProcessDisconnected

RemoteProcessConnectionError

OneWayMessage

«trait»

InboxMessage

OneWayOutboxMessage

«trait»

OutboxMessage

sendWith(client: TransportClient)

onFailure(e: Throwable)

RpcOutboxMessage

**

InboxMessage

是一个scala特质类,所有的RPC消息都继承自

InboxMessage

**。下面是继承自

InboxMessage

的子类

  • OneWayMessageRpcEndpoint处理此类型的消息后不需要向客户端回复信息。
  • RpcMessageRpcEndpoint处理完此消息后需要向客户端回复信息。
  • OnStartInbox实例化后,再通知与此Inbox相关联的RpcEndpoint启动。
  • OnStopInbox停止后,通知与此Inbox相关联的RpcEndpoint停止。
  • RemoteProcessConnected:告诉所有的RpcEndpoint,有远端的进程已经与当前RPC服务建立了连接。
  • RemoteProcessDisconnected:告诉所有的RpcEndpoint,有远端的进程已经与当前RPC服务断开了连接。
  • RemoteProcessConnectionError:告诉所有的RpcEndpoint,与远端某个地址之间的连接发生了错误。

核心类

Inbox

**

Inbox

RpcEndpoint

存储了消息即

InboxMessage

,并线程安全地发送给

RpcEndPoint

**。

private[netty]class Inbox(val endpointRef: NettyRpcEndpointRef,val endpoint: RpcEndpoint)extends Logging {//相当于给this起了一个别名为inbox,
  inbox =>}

重要的属性

  • messages所有的消息以消息盒子的方式,通过LinkedList链式存储
  • enableConcurrent是否同时允许多线程同时处理消息
  • numActiveThreadsInbox中正在处理消息的线程数

重要方法

  • post()InboxMessage投递到box中,从下面的代码可以看出使用了synchronized保证线程安全,如果该box已经关闭,消息将会丢弃。def post(message: InboxMessage):Unit= inbox.synchronized {if(stopped){// 日志进行warning输出 onDrop(message)}else{ messages.add(message)false}}
  • process()处理存储在messages中的消息def process(dispatcher: Dispatcher):Unit={var message: InboxMessage =null// 1.以synchronized进行并发检查,开启并发则取消息,numActiveThreads自增1。 inbox.synchronized {if(!enableConcurrent && numActiveThreads !=0){return} message = messages.poll()if(message !=null){ numActiveThreads +=1}else{return}}while(true){// 安全回调?处理异常的 safelyCall(endpoint){//对不同消息,通过模式匹配进行通过不同的endpoint进行处理 message match{case RpcMessage(_sender, content, context)=>try{ endpoint.receiveAndReply(context).applyOrElse[Any,Unit](content,{ msg =>thrownew SparkException(s"Unsupported message $message from ${_sender}")})}catch{case e: Throwable => context.sendFailure(e)// Throw the exception -- this exception will be caught by the safelyCall function.// The endpoint's onError function will be called.throw e }case OneWayMessage(_sender, content)=> endpoint.receive.applyOrElse[Any,Unit](content,{ msg =>thrownew SparkException(s"Unsupported message $message from ${_sender}")})case OnStart => endpoint.onStart()if(!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]){ inbox.synchronized {if(!stopped){ enableConcurrent =true}}}case OnStop =>val activeThreads = inbox.synchronized { inbox.numActiveThreads } assert(activeThreads ==1,s"There should be only a single active thread but found $activeThreads threads.") dispatcher.removeRpcEndpointRef(endpoint) endpoint.onStop() assert(isEmpty,"OnStop should be the last message")case RemoteProcessConnected(remoteAddress)=> endpoint.onConnected(remoteAddress)case RemoteProcessDisconnected(remoteAddress)=> endpoint.onDisconnected(remoteAddress)case RemoteProcessConnectionError(cause, remoteAddress)=> endpoint.onNetworkError(cause, remoteAddress)}} inbox.synchronized {// 调用 `onStop` 后,"enableConcurrent "将被设置为 false,所以需要每次都检查它。if(!enableConcurrent && numActiveThreads !=1){// 此线程退出,降低并发,最终归于一个线程处理剩下的消息 numActiveThreads -=1return} message = messages.poll()// 没有消息之后,退出当前循环if(message ==null){ numActiveThreads -=1return}}}}
  • stop()enableConcurrent赋值为false,保证当前是唯一活跃的线程。并在messages中添加onStop消息。def stop():Unit= inbox.synchronized {// 优雅关闭,是关闭并发只留一个线程处理消息。确保OnStop为最后一个消息,这样,"RpcEndpoint.onStop "就可以安全地释放资源了。if(!stopped){ enableConcurrent =false stopped =true messages.add(OnStop)}}

核心类

Dispatcher

**

Dispatcher

负责将RPC消息路由到要该对此消息处理的

RpcEndpoint

**。

内部类

  • EndpointData:包装一个Inbox类。一个RpcEndpoint与NettyRpcEndpointRef映射关联在一起。即一个Inbox只为一个映射关系服务。
  • MessageLoop:用于转发信息的循环任务类,从receivers中获取有消息的inbox进行处理。

重要属性

  • endpoints储存nameEndpointData的映射关系EndpointData包含了nameRpcEndpoint, NettyRpcEndpointRefInbox,采用ConcureentHashMap保证线程安全
  • endpointRefs储存RpcEndpointRpcEndpointRef的映射关系。采用ConcureentHashMap保证线程安全
  • receivers:**存储inbox中可能包含message的EndpointData**。在MessageLoop中取出并处理消息。使用阻塞队列LinkedBlockingQueue存储。
  • threadpool用于调度消息的线程池。根据spark.rpc.netty.dispatcher.numThreads创建固定大小的线程池,启动与线程池大小相同个数的MessageLoop任务。

重要方法

  • registerRpcEndpoint()在调度器中注册endpoint。由nameRpcEndpoint构建NettyRpcEndpointRef,并加入到endpoints, endpointRefs, receivers
  • postToAll():**将message投递到在注册到该Dispatcher的所有RpcEndpoint**。postMessage()将message投递到注册到该Dispatcher指定name的RpcEndpoint中,并将EndpointData放入receivers中,该方法中还传入了失败回调函数
  • unregisterRpcEndpoint(), stop():注销所有已注册的RpcEndpoint,从endpoints中移除并在inbox中增加了onstop消息。在receivers中插入哨兵,等待receivers中的所有消息都处理完毕后,关闭线程池。
Dispatcher

中的消息处理流程。

  1. postToAll()或者postxx()方法会调用postMessage()方法将InboxMessage放到对应endPointDatainboxmessages列表(调用inbox.post())
  2. InboxMessage放入后inbox后,inbox所属的endPointData就会放入receivers
  3. 一旦receivers中有数据,原本阻塞的MessageLoop就可以取到数据,因为receivers是一个阻塞队列
  4. MessageLoop将调用inbox.process()方法消息的处理。利用模式匹配,对不同的消息类型调用endpoint的不同回调函数,即完成了消息的处理。

核心类

Outbox
OutboxMessage

是一个特质,内部只有未实现的

SendWith

方法和

onFailure

方法。

OneWayOutboxMessage

RpcOutboxMessage

都继承自

OutboxMessage

特质,**实现的

SendWith

通过调用

TransportClient

sendRpc()

方法发送信息**,其中

RpcOutboxMessage

还增加了超时和发送成功的回调方法。

Outbox

的重要属性

  • messages: 保存要发送的OutboxMessageLinkedList类型,线程不安全
  • client: TransportClient
  • stopped: 当前Outbox是否停止的标识
  • draining: 表示当前Outbox内正有线程在处理messages中消息的状态

重要方法

  • send():将要发送的OutboxMessage首先保存到成员变量链表messages中,若Outbox未停止则调用drainOutbox()方法处理messages中的信息。因为messagesLinkedList类型,线程不安全,所以在添加和删除时使用了同步机制。之后调用了私有的drainOutbox()方法发送消息。发送信息。如果没有活动连接,则缓存并启动新连接。如果[[发件箱]]被停止,发送者将收到[[SparkException]]通知。def send(message: OutboxMessage):Unit={val dropped = synchronized {if(stopped){true}else{ messages.add(message)false}}if(dropped){ message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))}else{ drainOutbox()}}
  • drainOutbox():先判断是否已停止,client是否空等前置条件。取出一条消息,并将draining置为true,接下来将messages中所有消息调用sendWith()方法发送。耗尽消息队列。如果有其他线程正在排空,则直接退出。如果尚未建立连接,则在 nettyEnv.clientConnectionExecutor 中启动一个任务来建立连接。
  • launchConnectTask(): 初始化client
  • stop():停止Outbox- 将Outbox的停止状态stopped置为true- 关闭TransportClient- 清空messages中的消息

**之所以要使用这种机制来发消息,是保证并发发送消息时,所有消息依次添加到

Outbox

中,并依次传输,同时不会阻塞

send()

方法**

标签: spark rpc

本文转载自: https://blog.csdn.net/weixin_43820556/article/details/135003173
版权归原作者 顧棟 所有, 如有侵权,请联系我们删除。

“【Spark源码分析】Spark的RPC通信一-初稿”的评论:

还没有评论