文章目录
零. RpcService服务概述
RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer,且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类,基于Akka的ActorSystem进行封装,为不同的RpcEndpoint创建相应的ActorRef实例。
RpcService主要包含如下两个重要方法。
- startServer():用于启动RpcEndpoint中的RpcServer。RpcServer实际上就是对Actor进行封装,启动完成后,RpcEndpoint中的RpcServer就能够
对外提供服务
了。- connect():用于连接远端RpcEndpoint并返回给调用方RpcGateway接口的方法,建立连接后RPC客户端就能像本地一样调用RpcServer提供的RpcGateway接口了。
例如在JobMaster组件中创建与ResourceManager组件之间的
RPC连接
时。此时可以通过Akka发送消息到ResourceManager的RpcServer中,这样就使得JobMaster像调用本地方法一样在ResourceManager中执行请求任务。
1. AkkaRpcService的创建和初始化
在创建和启动ClusterEntrypoint及TaskManagerRunner的过程中,会调用AkkaRpcServiceUtils.createRpcService()方法创建默认的AkkaRpcService,接着启动RpcServer。
例如管理节点中会使用AkkaRpcService实例创建并启动ResourceManager、Dispatcher以及JobMaster等RPC服务。
创建AkkaRpcService主要包括如下步骤。
- 在ClusterEntrypoint中创建RpcService。
- 启动ActorSystem服务。
- 创建RobustActorSystem。RobustActorSystem实际上是
对Akka的ActorSystem进行了封装和拓展
,相比于原生Akka ActorSystem,RobustActorSystem包含了UncaughtExceptionHandler组件,能够对ActorSystem抛出的异常进行处理。- 使用RobustActorSystem创建AkkaRpcService实例。
- 将AkkaRpcService返回到ClusterEntrypoint中,用于启动集群中各个RpcEndpoint组件服务
2.通过AkkaRpcService初始化RpcServer
在集群运行时中创建了共用的AkkaRpcService服务,相当于创建了Akka系统中的ActorSystem,接下来就是使用AkkaRpcService启动各个RpcEndpoint中的RpcServer实例。(AkkaRpcService服务作为共用的rpc服务,启动其他各个组件的RpcServer实例?)
这里先看通过AkkaRpcService初始化RpcEndpoint对应的RpcServer的逻辑。如下在org.apache.flink.runtime.rpc.RpcEndpoint的构造器中,执行了RpcServer的初始化
protectedRpcEndpoint(finalRpcService rpcService,finalString endpointId){this.rpcService =checkNotNull(rpcService,"rpcService");this.endpointId =checkNotNull(endpointId,"endpointId");// 初始化RpcEndpoint中的RpcServerthis.rpcServer = rpcService.startServer(this);this.mainThreadExecutor =newMainThreadExecutor(rpcServer,this::validateRunsInMainThread);}
具体看下
rpcService.startServer(this)
启动rpcServer的逻辑
- ActorSystem创建相应Actor的ActorRef引用类。创建完毕后会将RpcEndpoint和ActorRef信息存储在Actor键值对集合中。
- 启动RpcEndpoint对应的RPC服务,首先获取当前RpcEndpoint实现的RpcGateways接口。 RpcGateway接口最终通过RpcUtils.extractImplementedRpcGateways()方法从类定义抽取出来,例如JobMaster组件会抽取JobMasterGateway接口定义。
- 创建InvocationHandler代理类,根据InvocationHandler代理类提供的invoke()方法实现被代理类的具体方法。
- 根据RpcEndpoint是否为FencedRpcEndpoint,InvocationHandler分为FencedAkkaInvocationHandler和AkkaInvocationHandler两种类型。
FencedMainThreadExecutable代理的接口主要有FencedMainThreadExecutable和FencedRpcGateway两种。
AkkaInvocationHandler主要代理实现AkkaBasedEndpoint、RpcGateway、StartStoppable、MainThreadExecutable、RpcServer等接口。
- 创建好InvocationHandler代理类后,通过反射的方式(Proxy.newProxyInstance())创建代理类。创建的代理类会被转换为RpcServer实例,再返回给RpcEndpoint使用。
在RpcServer创建的过程中可以看出,实际上包含了创建RpcEndpoint中的Actor引用类ActorRef和AkkaInvocationHandler动态代理类。最后将动态代理类转换为RpcServer接口返回给RpcEndpoint实现类,此时实现的组件就
能够获取到RpcServer服务
,且
通过RpcServer代理了所有的RpcGateways接口
,提供了本地方法调用和远程方法调用两种模式。
@Overridepublic<CextendsRpcEndpoint&RpcGateway>RpcServerstartServer(C rpcEndpoint){checkNotNull(rpcEndpoint,"rpc endpoint");finalSupervisorActor.ActorRegistration actorRegistration =registerAkkaRpcActor(rpcEndpoint);finalActorRef actorRef = actorRegistration.getActorRef();finalCompletableFuture<Void> actorTerminationFuture =
actorRegistration.getTerminationFuture();//启动RpcEndpoint对应的RPC服务LOG.info("Starting RPC endpoint for {} at {} .",
rpcEndpoint.getClass().getName(),
actorRef.path());finalString akkaAddress =AkkaUtils.getAkkaURL(actorSystem, actorRef);finalString hostname;Option<String> host = actorRef.path().address().host();if(host.isEmpty()){
hostname ="localhost";}else{
hostname = host.get();}//解析EpcEndpoint实现的所有RpcGateway接口Set<Class<?>> implementedRpcGateways =newHashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));//额外添加RpcServer和AkkaBasedEndpoint类
implementedRpcGateways.add(RpcServer.class);
implementedRpcGateways.add(AkkaBasedEndpoint.class);finalInvocationHandler akkaInvocationHandler;//根据是否是FencedRpcEndpoint创建不同的动态代理对象if(rpcEndpoint instanceofFencedRpcEndpoint){// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
akkaInvocationHandler =newFencedAkkaInvocationHandler<>(
akkaAddress,
hostname,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
actorTerminationFuture,((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
captureAskCallstacks);
implementedRpcGateways.add(FencedMainThreadExecutable.class);}else{
akkaInvocationHandler =newAkkaInvocationHandler(
akkaAddress,
hostname,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
actorTerminationFuture,
captureAskCallstacks);}// Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = getClass().getClassLoader(); @SuppressWarnings("unchecked")RpcServer server =(RpcServer)Proxy.newProxyInstance(
classLoader,
implementedRpcGateways.toArray(newClass<?>[implementedRpcGateways.size()]),
akkaInvocationHandler);return server;}
3. ResourceManager中RPC服务的启动
RpcServer在RpcEndpoint的构造器中完成初始化后,接下来就是启动RpcEndpoint和RpcServer,这里以ResourceManager为例进行说明。
在启动集群时,看下如何启动ResourceManager的RPC服务的。如下调用链
ClusterEntrypoint.startCluster->runCluster
->dispatcherResourceManagerComponentFactory.create
->resourceManager.start();=>publicfinalvoidstart(){
rpcServer.start();}
继续探索RPC服务是如何启动的
首先在DefaultDispatcherResourceManagerComponentFactory中调用ResourceManager.start()方法启动ResourceManager实例,此时在ResourceManager.start()方法中会同步调用RpcServer.start()方法,启动ResourceManager所在RpcEndpoint中的RpcServer,如下。
- 调用ResourceManager.start()方法,此时会调用RpcEndpoint.start()父方法,启动ResourceManager组件的RpcServer。
- 通过动态代理AkkaInvocationHandler.invoke()方法执行流程,发现调用的是StartStoppable.start()方法,此时会直接调用AkkaInvocationHandler.start()本地方法。
- 在AkkaInvocationHandler.start()方法中,实际上会调用rpcEndpoint.tell(ControlMessages.START,ActorRef.noSender())方法向ResourceManager对应的Actor发送控制消息,表明当前Actor实例可以正常启动并接收来自远端的RPC请求。
- AkkaRpcActor调用handleControlMessage()方法处理ControlMessages.START控制消息。
- 将AkkaRpcActor中的状态更新为StartedState,此时ResourceManager的RpcServer启动完成,ResourceManager组件能够接收来自其他组件的RPC请求。
在flink1.12中省略了AkkaInvocationHandler的干预。
经过以上步骤,指定组件的RpcEndpoint节点就
正常启动
,此时RpcServer会作为独立的线程运行在JobManager或TaskManager进程中,
处理本地和远程提交的RPC请求
。
4. 实现相互通讯能力
当AkkaRpcService启动RpcEndpoint中的RpcServer后,RpcEndpoint组件仅能对外提供处理RPC请求的能力,RpcEndpoint组件需要在启动后
向其他组件注册自己的RpcEndpoint信息
,并完成组件之间的RpcConnection注册,才能相互访问和通信。而
创建RPC连接
需要调用RpcService.connect()方法。
如代码所示,在AkkaRpcService.connect()方法中,完成了RpcConnection对象的创建。
@Overridepublic<CextendsRpcGateway>CompletableFuture<C>connect(finalString address,finalClass<C> clazz){returnconnectInternal(
address,
clazz,(ActorRef actorRef)->{Tuple2<String,String> addressHostname =extractAddressHostname(actorRef);returnnewAkkaInvocationHandler(
addressHostname.f0,
addressHostname.f1,
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),null,
captureAskCallstacks);});}
具体看AkkaRpcService.connectInternal()方法逻辑。
- 获取ActorRef引用对象。
- 调用Patterns.ask()方法,向actorRef对应的RpcEndpoint节点发送RemoteHandshakeMessage消息,确保连接的RpcEndpoint节点正常,如果成功,则RpcEndpoint会返回HandshakeSuccessMessage消息。
- 调用invocationHandlerFactory创建invocationHandler动态代理类,此时可以看到传递的接口列表为
new Class<?>[]{clazz}
,也就是当前RpcEndpoint需要访问的RpcGateway接口
。例如JobMaster访问ResourceManager时,这里就是ResourceManagerGateway接口。
private<CextendsRpcGateway>CompletableFuture<C>connectInternal(finalString address,finalClass<C> clazz,Function<ActorRef,InvocationHandler> invocationHandlerFactory){checkState(!stopped,"RpcService is stopped");LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
address,
clazz.getName());//获取actorRef实例 finalCompletableFuture<ActorRef> actorRefFuture =resolveActorAddress(address);//进行handshake操作,确保需要连接的RpcEndpoint节点正常 finalCompletableFuture<HandshakeSuccessMessage> handshakeFuture =
actorRefFuture.thenCompose((ActorRef actorRef)->FutureUtils.toJava(//调用Patterns.ask()方法,向actorRef对应的//RpcEndpoint节点发送RemoteHandshakeMessage消息,//确保连接的RpcEndpoint节点正常,如果成功,则//RpcEndpoint会返回HandshakeSuccessMessage消息。 Patterns.ask(
actorRef,newRemoteHandshakeMessage(
clazz,getVersion()),
configuration.getTimeout().toMilliseconds()).<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$
.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));//创建RPC动态代理类 return actorRefFuture.thenCombineAsync(
handshakeFuture,(ActorRef actorRef,HandshakeSuccessMessage ignored)->{InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);// Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and // all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom // ClassLoader ClassLoader classLoader = getClass().getClassLoader(); @SuppressWarnings("unchecked")C proxy =(C)Proxy.newProxyInstance(
classLoader,newClass<?>[]{clazz}, invocationHandler);return proxy;},
actorSystem.dispatcher());}
经过以上步骤,实现了创建RpcEndpoint组件之间的RPC连接,此时集群RPC组件之间可以进行相互访问,例如JobMaster可以向ResourceManager发送Slot资源请求。
RPC 服务启动的 Akka actor 能接收来自RpcGateway RPC 调用。
参考:《Flink设计与实现:核心原理与源码解析》–张利兵
版权归原作者 roman_日积跬步-终至千里 所有, 如有侵权,请联系我们删除。