Spark的RPC通信一-初稿
文章目录
Spark的RPC顶层设计
在
RpcEnv
中定义了RPC通信框架的启动、停止和关闭等抽象方法,表示RPC的顶层环境。唯一的子类
NettyRpcEnv
。
RpcEndpoints
需要向
RpcEnv
注册自己的名称,以便接收信息。然后,
RpcEnv
将处理从
RpcEndpointRef
或远程节点发送的信息,并将它们传送到相应的
RpcEndpoints
。对于
RpcEnv
捕捉到的未捕获异常,
RpcEnv
会使用
RpcCallContext.sendFailure
将异常发回给发送者,如果没有发送者或出现
NotSerializableException
,则记录异常。
RpcEnv
还提供了一些方法来检索给定名称或
uri
的
RpcEndpointRefs
。
#mermaid-svg-d0CZExuMoO08sEMx {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-d0CZExuMoO08sEMx .error-icon{fill:#552222;}#mermaid-svg-d0CZExuMoO08sEMx .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-d0CZExuMoO08sEMx .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-d0CZExuMoO08sEMx .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-d0CZExuMoO08sEMx .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-d0CZExuMoO08sEMx .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-d0CZExuMoO08sEMx .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-d0CZExuMoO08sEMx .marker{fill:#333333;stroke:#333333;}#mermaid-svg-d0CZExuMoO08sEMx .marker.cross{stroke:#333333;}#mermaid-svg-d0CZExuMoO08sEMx svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup text{fill:#9370DB;fill:#131300;stroke:none;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:10px;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup text .title{font-weight:bolder;}#mermaid-svg-d0CZExuMoO08sEMx .nodeLabel,#mermaid-svg-d0CZExuMoO08sEMx .edgeLabel{color:#131300;}#mermaid-svg-d0CZExuMoO08sEMx .edgeLabel .label rect{fill:#ECECFF;}#mermaid-svg-d0CZExuMoO08sEMx .label text{fill:#131300;}#mermaid-svg-d0CZExuMoO08sEMx .edgeLabel .label span{background:#ECECFF;}#mermaid-svg-d0CZExuMoO08sEMx .classTitle{font-weight:bolder;}#mermaid-svg-d0CZExuMoO08sEMx .node rect,#mermaid-svg-d0CZExuMoO08sEMx .node circle,#mermaid-svg-d0CZExuMoO08sEMx .node ellipse,#mermaid-svg-d0CZExuMoO08sEMx .node polygon,#mermaid-svg-d0CZExuMoO08sEMx .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-d0CZExuMoO08sEMx .divider{stroke:#9370DB;stroke:1;}#mermaid-svg-d0CZExuMoO08sEMx g.clickable{cursor:pointer;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup rect{fill:#ECECFF;stroke:#9370DB;}#mermaid-svg-d0CZExuMoO08sEMx g.classGroup line{stroke:#9370DB;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx .classLabel .box{stroke:none;stroke-width:0;fill:#ECECFF;opacity:0.5;}#mermaid-svg-d0CZExuMoO08sEMx .classLabel .label{fill:#9370DB;font-size:10px;}#mermaid-svg-d0CZExuMoO08sEMx .relation{stroke:#333333;stroke-width:1;fill:none;}#mermaid-svg-d0CZExuMoO08sEMx .dashed-line{stroke-dasharray:3;}#mermaid-svg-d0CZExuMoO08sEMx #compositionStart,#mermaid-svg-d0CZExuMoO08sEMx .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #compositionEnd,#mermaid-svg-d0CZExuMoO08sEMx .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #dependencyStart,#mermaid-svg-d0CZExuMoO08sEMx .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #dependencyStart,#mermaid-svg-d0CZExuMoO08sEMx .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #extensionStart,#mermaid-svg-d0CZExuMoO08sEMx .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #extensionEnd,#mermaid-svg-d0CZExuMoO08sEMx .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #aggregationStart,#mermaid-svg-d0CZExuMoO08sEMx .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx #aggregationEnd,#mermaid-svg-d0CZExuMoO08sEMx .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-d0CZExuMoO08sEMx .edgeTerminals{font-size:11px;}#mermaid-svg-d0CZExuMoO08sEMx :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
继承
继承
继承
继承
继承
继承
继承
继承
继承
继承
继承
Association
Association
Dispatcher
-endpoints
-endpointRefs
-receivers
-stopped
-threadpool
registerRpcEndpoint(name: String, endpoint: RpcEndpoint)
getRpcEndpointRef(endpoint: RpcEndpoint)
removeRpcEndpointRef(endpoint: RpcEndpoint)
unregisterRpcEndpoint(name: String)
postToAll(message: InboxMessage)
postMessage(endpointName: String,message: InboxMessage,callbackIfStopped: (Exception) => Unit)
«trait»
RpcEndpoint
rpcEnv: RpcEnv
self()
receive()
receiveAndReply(context: RpcCallContext)
onError(cause: Throwable)
onConnected(remoteAddress: RpcAddress)
onDisconnected(remoteAddress: RpcAddress)
onNetworkError(cause: Throwable, remoteAddress: RpcAddress)
onStart()
onStop()
stop()
«trait»
ThreadSafeRpcEndpoint
«trait»
RpcEnvFactory
create(config: RpcEnvConfig)
NettyRpcEnvFactory
create(config: RpcEnvConfig)
«abstract»
RpcEndpointRef
«abstract»
RpcEnv
NettyRpcEnv
-dispatcher: Dispatcher
-streamManager:NettyStreamManager
-transportContext:TransportContext
-clientFactory:TransportClientFactory
startServer(bindAddress: String, port: Int)
setupEndpoint(name: String, endpoint: RpcEndpoint)
send(message: RequestMessage)
ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout)
postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage)
asyncSetupEndpointRefByURI(uri: String)
DummyMaster
ClientEndpoint
DriverEndpoint
Master
Worker
LocalEndpoint
其他
NettyRpcEndpointRef
在
RpcEnvFactory
中定义了创建
RpcEnv
的抽象方法,在
NettyRpcEnv
和
NettyRpcEnvFactory
中使用Netty对继承的方式进行了实现。
在
NettRpcEnv
中启动终端点方法
setEndpoint
中,会将
RpcEndpoint
和
RpcEndpointRef
相互以键值对的形式存储到
ConcurrentHashMap
中,最后在
RpcEnv
的
object
类中通过反射方式实现创建
RpcEnv
的实例的静态方法。
核心类
NettyRpcEnv
NettyRpcEnv
的核心成员和核心方法
transportConf
:TransportConf
的实例对象,加载一些关于RPC的配置项dispatcher
:Dispatcher
的实例对象,消息转发器,将RPC消息路由到要该对此消息处理的RpcEndpoint
。streamManager
:NettyStreamManager
的实例对象,流的管理器,为NettyRpcEnv
提供流式服务。transportContext
:TransportContext
的实例对象clientFactory
: 用于构造发送和接收响应的TransportClient
fileDownloadFactory
: 用于文件下载的独立客户端工厂。这样可以避免使用与主 RPC 上下文相同的 RPC 处理程序,从而将这些客户端引起的事件与主 RPC 流量隔离开来。它还允许对某些属性进行不同的配置,例如每个对等节点的连接数。server
:TransportServer
,提供高效的底层流媒体服务。ConcurrentHashMap[RpcAddress, Outbox] outboxes
:远程地址与Outbox
的映射map。startServer(bindAddress: String, port: Int)
- 创建一个TransportServer
- 向消息转发器中注册RpcEndpointVerifier
,RpcEndpointVerifier
的注册名称为endpoint-verifier
,用来校验RpcEndpoint
是否存在的RpcEndpoint
服务send(message: RequestMessage): Unit
- 发送消息时,将本地消息交于InBox
,远程消息交于OutBox
ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout)
- 若请求消息的接收者的地址与当前的NettyRpcEnv
的地址相同,将消息交通过dispatcher.postLocalMessage(message, p)
方法处理,p中是成功和失败的回调函数。- 若请求消息的的接收者的地址与当前的NettyRpcEnv
的地址不同时,将消息通过postToOutbox(message.receiver, rpcMessage)
方法处理,主要是将消息放入outbox,然后传输到远程地址上。- 在方法的最后设定了一个定时器,实现消息请求的超时机制。postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage)
:将消息传到远程节点上 - 如果receiver.client
不为空,那么消息将直接通过TransportClient
发送到远端节点- 如果receiver.client
为空,则获取远端结点地址对应的Outbox
,若没有则新建一个- 如果NettyRpcEnv
已经停止,移除该Outbox
并停止,否则调用Outbox.send()
发送消息。
核心类
RpcEndpoint
RpcEndpoint
是对能够处理RPC请求,给某一特定服务提供本地调用及跨节点调用的RPC组件的抽象,所有运行于RPC框架之上的实体都应该继承
RpcEndpoint
。
RPC 的
RpcEndpoint
,它定义了给定消息时要触发的函数。保证按调用顺序为
onStart
、
receive
和
onStop
。
RpcEndpoint
的生命周期为
constructor -> onStart -> receive* -> onStop
。receive 可以并发调用。如果希望接收是线程安全的,则需要请使用
ThreadSafeRpcEndpoint
。如果
RpcEndpoint
方法(
onError
除外)抛出任何错误,
onError
将被调用并说明原因。如果
onError
抛出错误,
RpcEnv
会将忽略。
ThreadSafeRpcEndpoint
是继承自
RpcEndpoint
的特质,需要
RpcEnv
以线程安全方式向其发送消息的特性。主要用于对消息的处理,必须是线程安全的场景。
ThreadSafeRpcEndpoint
对消息的处理都是串行的,即前一条消息处理完才能接着处理下一条消息。
核心类
RpcEndpointRef
远程
RpcEndpoint
的引用。
RpcEndpointRef
是线程安全的。用于消息发送方持有并发送消息。
核心成员
maxRetries
:最大尝试连接次数。可以通过spark.rpc.numRetries
参数指定,默认3次retryWaitMs
:每次尝试连接最大等待毫秒值。可以通过spark.rpc.retry.wait
,默认3秒defaultAskTimeout
:RPC ask操作的超时时间。可以通过spark.rpc.askTimeout
,默认120秒address
:远程RpcEndpoint
引用的地址name
:远程RpcEndpoint
引用的名称
核心方法
send()
:发送单向异步信息。只管发送,不管结果。ask()
系列:向远程的RpcEndpoint.receiveAndReply()
方法发送消息,并带有超时机制的Future。该类方法只发送一次消息,从不重试。askSync()
系列:向相应的RpcEndpoint.receiveAndReply
发送消息,并在指定超时内获取结果,如果失败则抛出异常。 这是一个阻塞操作,可能会耗费大量时间,因此不要在RpcEndpoint
的消息循环中调用它。
NettyRpcEndpointRef
是其唯一的继承类。重写了
ask()
和
send()
方法,主要是消息封装成
RequestMessage
,然后通过
nettyEnv
的
ask
和
send
方法将消息发送出去。
客户端发送请求简单示例图
#mermaid-svg-8JNaie07WjKgUzlw {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-8JNaie07WjKgUzlw .error-icon{fill:#552222;}#mermaid-svg-8JNaie07WjKgUzlw .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-8JNaie07WjKgUzlw .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-8JNaie07WjKgUzlw .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-8JNaie07WjKgUzlw .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-8JNaie07WjKgUzlw .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-8JNaie07WjKgUzlw .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-8JNaie07WjKgUzlw .marker{fill:#333333;stroke:#333333;}#mermaid-svg-8JNaie07WjKgUzlw .marker.cross{stroke:#333333;}#mermaid-svg-8JNaie07WjKgUzlw svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-8JNaie07WjKgUzlw .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-8JNaie07WjKgUzlw .cluster-label text{fill:#333;}#mermaid-svg-8JNaie07WjKgUzlw .cluster-label span{color:#333;}#mermaid-svg-8JNaie07WjKgUzlw .label text,#mermaid-svg-8JNaie07WjKgUzlw span{fill:#333;color:#333;}#mermaid-svg-8JNaie07WjKgUzlw .node rect,#mermaid-svg-8JNaie07WjKgUzlw .node circle,#mermaid-svg-8JNaie07WjKgUzlw .node ellipse,#mermaid-svg-8JNaie07WjKgUzlw .node polygon,#mermaid-svg-8JNaie07WjKgUzlw .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-8JNaie07WjKgUzlw .node .label{text-align:center;}#mermaid-svg-8JNaie07WjKgUzlw .node.clickable{cursor:pointer;}#mermaid-svg-8JNaie07WjKgUzlw .arrowheadPath{fill:#333333;}#mermaid-svg-8JNaie07WjKgUzlw .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-8JNaie07WjKgUzlw .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-8JNaie07WjKgUzlw .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-8JNaie07WjKgUzlw .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-8JNaie07WjKgUzlw .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-8JNaie07WjKgUzlw .cluster text{fill:#333;}#mermaid-svg-8JNaie07WjKgUzlw .cluster span{color:#333;}#mermaid-svg-8JNaie07WjKgUzlw div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-8JNaie07WjKgUzlw :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
Client
Server
RpcEnv
outboxs
outbox
messages
RpcEnv
5
3
4
1
2
Dispatcher
receivers
threadpool
EndpointData
messages
1.1
1.2
1.2
1.3
EndpointData
MessageLoop
InboxMessage
Inbox
RpcEndpoint
NettyRpcEndpointRef
InboxMessage
EndpointData
MessageLoop
NettyRpcEndPointRef
outbox
outbox
TransportClient
OutboxMessage
OutboxMessage
RpcEndPoint
Dispatcher
- 若是向本地节点的RpcEndpoint发送消息 1. 通过调用
NettyRpcEndpointRef
的send()
和ask()
方法向本地节点的RpcEndpoint
发送消息。由于是在同一节点,所以直接调用Dispatcher
的postLocalMessage()
或postOneWayMessage()
方法将消息放入EndpointData
内部Inbox
的messages
中。2.InboxMessage
放入后Inbox
后,Inbox
所属的endPointData
就会放入receivers
一旦receivers
中有数据,原本阻塞的MessageLoop
就可以取到数据,3.MessageLoop
将调用inbox.process()
方法消息的处理。对不同的消息类型调用endpoint
的不同回调函数,即完成了消息的处理。 - 通过调用
NettyRpcEndpointRef
的send()
和ask()
方法向远端节点的RpcEndpoint
发送消息。消息将首先被封装为OutboxMessage
,然后放入到远端RpcEndpoint
的地址所对应的Outbox
的messages
中。 - 每个
Outbox
的drainOutbox()
方法通过循环,不断从messages
列表中取得OutboxMessage
,并通过TransportClient
发送,底层依赖Netty
。 TransportClient
和远端NettyRpcEnv
的TransportServer
建立了连接后,请求消息首先经过Netty管道的处理,由TransportChannelHandler
将消息分发给TransportRequestHandler
,最终会调用NettyRpcHandler
或StreamManager
处理。如果是RPC消息则会调用NettyRpcHandler.receive()
方法,之后与第一步所述一致,调用Dispatcher
的postRemoteMessage()
或``postOneWayMessage()`方法。- 如果
TransportRequestHandler
处理的是RpcRequest
,那么server端的TransportRequestHandler
处理消息时还会对client端进行响应,依赖Netty
将响应消息发送给client端。client端接收到消息时由TransportChannelHandler
将消息分发给TransportResponseHandler
处理。
Spark RPC消息的发送与接收实现
**
OutboxMessage
在客户端使用,是对外发送消息的封装。
InboxMessage
在服务端使用,是对接收消息的封装**。
#mermaid-svg-lBRXp7roQgEctFdG {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-lBRXp7roQgEctFdG .error-icon{fill:#552222;}#mermaid-svg-lBRXp7roQgEctFdG .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-lBRXp7roQgEctFdG .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-lBRXp7roQgEctFdG .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-lBRXp7roQgEctFdG .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-lBRXp7roQgEctFdG .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-lBRXp7roQgEctFdG .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-lBRXp7roQgEctFdG .marker{fill:#333333;stroke:#333333;}#mermaid-svg-lBRXp7roQgEctFdG .marker.cross{stroke:#333333;}#mermaid-svg-lBRXp7roQgEctFdG svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup text{fill:#9370DB;fill:#131300;stroke:none;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:10px;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup text .title{font-weight:bolder;}#mermaid-svg-lBRXp7roQgEctFdG .nodeLabel,#mermaid-svg-lBRXp7roQgEctFdG .edgeLabel{color:#131300;}#mermaid-svg-lBRXp7roQgEctFdG .edgeLabel .label rect{fill:#ECECFF;}#mermaid-svg-lBRXp7roQgEctFdG .label text{fill:#131300;}#mermaid-svg-lBRXp7roQgEctFdG .edgeLabel .label span{background:#ECECFF;}#mermaid-svg-lBRXp7roQgEctFdG .classTitle{font-weight:bolder;}#mermaid-svg-lBRXp7roQgEctFdG .node rect,#mermaid-svg-lBRXp7roQgEctFdG .node circle,#mermaid-svg-lBRXp7roQgEctFdG .node ellipse,#mermaid-svg-lBRXp7roQgEctFdG .node polygon,#mermaid-svg-lBRXp7roQgEctFdG .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-lBRXp7roQgEctFdG .divider{stroke:#9370DB;stroke:1;}#mermaid-svg-lBRXp7roQgEctFdG g.clickable{cursor:pointer;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup rect{fill:#ECECFF;stroke:#9370DB;}#mermaid-svg-lBRXp7roQgEctFdG g.classGroup line{stroke:#9370DB;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG .classLabel .box{stroke:none;stroke-width:0;fill:#ECECFF;opacity:0.5;}#mermaid-svg-lBRXp7roQgEctFdG .classLabel .label{fill:#9370DB;font-size:10px;}#mermaid-svg-lBRXp7roQgEctFdG .relation{stroke:#333333;stroke-width:1;fill:none;}#mermaid-svg-lBRXp7roQgEctFdG .dashed-line{stroke-dasharray:3;}#mermaid-svg-lBRXp7roQgEctFdG #compositionStart,#mermaid-svg-lBRXp7roQgEctFdG .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #compositionEnd,#mermaid-svg-lBRXp7roQgEctFdG .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #dependencyStart,#mermaid-svg-lBRXp7roQgEctFdG .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #dependencyStart,#mermaid-svg-lBRXp7roQgEctFdG .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #extensionStart,#mermaid-svg-lBRXp7roQgEctFdG .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #extensionEnd,#mermaid-svg-lBRXp7roQgEctFdG .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #aggregationStart,#mermaid-svg-lBRXp7roQgEctFdG .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG #aggregationEnd,#mermaid-svg-lBRXp7roQgEctFdG .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-lBRXp7roQgEctFdG .edgeTerminals{font-size:11px;}#mermaid-svg-lBRXp7roQgEctFdG :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
继承
继承
继承
继承
继承
继承
继承
继承
继承
聚合
聚合
Dispatcher
-endpoints
-endpointRefs
-receivers
-stopped
-threadpool
registerRpcEndpoint(name: String, endpoint: RpcEndpoint)
getRpcEndpointRef(endpoint: RpcEndpoint)
removeRpcEndpointRef(endpoint: RpcEndpoint)
unregisterRpcEndpoint(name: String)
postToAll(message: InboxMessage)
postMessage(endpointName: String,message: InboxMessage,callbackIfStopped: (Exception) => Unit)
EndpointData
MessageLoop
Inbox
#messages
-stopped
-enableConcurrent
-numActiveThreads
process(dispatcher: Dispatcher)
post(message: InboxMessage)
stop()
Outbox
-messages
-client
-connectFuture
-stopped
-draining
send(message: OutboxMessage)
-drainOutbox()
-launchConnectTask()
-handleNetworkFailure(e: Throwable)
-closeClient()
stop()
RpcMessage
OnStart
OnStop
RemoteProcessConnected
RemoteProcessDisconnected
RemoteProcessConnectionError
OneWayMessage
«trait»
InboxMessage
OneWayOutboxMessage
«trait»
OutboxMessage
sendWith(client: TransportClient)
onFailure(e: Throwable)
RpcOutboxMessage
**
InboxMessage
是一个scala特质类,所有的RPC消息都继承自
InboxMessage
**。下面是继承自
InboxMessage
的子类
OneWayMessage
:RpcEndpoint
处理此类型的消息后不需要向客户端回复信息。RpcMessage
:RpcEndpoint
处理完此消息后需要向客户端回复信息。OnStart
:Inbox
实例化后,再通知与此Inbox
相关联的RpcEndpoint
启动。OnStop
:Inbox
停止后,通知与此Inbox
相关联的RpcEndpoint
停止。RemoteProcessConnected
:告诉所有的RpcEndpoint
,有远端的进程已经与当前RPC服务建立了连接。RemoteProcessDisconnected
:告诉所有的RpcEndpoint
,有远端的进程已经与当前RPC服务断开了连接。RemoteProcessConnectionError
:告诉所有的RpcEndpoint
,与远端某个地址之间的连接发生了错误。
核心类
Inbox
**
Inbox
为
RpcEndpoint
存储了消息即
InboxMessage
,并线程安全地发送给
RpcEndPoint
**。
private[netty]class Inbox(val endpointRef: NettyRpcEndpointRef,val endpoint: RpcEndpoint)extends Logging {//相当于给this起了一个别名为inbox,
inbox =>}
重要的属性
messages
:所有的消息以消息盒子的方式,通过LinkedList链式存储enableConcurrent
:是否同时允许多线程同时处理消息numActiveThreads
:Inbox中正在处理消息的线程数
重要方法
post()
:将InboxMessage
投递到box中,从下面的代码可以看出使用了synchronized
保证线程安全,如果该box已经关闭,消息将会丢弃。def post(message: InboxMessage):Unit= inbox.synchronized {if(stopped){// 日志进行warning输出 onDrop(message)}else{ messages.add(message)false}}
process()
:处理存储在messages
中的消息。def process(dispatcher: Dispatcher):Unit={var message: InboxMessage =null// 1.以synchronized进行并发检查,开启并发则取消息,numActiveThreads自增1。 inbox.synchronized {if(!enableConcurrent && numActiveThreads !=0){return} message = messages.poll()if(message !=null){ numActiveThreads +=1}else{return}}while(true){// 安全回调?处理异常的 safelyCall(endpoint){//对不同消息,通过模式匹配进行通过不同的endpoint进行处理 message match{case RpcMessage(_sender, content, context)=>try{ endpoint.receiveAndReply(context).applyOrElse[Any,Unit](content,{ msg =>thrownew SparkException(s"Unsupported message $message from ${_sender}")})}catch{case e: Throwable => context.sendFailure(e)// Throw the exception -- this exception will be caught by the safelyCall function.// The endpoint's onError function will be called.throw e }case OneWayMessage(_sender, content)=> endpoint.receive.applyOrElse[Any,Unit](content,{ msg =>thrownew SparkException(s"Unsupported message $message from ${_sender}")})case OnStart => endpoint.onStart()if(!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]){ inbox.synchronized {if(!stopped){ enableConcurrent =true}}}case OnStop =>val activeThreads = inbox.synchronized { inbox.numActiveThreads } assert(activeThreads ==1,s"There should be only a single active thread but found $activeThreads threads.") dispatcher.removeRpcEndpointRef(endpoint) endpoint.onStop() assert(isEmpty,"OnStop should be the last message")case RemoteProcessConnected(remoteAddress)=> endpoint.onConnected(remoteAddress)case RemoteProcessDisconnected(remoteAddress)=> endpoint.onDisconnected(remoteAddress)case RemoteProcessConnectionError(cause, remoteAddress)=> endpoint.onNetworkError(cause, remoteAddress)}} inbox.synchronized {// 调用 `onStop` 后,"enableConcurrent "将被设置为 false,所以需要每次都检查它。if(!enableConcurrent && numActiveThreads !=1){// 此线程退出,降低并发,最终归于一个线程处理剩下的消息 numActiveThreads -=1return} message = messages.poll()// 没有消息之后,退出当前循环if(message ==null){ numActiveThreads -=1return}}}}
stop()
:enableConcurrent
赋值为false,保证当前是唯一活跃的线程。并在messages
中添加onStop
消息。def stop():Unit= inbox.synchronized {// 优雅关闭,是关闭并发只留一个线程处理消息。确保OnStop为最后一个消息,这样,"RpcEndpoint.onStop "就可以安全地释放资源了。if(!stopped){ enableConcurrent =false stopped =true messages.add(OnStop)}}
核心类
Dispatcher
**
Dispatcher
负责将RPC消息路由到要该对此消息处理的
RpcEndpoint
**。
内部类
EndpointData
:包装一个Inbox
类。一个RpcEndpoint与NettyRpcEndpointRef映射关联在一起。即一个Inbox只为一个映射关系服务。MessageLoop
:用于转发信息的循环任务类,从receivers中获取有消息的inbox进行处理。
重要属性
endpoints
:储存name
和EndpointData
的映射关系。EndpointData
包含了name
,RpcEndpoint
,NettyRpcEndpointRef
和Inbox
,采用ConcureentHashMap
保证线程安全endpointRefs
:储存RpcEndpoint
和RpcEndpointRef
的映射关系。采用ConcureentHashMap
保证线程安全receivers
:**存储inbox
中可能包含message的EndpointData
**。在MessageLoop
中取出并处理消息。使用阻塞队列LinkedBlockingQueue
存储。threadpool
:用于调度消息的线程池。根据spark.rpc.netty.dispatcher.numThreads
创建固定大小的线程池,启动与线程池大小相同个数的MessageLoop
任务。
重要方法
registerRpcEndpoint()
:在调度器中注册endpoint。由name
和RpcEndpoint
构建NettyRpcEndpointRef
,并加入到endpoints
,endpointRefs
,receivers
中postToAll()
:**将message投递到在注册到该Dispatcher
的所有RpcEndpoint
**。postMessage()
将message投递到注册到该Dispatcher
指定name的RpcEndpoint
中,并将EndpointData
放入receivers
中,该方法中还传入了失败回调函数unregisterRpcEndpoint()
,stop()
:注销所有已注册的RpcEndpoint
,从endpoints
中移除并在inbox
中增加了onstop
消息。在receivers
中插入哨兵,等待receivers
中的所有消息都处理完毕后,关闭线程池。
Dispatcher
中的消息处理流程。
postToAll()
或者postxx()
方法会调用postMessage()
方法将InboxMessage
放到对应endPointData
里inbox
的messages
列表(调用inbox.post()
)InboxMessage
放入后inbox
后,inbox
所属的endPointData
就会放入receivers
- 一旦
receivers
中有数据,原本阻塞的MessageLoop
就可以取到数据,因为receivers
是一个阻塞队列 MessageLoop
将调用inbox.process()
方法消息的处理。利用模式匹配,对不同的消息类型调用endpoint
的不同回调函数,即完成了消息的处理。
核心类
Outbox
OutboxMessage
是一个特质,内部只有未实现的
SendWith
方法和
onFailure
方法。
OneWayOutboxMessage
和
RpcOutboxMessage
都继承自
OutboxMessage
特质,**实现的
SendWith
通过调用
TransportClient
的
sendRpc()
方法发送信息**,其中
RpcOutboxMessage
还增加了超时和发送成功的回调方法。
Outbox
的重要属性
messages
: 保存要发送的OutboxMessage
。LinkedList
类型,线程不安全client
:TransportClient
stopped
: 当前Outbox
是否停止的标识draining
: 表示当前Outbox
内正有线程在处理messages
中消息的状态
重要方法
send()
:将要发送的OutboxMessage
首先保存到成员变量链表messages
中,若Outbox
未停止则调用drainOutbox()
方法处理messages
中的信息。因为messages
是LinkedList
类型,线程不安全,所以在添加和删除时使用了同步机制。之后调用了私有的drainOutbox()
方法发送消息。发送信息。如果没有活动连接,则缓存并启动新连接。如果[[发件箱]]被停止,发送者将收到[[SparkException]]通知。def send(message: OutboxMessage):Unit={val dropped = synchronized {if(stopped){true}else{ messages.add(message)false}}if(dropped){ message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))}else{ drainOutbox()}}
drainOutbox()
:先判断是否已停止,client是否空等前置条件。取出一条消息,并将draining
置为true,接下来将messages
中所有消息调用sendWith()
方法发送。耗尽消息队列。如果有其他线程正在排空,则直接退出。如果尚未建立连接,则在nettyEnv.clientConnectionExecutor
中启动一个任务来建立连接。launchConnectTask()
: 初始化client
stop()
:停止Outbox
- 将Outbox
的停止状态stopped置为true- 关闭TransportClient
- 清空messages
中的消息
**之所以要使用这种机制来发消息,是保证并发发送消息时,所有消息依次添加到
Outbox
中,并依次传输,同时不会阻塞
send()
方法**
版权归原作者 顧棟 所有, 如有侵权,请联系我们删除。