0


spark rpc(网络通信)

spark rpc框架

image.png
spark网络通信主要使用netty。
包括:spark各个组件之间的消息互通、用户文件和jar包上传、节点间的shuffle过程、bolck数据的复制和备份等。
spark 0.x.x和1.x.x组件间通信使用的akka,但是2.0.0后akka移除,使用netty。是因为依赖akka的具体版本,限制了用户对akka的使用。用户文件和jar包上传原本用的是Jetty,但是在2.0.0中也废弃了,改用netty实现了。节点间的shuffle过程、bolck数据的复制和备份沿用netty。在2.0.0中这几部分重新整合设计统一纳入spark rpc框架中。
network-common包(上图绿色的部分):
image.png

常用组件介绍

  • TransportContext:传输上下文,包含了用于创建传输服务端(TransportServer)和传输客户端工厂(TransportClientFactory)的上下文信息,并支持使用Transport-ChannelHandler设置Netty提供的SocketChannel的Pipeline的实现。
  • TransportConf:传输上下文的配置信息。
  • RpcHandler:对调用传输客户端(TransportClient)的sendRPC方法发送的消息进行处理的程序。
  • MessageEncoder:在将消息放入管道前,先对消息内容进行编码,防止管道另一端读取时丢包和解析错误。
  • MessageDecoder:对从管道中读取的ByteBuf进行解析,防止丢包和解析错误。
  • TransportFrameDecoder:对从管道中读取的ByteBuf按照数据帧进行解析。
  • RpcResponseCallback:RpcHandler对请求的消息处理完毕后进行回调的接口。
  • TransportClientFactory:创建TransportClient的传输客户端工厂类。
  • ClientPool:在两个对等节点间维护的关于TransportClient的池子。ClientPool是TransportClientFactory的内部组件。
  • TransportClient:RPC框架的客户端,用于获取预先协商好的流中的连续块。TransportClient旨在允许有效传输大量数据,这些数据将被拆分成几百KB到几MB的块。当TransportClient处理从流中获取的块时,实际的设置是在传输层之外完成的。sendRPC方法能够在客户端和服务端的同一水平线的通信进行这些设置。
  • TransportClientBootstrap:当服务端响应客户端连接时在客户端执行一次的引导程序。
  • TransportRequestHandler:用于处理客户端的请求并在写完块数据后返回的处理程序。
  • TransportResponseHandler:用于处理服务端的响应,并且对发出请求的客户端进行响应的处理程序。
  • TransportChannelHandler:代理由TransportRequestHandler处理的请求和由Transport-ResponseHandler处理的响应,并加入传输层的处理。
  • TransportServerBootstrap:当客户端连接到服务端时在服务端执行一次的引导程序。[插图]TransportServer:RPC框架的服务端,提供高效的、低级别的流服务。
  • TransportServer:RPC框架的服务端,提供高效的、低级别的流服务。

image.png

RPC配置TransportConf

TransportConf是rpc框架的配置参数的来源。
有两个成员变量:配置提供者conf和配置的模块名称module
TransportConf初始化是在SparkTransportConf类的fromSparkConf中。其中conf赋值是一个ConfigProvider的一个匿名内部类。
image.png
image.png

RPC客户端工厂TransportClientFactory

image.png

connectionPool的数据结构较为复杂,分析如下
image.png
connectionPool初始化是在TransportClientFactory构造器中this.connectionPool = new ConcurrentHashMap<>();
赋值是在createClient方法中,优先取connectionPool中存在的值,没有的话就将socket和新的clientpool放进去。
clientPool的大小是由numConnectionsPerPeer控制,是数组的大小,表示远程socket有几个client连接。
此时clientPool中clients变量还都是null。
image.png
clientPool中clients赋值在createClient方法后面。首先对指定位置分段加锁,createClient(resolvedAddress)创建新的client放到clients数组的指定位置。
image.png
大概结构如下图所示:
image.png

客户端引导程序TransportClientBootstrap

TransportClientFactory的clientBootstraps属性是TransportClientBootstrap的列表,是在创建TransportClientFactory的时候传入的参数。
Transport ClientBootstrap是在TransportClient上执行的客户端引导程序,主要对连接建立时进行一些初始化的准备(例如验证、加密)。有两个实现类:AuthClientBootstrap(spark用户认证,例如Kerberos认证)、SaslClientBootstrap(ssl认证)
doBootstrap有两个参数(client、channel),其实就是将channel绑定到client上。
image.png
SaslClientBootstrap解析
如果配置了sasl认证,添加channel。
image.png
addChannel会见inbound和outbound绑定到client的pipeline上。
inbound和outbound是netty中责任链设计,在netty编程中很常用。
image.png

创建RPC客户端TransportClient

  1. 创建unresolvedAddress从connectPool获取和这个地址对应的ClientPool。如果没有就创建一个,放到connectPool
  2. 从ClientPool随机挑选一个client。检查client是否存活,存活直接返回client并更新client的使用时间
  3. 没有获取到缓存的client,使用解析后的resolvedAddress创建client并放入clientPool。为了避免多个线程同时创建,对缓存的位置加锁

image.png
clientPool.clients[clientIndex] = createClient(resolvedAddress);实际创建client

  1. 使用根bootstrap进行配置
  2. 连接远程服务器
  3. 将clientBootstraps中client bootstrap 绑定到client上
  4. 返回新创建的client

image.png

RPC服务端TransportServer

TransportContext的createServer方法用于创建TransportServer,实际就是调用了TransportServer的构造器方法。

  1. 使用netty的根bootstrap配置
  2. 配置channel管道,将clientBootstraps中client bootstrap 绑定
  3. 绑定IP和端口到bootstrap

image.png
image.png

服务端引导程序TransportServerBootstrap

TransportServerBootstrap跟TransportClientBootstrap差不多,也是一个接口,主要方法是doBootstrap。
image.png
TransportServerBootstrap的实现类有两个。SaslServerBootstrap和AuthServerBootstrap
用SaslServerBootstrap举例:
SaslServerBootstrap的doBootstrap就是创建了一个SaslRpcHandler
image.png
SaslRpcHandler处理消息。如果认证已经通过,调用delegate的receive方法,
delegate就是SaslRpcHandler构造器传入的rpcHandler。
没有认证的话,就尝试认证。
image.png
认证步骤:
1.与saslServer进行SASL认证交换
2.添加加解密channel到pipeline上
image.png
image.png
RPC框架服务端处理请求、响应流程图如下:
image.png

RPC消息处理(TransportChannelHandler、TransportRequestHandler、TransportResponseHandler)

  • TransportRequestHandler:用于处理客户端的请求并在写完块数据后返回的处理程序。
  • TransportResponseHandler:用于处理服务端的响应,并且对发出请求的客户端进行响应的处理程序。
  • TransportChannelHandler:代理由TransportRequestHandler处理的请求和由Transport-ResponseHandler处理的响应,并加入传输层的处理。

在pipeline初始化的时候生成TransportChannelHandler。
image.pngimage.png
在pipeline初始化的时候先生成channelHandler,再将添加到pipeline中
image.png
最后生成的pipeline如下图:
image.png
createChannelHandler
1.生成responseHandler,用于client处理response消息
2.生成requestHandler,用于server处理request消息
3.responseHandler和requestHandler封装成channelHandler
image.png
TransportChannelHandler处理消息流程
TransportChannelHandler继承了netty中SimpleChannelInboundHandler,核心处理方法是channelRead0
如果消息是RequestMessage,调用requestHandler
如果消失是ResponseMessage,调用responseHandler
image.png

requestHandler

requestHandler处理request。进一步判断message类型。选择对应的处理方式。举例RpcRequest处理
image.png
调用rpcHandler的receive处理,并注册回调方法监听执行结果。
image.png
rpcHandler是一个接口类,具体的实现逻辑在spark不同的实现类中,不在network-common包。实现了基础网络通信隔离。
image.png

responseHandler

跟requestHandler一样,根据message不同类型来选择处理方式。
处理rpc消息:
1.根据requestId从缓存outstandingRpcs获取RpcResponseCallback
2.移除requestId,并调用RpcResponseCallback的onSuccess方法
image.png

消息体系Message

image.png
image.png
image.png
RequestMessage的具体实现有4种,分别如下。

  • ChunkFetchRequest:请求获取流的单个块的序列。
  • RpcRequest:此消息类型由远程的RPC服务端进行处理,是一种需要服务端向客户端回复的RPC请求信息类型。
  • OneWayMessage:此消息也需要由远程的RPC服务端进行处理,与RpcRequest不同的是,不需要服务端向客户端回复。
  • StreamRequest:此消息表示向远程的服务发起请求,以获取流式数据。

由于OneWayMessage不需要响应,所以ResponseMessage对于成功或失败状态的实现各有3种,分别如下。

  • ChunkFetchSuccess:处理ChunkFetchRequest成功后返回的消息。
  • ChunkFetchFailure:处理ChunkFetchRequest失败后返回的消息。
  • RpcResponse:处理RpcRequest成功后返回的消息。
  • RpcFailure:处理RpcRequest失败后返回的消息。
  • StreamResponse:处理StreamRequest成功后返回的消息。
  • StreamFailure:处理StreamRequest失败后返回的消息。

自定义buffer(ManagedBuffer)

image.png
ManagedBuffer定义的方法,可以将ManagedBuffer自由在nio和netty中进行转换。

  • NioManagedBuffer:nio的ByteBuffer实现
  • NettyManagedBuffer:netty的ByteBuf实现
  • FileSegmentManagedBuffer:文件实现

image.png

client请求和响应流程

以rpc发送举例
发送rpc数据
1.生成requestId
2.handler添加(requestId、callback)到回调队列outstandingRpcs中
3.发送rpc消息到server,并注册listener。发送完成后调用listener的operationComplete方法处理发送成功或者失败的情况。
image.png
接收rpc返回数据
1.handler处理接收的数据,根据rpc类型处理rpcResponse
2.根据requestId获取回调方法
3.调用回调方法的onSuccess
image.png

image.png

标签: spark rpc 大数据

本文转载自: https://blog.csdn.net/weixin_43839095/article/details/135112102
版权归原作者 申尧强 所有, 如有侵权,请联系我们删除。

“spark rpc(网络通信)”的评论:

还没有评论