spark rpc框架
spark网络通信主要使用netty。
包括:spark各个组件之间的消息互通、用户文件和jar包上传、节点间的shuffle过程、bolck数据的复制和备份等。
spark 0.x.x和1.x.x组件间通信使用的akka,但是2.0.0后akka移除,使用netty。是因为依赖akka的具体版本,限制了用户对akka的使用。用户文件和jar包上传原本用的是Jetty,但是在2.0.0中也废弃了,改用netty实现了。节点间的shuffle过程、bolck数据的复制和备份沿用netty。在2.0.0中这几部分重新整合设计统一纳入spark rpc框架中。
network-common包(上图绿色的部分):
常用组件介绍
- TransportContext:传输上下文,包含了用于创建传输服务端(TransportServer)和传输客户端工厂(TransportClientFactory)的上下文信息,并支持使用Transport-ChannelHandler设置Netty提供的SocketChannel的Pipeline的实现。
- TransportConf:传输上下文的配置信息。
- RpcHandler:对调用传输客户端(TransportClient)的sendRPC方法发送的消息进行处理的程序。
- MessageEncoder:在将消息放入管道前,先对消息内容进行编码,防止管道另一端读取时丢包和解析错误。
- MessageDecoder:对从管道中读取的ByteBuf进行解析,防止丢包和解析错误。
- TransportFrameDecoder:对从管道中读取的ByteBuf按照数据帧进行解析。
- RpcResponseCallback:RpcHandler对请求的消息处理完毕后进行回调的接口。
- TransportClientFactory:创建TransportClient的传输客户端工厂类。
- ClientPool:在两个对等节点间维护的关于TransportClient的池子。ClientPool是TransportClientFactory的内部组件。
- TransportClient:RPC框架的客户端,用于获取预先协商好的流中的连续块。TransportClient旨在允许有效传输大量数据,这些数据将被拆分成几百KB到几MB的块。当TransportClient处理从流中获取的块时,实际的设置是在传输层之外完成的。sendRPC方法能够在客户端和服务端的同一水平线的通信进行这些设置。
- TransportClientBootstrap:当服务端响应客户端连接时在客户端执行一次的引导程序。
- TransportRequestHandler:用于处理客户端的请求并在写完块数据后返回的处理程序。
- TransportResponseHandler:用于处理服务端的响应,并且对发出请求的客户端进行响应的处理程序。
- TransportChannelHandler:代理由TransportRequestHandler处理的请求和由Transport-ResponseHandler处理的响应,并加入传输层的处理。
- TransportServerBootstrap:当客户端连接到服务端时在服务端执行一次的引导程序。[插图]TransportServer:RPC框架的服务端,提供高效的、低级别的流服务。
- TransportServer:RPC框架的服务端,提供高效的、低级别的流服务。
RPC配置TransportConf
TransportConf是rpc框架的配置参数的来源。
有两个成员变量:配置提供者conf和配置的模块名称module
TransportConf初始化是在SparkTransportConf类的fromSparkConf中。其中conf赋值是一个ConfigProvider的一个匿名内部类。
RPC客户端工厂TransportClientFactory
connectionPool的数据结构较为复杂,分析如下
connectionPool初始化是在TransportClientFactory构造器中this.connectionPool = new ConcurrentHashMap<>();
赋值是在createClient方法中,优先取connectionPool中存在的值,没有的话就将socket和新的clientpool放进去。
clientPool的大小是由numConnectionsPerPeer控制,是数组的大小,表示远程socket有几个client连接。
此时clientPool中clients变量还都是null。
clientPool中clients赋值在createClient方法后面。首先对指定位置分段加锁,createClient(resolvedAddress)创建新的client放到clients数组的指定位置。
大概结构如下图所示:
客户端引导程序TransportClientBootstrap
TransportClientFactory的clientBootstraps属性是TransportClientBootstrap的列表,是在创建TransportClientFactory的时候传入的参数。
Transport ClientBootstrap是在TransportClient上执行的客户端引导程序,主要对连接建立时进行一些初始化的准备(例如验证、加密)。有两个实现类:AuthClientBootstrap(spark用户认证,例如Kerberos认证)、SaslClientBootstrap(ssl认证)
doBootstrap有两个参数(client、channel),其实就是将channel绑定到client上。
SaslClientBootstrap解析
如果配置了sasl认证,添加channel。
addChannel会见inbound和outbound绑定到client的pipeline上。
inbound和outbound是netty中责任链设计,在netty编程中很常用。
创建RPC客户端TransportClient
- 创建unresolvedAddress从connectPool获取和这个地址对应的ClientPool。如果没有就创建一个,放到connectPool
- 从ClientPool随机挑选一个client。检查client是否存活,存活直接返回client并更新client的使用时间
- 没有获取到缓存的client,使用解析后的resolvedAddress创建client并放入clientPool。为了避免多个线程同时创建,对缓存的位置加锁
clientPool.clients[clientIndex] = createClient(resolvedAddress);实际创建client
- 使用根bootstrap进行配置
- 连接远程服务器
- 将clientBootstraps中client bootstrap 绑定到client上
- 返回新创建的client
RPC服务端TransportServer
TransportContext的createServer方法用于创建TransportServer,实际就是调用了TransportServer的构造器方法。
- 使用netty的根bootstrap配置
- 配置channel管道,将clientBootstraps中client bootstrap 绑定
- 绑定IP和端口到bootstrap
服务端引导程序TransportServerBootstrap
TransportServerBootstrap跟TransportClientBootstrap差不多,也是一个接口,主要方法是doBootstrap。
TransportServerBootstrap的实现类有两个。SaslServerBootstrap和AuthServerBootstrap
用SaslServerBootstrap举例:
SaslServerBootstrap的doBootstrap就是创建了一个SaslRpcHandler
SaslRpcHandler处理消息。如果认证已经通过,调用delegate的receive方法,
delegate就是SaslRpcHandler构造器传入的rpcHandler。
没有认证的话,就尝试认证。
认证步骤:
1.与saslServer进行SASL认证交换
2.添加加解密channel到pipeline上
RPC框架服务端处理请求、响应流程图如下:
RPC消息处理(TransportChannelHandler、TransportRequestHandler、TransportResponseHandler)
- TransportRequestHandler:用于处理客户端的请求并在写完块数据后返回的处理程序。
- TransportResponseHandler:用于处理服务端的响应,并且对发出请求的客户端进行响应的处理程序。
- TransportChannelHandler:代理由TransportRequestHandler处理的请求和由Transport-ResponseHandler处理的响应,并加入传输层的处理。
在pipeline初始化的时候生成TransportChannelHandler。
在pipeline初始化的时候先生成channelHandler,再将添加到pipeline中
最后生成的pipeline如下图:
createChannelHandler
1.生成responseHandler,用于client处理response消息
2.生成requestHandler,用于server处理request消息
3.responseHandler和requestHandler封装成channelHandler
TransportChannelHandler处理消息流程
TransportChannelHandler继承了netty中SimpleChannelInboundHandler,核心处理方法是channelRead0
如果消息是RequestMessage,调用requestHandler
如果消失是ResponseMessage,调用responseHandler
requestHandler
requestHandler处理request。进一步判断message类型。选择对应的处理方式。举例RpcRequest处理
调用rpcHandler的receive处理,并注册回调方法监听执行结果。
rpcHandler是一个接口类,具体的实现逻辑在spark不同的实现类中,不在network-common包。实现了基础网络通信隔离。
responseHandler
跟requestHandler一样,根据message不同类型来选择处理方式。
处理rpc消息:
1.根据requestId从缓存outstandingRpcs获取RpcResponseCallback
2.移除requestId,并调用RpcResponseCallback的onSuccess方法
消息体系Message
RequestMessage的具体实现有4种,分别如下。
- ChunkFetchRequest:请求获取流的单个块的序列。
- RpcRequest:此消息类型由远程的RPC服务端进行处理,是一种需要服务端向客户端回复的RPC请求信息类型。
- OneWayMessage:此消息也需要由远程的RPC服务端进行处理,与RpcRequest不同的是,不需要服务端向客户端回复。
- StreamRequest:此消息表示向远程的服务发起请求,以获取流式数据。
由于OneWayMessage不需要响应,所以ResponseMessage对于成功或失败状态的实现各有3种,分别如下。
- ChunkFetchSuccess:处理ChunkFetchRequest成功后返回的消息。
- ChunkFetchFailure:处理ChunkFetchRequest失败后返回的消息。
- RpcResponse:处理RpcRequest成功后返回的消息。
- RpcFailure:处理RpcRequest失败后返回的消息。
- StreamResponse:处理StreamRequest成功后返回的消息。
- StreamFailure:处理StreamRequest失败后返回的消息。
自定义buffer(ManagedBuffer)
ManagedBuffer定义的方法,可以将ManagedBuffer自由在nio和netty中进行转换。
- NioManagedBuffer:nio的ByteBuffer实现
- NettyManagedBuffer:netty的ByteBuf实现
- FileSegmentManagedBuffer:文件实现
client请求和响应流程
以rpc发送举例
发送rpc数据
1.生成requestId
2.handler添加(requestId、callback)到回调队列outstandingRpcs中
3.发送rpc消息到server,并注册listener。发送完成后调用listener的operationComplete方法处理发送成功或者失败的情况。
接收rpc返回数据
1.handler处理接收的数据,根据rpc类型处理rpcResponse
2.根据requestId获取回调方法
3.调用回调方法的onSuccess
版权归原作者 申尧强 所有, 如有侵权,请联系我们删除。