文章目录
我们从整体的角度看一下Flink RPC通信框架的设计与实现,了解其底层Akka通信框架的基础概念及二者之间的关系。
1. Akka基本概念与Actor模型
Akka是使用Scala语言编写的库,用于在JVM上简化编写具有可容错、高可伸缩性的Java或Scala的Actor模型。Akka基于Actor模型,提供了一个用于构建可扩展、弹性、快速响应的应用程序的平台。
Actor 模型是一种并发计算模型,Actor 模型的核心思想是将计算单元抽象为独立的并发实体,称为 “actors”,这些 actors 之间通过消息传递进行通信。
以下是 Actor 模型的一些关键概念:
- Actor:Actor 是计算模型的基本执行单元。每个 Actor 都有自己的状态、行为和邮箱(用于接收消息)。Actor 之间是相互独立的,它们通过消息传递进行通信。
- 消息传递:在 Actor 模型中,通信是通过消息传递来实现的。一个 Actor 可以向另一个 Actor 发送消息,消息包含了要执行的操作或者改变状态的请求。这种异步消息传递使得系统更具有弹性和可伸缩性。
- 地址:每个 Actor 都有一个唯一的地址,用于唯一标识该 Actor。其他 Actor 可以使用地址向目标 Actor 发送消息。
- 邮箱:每个 Actor 都有一个邮箱,
用于存储接收到的消息
。Actor 处理消息的速度可能不同,但由于消息传递是异步的,这不会阻塞发送者。- 行为:Actor 的行为定义了对消息的
响应方式
,包括状态的修改、消息的处理等。行为可以随着时间和接收到的消息而动态变化。
Actor由状态(State)、行为(Behavior)和邮箱(Mailbox)三部分组成。
actors和其他actors通过发送异步消息通信。Actor模型的强大来自于异步。它也可以显式等待响应,这使得可以执行同步操作。但是,强烈不建议同步消息,因为它们限制了系统的伸缩性(?怎么实现的伸缩性)。
actor系统
每个actor是一个单一的线程,它不断地从其邮箱中poll(拉取)消息,并且连续不断地处理。对于已经处理过的消息的结果,actor可以改变它自身的内部状态或者发送一个新消息或者
孵化一个新的actor
。
2. Akka相关demo
2.1. 创建Akka系统
Akka系统的核心组件包括ActorSystem和Actor,构建一个Akka系统,首先需要创建ActorSystem,然后通过ActorSystem创建Actor。
需要注意的是:
- Akka不允许直接创建Actor实例,只能通过ActorSystem.actorOf和ActorContext.actorOf等特定接口创建Actor。
- 只能通过ActorRef与Actor进行通信,ActorRef对原生Actor实例做了良好的封装,外界不能随意修改其内部状态。
如代码所示,Akka系统中包含了创建ActorSystem以及Actor的基本实例。
// 1. 构建ActorSystem// 使用缺省配置ActorSystem system =ActorSystem.create("sys");// 也可显示指定appsys配置// ActorSystem system1 = ActorSystem.create("helloakka", ConfigFactory.load("appsys"));// 2. 构建Actor,获取该Actor的引用,即ActorRefActorRef helloActor = system.actorOf(Props.create(HelloActor.class),"helloActor");// 3. 给helloActor发送消息
helloActor.tell("hello helloActor",ActorRef.noSender());// 4. 关闭ActorSystem
system.terminate();
在Akka中,创建的每个Actor都有自己的路径,该路径遵循 ActorSystem 的层级结构,大致如下:
本地:akka://sys/user/helloActor
远程:akka.tcp://[email protected]:2020/user/remoteActor
- sys,创建的ActorSystem的名字;
- user,通过ActorSystem#actorOf和ActorContext#actorOf 方法创建的 Actor 都属于/user下,其是系统层面创建的,与系统整体行为有关,在开发阶段并不需要对其过多关注;
- helloActor,我们创建的HelloActor。
其中远程部分路径含义如下:
- akka.tcp,远程通信方式为tcp;
- [email protected]:2020,ActorSystem名字及远程主机ip和端口号。
2.2. 根据path获取Actor并与之通讯
若提供了Actor的路径,可以通过路径获取到ActorRef,然后与之通信,代码如下所示:
ActorSystem system =ActorSystem.create("sys");
ActorSelection as = system.actorSelection("/path/to/actor");Timeout timeout =newTimeout(Duration.create(2,"seconds"));Future<ActorRef> fu = as.resolveOne(timeout);
fu.onSuccess(newOnSuccess<ActorRef>(){@OverridepublicvoidonSuccess(ActorRef actor){System.out.println("actor:"+ actor);
actor.tell("hello actor",ActorRef.noSender());}}, system.dispatcher());
fu.onFailure(newOnFailure(){@OverridepublicvoidonFailure(Throwable failure){System.out.println("failure:"+ failure);}}, system.dispatcher());
3. Flink RPC框架与Akka的关系
Flink进行RPC通信的组件
如图所示,从Flink RPC节点关系中可以看出,集群运行时中实现了RPC通信节点功能的主要有
Dispatcher、ResourceManager和TaskManager以及JobMaster
等组件。
借助RPC通信,这些组件共同参与任务提交及运行的整个流程,例如通过客户端向Dispatcher服务提交JobGraph,JobManager向TaskManager提交Task请求,以及TaskManager向JobManager更新Task执行状态等。
通过AkkaRpcService实现远程通讯能力
从图中也可以看出,集群的RPC服务组件是(1)RpcEndpoint,每个RpcEndpoint包含一个内置的RpcServer负责执行本地和远程的代码请求,(2)RpcServer对应Akka中的Actor实例。RpcEndpoint中创建和启动RpcServer主要是基于集群中的(3)RpcService实现,(4)RpcService的主要实现是AkkaRpcService。
从图可以看出,
AkkaRpcService将Akka中的ActorSystem进行封装
,通过AkkaRpcService可以创建RpcEndpoint中的RpcServer,同时基于AkkaRpcService提供的connect()方法与远程RpcServer建立RPC连接,提供远程进程调用的能力。
4.运行时RPC整体架构设计
Flink的RPC框架设计非常复杂,除了基于Akka构建了底层通信系统之外,还会使用JDK动态代理构建RpcGateway接口的代理类。
Flink RPC UML关系图
这里我们简单梳理一下RPC架构涉及的组件以及每种组件的作用。
- 集群RPC组件的基本实现类:
RpcEndpoint提供了集群RPC组件的基本实现,所有需要实现RPC服务的组件都会继承RpcEndpoint抽象类。
RpcEndpoint中包含了endpointId,用于唯一标记当前的RPC节点。RpcEndpoint借助RpcService启动内部RpcServer,之后通过RpcServer完成本地和远程线程执行。
- 基本实现类与FencedToken对比
对于RpcEndpoint来讲,底层主要有FencedRpcEndpoint基本实现类。
实现FencedRpcEndpoint的RPC节点都会有自己的FencedToken,当进行远程RPC调用时,会对比访问者分配的FencedToken和被访问者的FencedToken,结果一致才会进行后续操作。
- RpcEndpoint的实现类有TaskExecutor组件,FencedRpcEndpoint的实现类有Dispatcher、JobMaster以及ResourceManager等组件。这些组件可以获取RpcService中ActorSystem的dispatcher服务,并直接通过
dispatcher创建Task线程实例
。- RpcService提供了创建和启动RpcServer的方法。
在启动RpcServer的过程中,通过RpcEndpoint的地址创建Akka Actor实例,并基于Actor实例构建RpcServer接口的动态代理类,向RpcServer的主线程中提交Runnable以及Callable线程等。
同时在RpcService中提供了连接远程RpcEndpoint的方法,并创建了相应RpcGateway接口的动态代理类,用于执行远程RPC请求。
- RpcServer接口通过AkkaInvocationHandler动态代理类实现,所有远程或本地的执行请求最终都会转换到AkkaInvocationHandler代理类中执行。
AkkaInvocationHandler实现了MainThreadExecutable接口,提供了
runAsync(Runnable runnable)
以及
callAsync(Callable<V> callable, Time callTimeout)
等在主线程中执行代码块的功能。例如在TaskExecutor中释放Slot资源时,会调用runAsync()方法将freeSlotInternal()方法提交到TaskExecutor对应的RpcServer中运行,此时就会调用AkkaInvocationHandler在主线程中执行任务.
5. RpcEndpoint的设计与实现
RpcEndpoint是集群中RPC组件的端点,每个RpcEndpoint都对应一个由endpointId和actorSystem确定的路径,且该路径对应同一个Akka Actor。
如图,所有需要实现RPC通信的集群组件都会继承RpcEndpoint抽象类,例如TaskExecutor、Dispatcher以及ResourceManager组件服务,还包括根据JobGraph动态创建和启动的JobMaster服务。
从图中我们可以看出,RpcEndpoint实现了RpcGateway和AutoCloseableAsync两个接口,其中
RpcGateway
提供了动态获取RpcEndpoint中Akka地址和HostName的方法。
因为JobMaster组件在任务启动时才会获取Akka中ActorSystem分配的地址信息,所以借助RpcGateway接口提供的方法就能获取Akka相关连接信息。
RpcEndpoint中包含RpcService、RpcServer以及MainThreadExecutor三个重要的成员变量,其中
- RpcService是RpcEndpoint的后台管理服务
- RpcServer是RpcEndpoint的内部服务类
- MainThreadExecutor封装了MainThreadExecutable接口,其主要底层实现是AkkaInvocationHandler代理类。所有本地和远程的RpcGateway执行请求都会通过动态代理的形式转换到AkkaInvocationHandler代理类中执行。
版权归原作者 roman_日积跬步-终至千里 所有, 如有侵权,请联系我们删除。