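Overview
Flink 1.19 ships three high-availability implementations: Kubernetes-based HA, ZooKeeper-based HA, and the local Standalone implementation (used when no HA backend is configured). Four Flink components are highly available: the ResourceManager, the Dispatcher, the JobMaster and the WebMonitor. At the heart of all four is the leader election mechanism; this article uses the ResourceManager as the running example.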
Background
The LeaderContender interface
Every component that needs high availability implements this interface:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.leaderelection;

import java.util.UUID;

/**
 * Interface which has to be implemented to take part in the leader election process of the {@link
 * LeaderElectionService}.
 */
public interface LeaderContender {

    /**
     * Callback method which is called by the {@link LeaderElectionService} upon selecting this
     * instance as the new leader. The method is called with the new leader session ID.
     *
     * @param leaderSessionID New leader session ID
     */
    void grantLeadership(UUID leaderSessionID);

    /**
     * Callback method which is called by the {@link LeaderElectionService} upon revoking the
     * leadership of a former leader. This might happen in case that multiple contenders have been
     * granted leadership.
     */
    void revokeLeadership();

    /**
     * Callback method which is called by {@link LeaderElectionService} in case of an error in the
     * service thread.
     *
     * @param exception Caught exception
     */
    void handleError(Exception exception);
}
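To make the callback contract concrete, here is a minimal, self-contained Java sketch. It does not use Flink: it declares a local interface with the same three callbacks, and `RecordingContender`, `LeaderContenderDemo` and `failover()` are hypothetical names made up for illustration. It only shows the order in which an election service is assumed to drive contenders: `grantLeadership()` with a fresh session ID for the winner, `revokeLeadership()` for the instance that loses leadership.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Local copy of the three callbacks of Flink's LeaderContender, so the sketch
// compiles without Flink on the classpath.
interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);
    void revokeLeadership();
    void handleError(Exception exception);
}

// Hypothetical contender that only records what the election service told it.
class RecordingContender implements LeaderContender {
    final String name;
    UUID currentSession; // null while this instance is not the leader
    final List<String> events = new ArrayList<>();

    RecordingContender(String name) {
        this.name = name;
    }

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        currentSession = leaderSessionID;
        events.add("granted");
    }

    @Override
    public void revokeLeadership() {
        currentSession = null;
        events.add("revoked");
    }

    @Override
    public void handleError(Exception exception) {
        events.add("error: " + exception.getMessage());
    }

    boolean isLeader() {
        return currentSession != null;
    }
}

public class LeaderContenderDemo {
    // Hypothetical failover: revoke the old leader, then grant a new session ID to the standby.
    static void failover(LeaderContender oldLeader, LeaderContender standby) {
        oldLeader.revokeLeadership();
        standby.grantLeadership(UUID.randomUUID());
    }

    public static void main(String[] args) {
        RecordingContender rm1 = new RecordingContender("rm-1");
        RecordingContender rm2 = new RecordingContender("rm-2");
        rm1.grantLeadership(UUID.randomUUID());
        failover(rm1, rm2);
        System.out.println(rm1.name + " " + rm1.events + ", " + rm2.name + " " + rm2.events);
    }
}
```

Running `main` prints `rm-1 [granted, revoked], rm-2 [granted]`. Each grant carries a new session ID, which is what later lets a component tell a current leader session apart from a stale one.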
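Overall flow
1. Start the ResourceManager. On the very first start there is necessarily an RM leader election.
2. Step into start().
3. The startLeaderElection() method kicks off the leader election.
4. It first checks its arguments and then registers the contender with the leader election service. Keep stepping in.
5. First, look at createLeaderElectionDriver() to see how the leader is actually elected.
6. The driver is created by a factory; step into the factory's create() method.
7. LeaderElectionDriverFactory has two implementations, one backed by ZooKeeper and one backed by Kubernetes. We follow the ZooKeeper one.
8. Stepping in once more leads to the ZooKeeperLeaderElectionDriver constructor: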
public ZooKeeperLeaderElectionDriver(
        CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)
        throws Exception {
    this.curatorFramework = Preconditions.checkNotNull(curatorFramework);
    // the election service that receives the grant/revoke and leader-information callbacks
    this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);

    this.leaderLatchPath =
            ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());
    // Curator's LeaderLatch recipe performs the actual election on ZooKeeper
    this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
    // TreeCache watches the nodes that hold the published leader connection information
    this.treeCache =
            ZooKeeperUtils.createTreeCache(
                    curatorFramework,
                    "/",
                    new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());

    // forward changes of the published leader information to the listener
    treeCache
            .getListenable()
            .addListener(
                    (client, event) -> {
                        switch (event.getType()) {
                            case NODE_ADDED:
                            case NODE_UPDATED:
                                Preconditions.checkNotNull(
                                        event.getData(),
                                        "The ZooKeeper event data must not be null.");
                                handleChangedLeaderInformation(event.getData());
                                break;
                            case NODE_REMOVED:
                                Preconditions.checkNotNull(
                                        event.getData(),
                                        "The ZooKeeper event data must not be null.");
                                handleRemovedLeaderInformation(event.getData().getPath());
                                break;
                        }
                    });

    // the driver is itself a LeaderLatchListener: Curator calls isLeader()/notLeader() on it
    leaderLatch.addListener(this);
    // react to ZooKeeper connection state changes (e.g. suspended or lost connections)
    curatorFramework.getConnectionStateListenable().addListener(listener);
    // join the election and start watching; both proceed asynchronously
    leaderLatch.start();
    treeCache.start();
}
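Curator hides the election itself, so here is a hedged, in-memory imitation of the idea behind its LeaderLatch recipe: every participant gets a sequential node under the latch path, the lowest sequence number is the leader, and when the leader's ephemeral node disappears (for example because its ZooKeeper session died) the next participant takes over and receives an `isLeader()` callback. Nothing here talks to ZooKeeper; `ToyLeaderLatch`, `LatchListener` and `LeaderLatchDemo` are hypothetical names, and real Curator additionally handles watches, session expiry and the `notLeader()` callback, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for the isLeader() half of Curator's LeaderLatchListener.
interface LatchListener {
    void isLeader();
}

// In-memory imitation of the leader-latch recipe: the lowest sequence number wins.
class ToyLeaderLatch {
    private final TreeMap<Long, LatchListener> nodes = new TreeMap<>();
    private long nextSequence = 0;
    private LatchListener currentLeader;

    // Plays the role of creating an EPHEMERAL_SEQUENTIAL node under the latch path.
    long join(LatchListener listener) {
        long sequence = nextSequence++;
        nodes.put(sequence, listener);
        reevaluate();
        return sequence;
    }

    // Plays the role of the ephemeral node vanishing when its owner crashes.
    void crash(long sequence) {
        nodes.remove(sequence);
        reevaluate();
    }

    // Whoever holds the lowest sequence number is the leader; notify it if it changed.
    private void reevaluate() {
        LatchListener newLeader = nodes.isEmpty() ? null : nodes.firstEntry().getValue();
        if (newLeader != currentLeader) {
            currentLeader = newLeader;
            if (newLeader != null) {
                newLeader.isLeader();
            }
        }
    }
}

public class LeaderLatchDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyLeaderLatch latch = new ToyLeaderLatch();
        long rm1 = latch.join(() -> log.add("rm-1 is leader"));
        latch.join(() -> log.add("rm-2 is leader"));
        latch.join(() -> log.add("rm-3 is leader"));
        latch.crash(rm1); // the leader dies, the next sequence number takes over
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running `main` prints `[rm-1 is leader, rm-2 is leader]`: rm-2 and rm-3 wait as standbys until rm-1's node is gone. In the Flink driver, the corresponding `isLeader()` callback is the point where a grant is handed on to the election service.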
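9. The ZooKeeperLeaderElectionDriver constructor is where the election is kicked off. The election itself is hidden inside the Curator framework: leaderLatch.start() joins the election through Curator's LeaderLatch recipe, and the result arrives asynchronously. Because the driver registers itself as the latch listener (leaderLatch.addListener(this)), Curator calls back into the driver once this instance wins, and the driver forwards the grant to its LeaderElectionDriver.Listener, i.e. the leader election service that created it.
10. Back in the election service: once this instance has been elected, notifyLeaderContenderOfLeadership() hands leadership over to the registered contender, which is what brings the leader component up. Step into it.
11. Step into grantLeadership().
12. This is a method of the LeaderContender interface. As mentioned earlier, every highly available component must implement LeaderContender, and it has four implementations corresponding to the four components listed in the overview: the JobMaster (JM), the ResourceManager (RM), the WebMonitorEndpoint (WbE) and the Dispatcher (DP). We only follow the RM implementation, ResourceManagerServiceImpl.
13. startNewLeaderResourceManager() creates a new RM instance for the new leader session and starts it.
14. Step into startResourceManagerIfIsLeader().
15. It starts the RM belonging to this leader session, provided this instance is still the leader at that moment.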
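Steps 10 to 15 can be condensed into a single-threaded sketch. The method names echo the ones stepped through above (`notifyLeaderContenderOfLeadership`, `startNewLeaderResourceManager`, `startResourceManagerIfIsLeader`), but the classes (`ToyElectionService`, `ToyResourceManagerService`, `ToyResourceManager`) and their bodies are simplified assumptions, not Flink's code: Flink runs these steps asynchronously on executors and futures and confirms leadership afterwards. What the sketch keeps is the guard from step 15: an RM instance is only started if its session ID still matches the current leader session.

```java
import java.util.UUID;

// Hypothetical contender callbacks (only the two needed here).
interface Contender {
    void grantLeadership(UUID sessionId);
    void revokeLeadership();
}

// Stand-in for the ResourceManager instance owned by one leader session.
class ToyResourceManager {
    final UUID sessionId;
    boolean running;

    ToyResourceManager(UUID sessionId) {
        this.sessionId = sessionId;
    }

    void start() { running = true; }

    void stop() { running = false; }
}

// Loosely modeled on the role of ResourceManagerServiceImpl: one new RM per leader session.
class ToyResourceManagerService implements Contender {
    UUID leaderSessionId; // null while not the leader
    ToyResourceManager leaderResourceManager;

    @Override
    public void grantLeadership(UUID sessionId) {
        startNewLeaderResourceManager(sessionId);
    }

    @Override
    public void revokeLeadership() {
        leaderSessionId = null;
        stopLeaderResourceManager();
    }

    void startNewLeaderResourceManager(UUID sessionId) {
        stopLeaderResourceManager(); // never run an old and a new RM side by side
        leaderSessionId = sessionId;
        leaderResourceManager = new ToyResourceManager(sessionId);
        startResourceManagerIfIsLeader(leaderResourceManager);
    }

    // Start the RM only if it still belongs to the current leader session.
    boolean startResourceManagerIfIsLeader(ToyResourceManager rm) {
        if (rm.sessionId.equals(leaderSessionId)) {
            rm.start();
            return true;
        }
        return false;
    }

    private void stopLeaderResourceManager() {
        if (leaderResourceManager != null) {
            leaderResourceManager.stop();
            leaderResourceManager = null;
        }
    }
}

// Stand-in for the election service: receives driver callbacks and forwards them.
class ToyElectionService {
    private final Contender contender;
    UUID issuedLeaderSessionId;

    ToyElectionService(Contender contender) {
        this.contender = contender;
    }

    // Called by the driver, e.g. after the leader latch reported isLeader().
    void onGrantLeadership(UUID sessionId) {
        issuedLeaderSessionId = sessionId;
        notifyLeaderContenderOfLeadership(sessionId);
    }

    void onRevokeLeadership() {
        issuedLeaderSessionId = null;
        contender.revokeLeadership();
    }

    private void notifyLeaderContenderOfLeadership(UUID sessionId) {
        contender.grantLeadership(sessionId);
    }
}

public class GrantFlowDemo {
    public static void main(String[] args) {
        ToyResourceManagerService rmService = new ToyResourceManagerService();
        ToyElectionService election = new ToyElectionService(rmService);
        election.onGrantLeadership(UUID.randomUUID());
        ToyResourceManager first = rmService.leaderResourceManager;
        System.out.println("running after grant: " + first.running);
        election.onRevokeLeadership();
        System.out.println("running after revoke: " + first.running);
        // An RM created for an old session must not be started again.
        System.out.println("stale start allowed: " + rmService.startResourceManagerIfIsLeader(first));
    }
}
```

Running `main` prints `true`, `false` and `false` for the three lines. The session-ID check matters because in the real, asynchronous code leadership can be revoked between creating an RM and starting it.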
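So far we have seen how the election mechanism starts a component. Next, let's see how high availability deals with failures: what happens if the RM leader goes down?
1. The TaskExecutor registers a listener for the RM leader (ResourceManagerLeaderListener) whose job is to track the current RM leader.
2. When the leader changes, the listener's notifyLeaderAddress() method is called with the new leader's address and session ID, and the TaskExecutor establishes a connection to the new RM leader. The change becomes visible because a standby RM process wins the election as described above, starts its RM, and then publishes its address through the HA backend, which is what the TaskExecutor's leader retrieval service watches.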
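The TaskExecutor side can be sketched the same way. Below, `LeaderRetrievalListener` is a local interface with the same two callbacks as Flink's listener, while `ToyTaskExecutor`, `LeaderRetrievalDemo` and the address strings are hypothetical. In this sketch a `null` address stands for "no leader known right now", and appending to a log stands in for the RPC registration a real TaskExecutor would perform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

// Local copy of the callback shape of Flink's LeaderRetrievalListener (sketch only).
interface LeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
    void handleError(Exception exception);
}

// Hypothetical TaskExecutor that follows whichever RM is currently the leader.
class ToyTaskExecutor {
    String connectedAddress; // RM leader we are registered with, or null
    UUID connectedSessionId;
    final List<String> log = new ArrayList<>();

    final LeaderRetrievalListener resourceManagerLeaderListener =
            new LeaderRetrievalListener() {
                @Override
                public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
                    onNewResourceManagerLeader(leaderAddress, leaderSessionID);
                }

                @Override
                public void handleError(Exception exception) {
                    log.add("error: " + exception.getMessage());
                }
            };

    private void onNewResourceManagerLeader(String address, UUID sessionId) {
        if (Objects.equals(address, connectedAddress)
                && Objects.equals(sessionId, connectedSessionId)) {
            return; // same leader as before, nothing to do
        }
        if (connectedAddress != null) {
            log.add("disconnect " + connectedAddress); // drop the connection to the old leader
        }
        connectedAddress = address;
        connectedSessionId = sessionId;
        if (address != null) {
            log.add("register at " + address); // a real TaskExecutor would register via RPC here
        }
    }
}

public class LeaderRetrievalDemo {
    public static void main(String[] args) {
        ToyTaskExecutor te = new ToyTaskExecutor();
        te.resourceManagerLeaderListener.notifyLeaderAddress("rm-1", UUID.randomUUID());
        te.resourceManagerLeaderListener.notifyLeaderAddress(null, null); // leader lost
        te.resourceManagerLeaderListener.notifyLeaderAddress("rm-2", UUID.randomUUID());
        System.out.println(te.log);
    }
}
```

Running `main` prints `[register at rm-1, disconnect rm-1, register at rm-2]`. Comparing both address and session ID means a restarted leader at the same address with a new session is still treated as a new leader.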
Copyright belongs to the original author, BigDataLover520. In case of any infringement, please contact us and it will be removed.