** 源码视频课程(连载中):**
https://edu.csdn.net/course/detail/39418
1.本节课目的
--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown
1.Flink集群启动模式
2.深入解析StandaloneSessionClusterEntrypoint类、ClusterEntrypoint.runClusterEntrypoint()/startCluster()方法
2.本节内容
1.集群启动模式
2.集群启动和初始化
3、本节详细内容
1.集群启动模式
ClusterEntrypoint是Flink集群入口点的基类,该类是抽象类,实现着有
SessionClusterEntrypoint、JobClusterEntrypoint、ApplicationClusterEntryPoint
1.Per-job
每一个提交的Job单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。
任务之间的资源隔离性好。已经被标记为不推荐使用状态
2.Session
Session集群能够运行多个Flink作业,切这些作业共享运行中的Dispatcher、ResourceManager等组件服务。
集群资源使用率高
3.Application
对于per-job模式,jar包的解析、生成JobGraph是在客户端上执行的,如果任务特别多的话,那么这些生成JobGraph会对客户端服务器有压力。
Application 模式会在客户端将运行任务需要的依赖都上传到 Flink Master,然后在 Master 端进行任务的提交。
如果一个main()方法中有多个env.execute()/executeAsync(),在Application模式下,这些作业会被视为属于同一个应用,在同一个集群中执行(如果在Per-Job模式下,就会启动多个集群)。
Application模式弥补了per-job的不足。
config.yaml
--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown
# JobManager 的主机地址
jobmanager.rpc.address: localhost
# The RPC port where the JobManager is reachable.
#可访问JobManager的RPC端口
jobmanager.rpc.port: 6123
#jobmanager绑定ip,
jobmanager.bind-host: localhost
#JVM 进程总内存
jobmanager.memory.process.size: 1600m
#taskmanager绑定ip,
taskmanager.bind-host: localhost
#唯一标识 window下必须配置
taskmanager.resource-id: q1
taskmanager.cpu.cores: 1
#任务的堆内存
taskmanager.memory.task.heap.size: 512m
#托管内存
taskmanager.memory.managed.size: 512m
#网络内存(Network Memory)
taskmanager.memory.network.min: 128m
taskmanager.memory.network.max: 128m
#任务堆外内存
taskmanager.memory.task.off-heap.size: 0m
#框架内存
taskmanager.memory.framework.heap.size: 256m
#框架堆外内存
taskmanager.memory.framework.off-heap.size: 128m
#JVM Metaspace
taskmanager.memory.jvm-metaspace.size: 128m
#JVM 开销
taskmanager.memory.jvm-overhead.min: 128m
taskmanager.memory.jvm-overhead.max: 128m
#心跳参数
heartbeat.timeout: 50000
heartbeat.interval: 10000
taskmanager.host: localhost
# 进程总内存大小
taskmanager.memory.process.size: 1728m
#每个 TaskManager上并发的 slot 数
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#用于未指定的程序的并行度和其他并行度。
parallelism.default: 1
jobmanager.execution.failover-strategy: region
rest.address: localhost
rest.bind-address: localhost
StandaloneSessionClusterEntrypoint入口类
--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown
参数1:-Dlog.file=./log/flink-jobmanager-1.local.log -Dlog4j.configuration=./a_conf/log4j.properties -Dlog4j.configurationFile=./a_conf/log4j.properties -Dlogback.configurationFile=./a_conf/logback.xml
参数2:-c config
参数3:添加a_lib下的jar
--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown
package org.apache.flink.runtime.entrypoint;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
/** Entry point for the standalone session cluster. */
/** Session集群的入口点 */
public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {
/** main函数中进行构造传入Configuration */
public StandaloneSessionClusterEntrypoint(Configuration configuration) {
super(configuration);
}
/**
* 构造DefaultDispatcherResourceManagerComponentFactory对象
* ClusterEntrypoint.createDispatcherResourceManagerComponentFactory 初始化
* Dispatcher,ResourceManager时候会被调用
* 这个在初始化组件的时候需要会被调用
* @param configuration
* @return
*/
@Override
protected DefaultDispatcherResourceManagerComponentFactory
createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
StandaloneResourceManagerFactory.getInstance());
}
/** args=["-c","a_conf"] */
public static void main(String[] args) {
// startup checks and logging
/**检查参数和启动日志*/
EnvironmentInformation.logEnvironmentInfo(
LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
/**
* SignalHandler信号注册器
* Flink 进程启动时的一部分初始化步骤以确保在接收到终止信号时能够优雅地关闭。
* 如,当 Flink 作业管理器(JobManager)或任务管理器(TaskManager)接收到 SIGTERM 信号时,
* 信号处理器可能会被触发,以开始关闭进程并释放资源。
*/
SignalHandler.register(LOG);
/**
* 注册一个安全关闭钩子。
* 当 JVM 接收到终止信号(如 SIGTERM 或 SIGINT)或调用 System.exit() 方法时,
* 可以确保在 JVM 关闭前执行必要的清理和释放资源操作。
* 注册一个安全关闭钩子。JVM允许花费的最长时间被杀死前的关机时间是5秒。
* 5秒之内不关闭则强制调用Runtime.getRuntime().halt(EXIT_CODE)关闭see JvmShutdownSafeguard.run
*/
JvmShutdownSafeguard.installAsShutdownHook(LOG);
/**
* 设置配置的目录
* configDir=a_conf
*/
final EntrypointClusterConfiguration entrypointClusterConfiguration =
ClusterEntrypointUtils.parseParametersOrExit(
args,
new EntrypointClusterConfigurationParserFactory(),
StandaloneSessionClusterEntrypoint.class);
/**
* 加载配置参数
* flink-conf.yaml 中配置的参数
*/
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
/** 将configuration 传入 StandaloneSessionClusterEntrypoint 创建对象*/
StandaloneSessionClusterEntrypoint entrypoint =
new StandaloneSessionClusterEntrypoint(configuration);
/**
* 最终会调用 ClusterEntrypoint.runClusterEntrypoint
* 所有的初始化最终都会在ClusterEntrypoint 进行
*/
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
}
ClusterEntrypoint源码分析
--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown
/**
* ClusterEntrypoint Flink集群入口点的基类
* 调用ClusterEntrypoint.startCluster方法进行初始化
* @param clusterEntrypoint
*/
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
/** StandaloneSessionClusterEntrypoint */
final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {
clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {
LOG.error(
String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
e);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
int returnCode;
Throwable throwable = null;
try {
returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
} catch (Throwable e) {
throwable = ExceptionUtils.stripExecutionException(e);
returnCode = RUNTIME_FAILURE_RETURN_CODE;
}
LOG.info(
"Terminating cluster entrypoint process {} with exit code {}.",
clusterEntrypointName,
returnCode,
throwable);
System.exit(returnCode);
}
public void startCluster() throws ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
try {
/**
* FlinkSecurityManager Flink 做一些安全的检查
*cluster.intercept-user-system-exit配置默认DISABLED
*cluster.processes.halt-on-fatal-error默认为false
*
* 1.是否拦截用户作业中的 System.exit() 调用当这个配置选项被设置为 true 时,
* Flink 会拦截用户作业中的 System.exit() 调用,并防止作业因为此类调用而意外终止。
* 2.如果Flink遇到了不能恢复的错误,是否直接让JobManager进程终止。
* 默认不做任何处理
*/
FlinkSecurityManager.setFromConfiguration(configuration);
/**
* PluginManager flink插件管理类。在 Apache Flink 的上下文中,PluginManager 负责加载和管理 Flink 插件。
* PluginManager负责管理集群插件,这些插件是使用单独的类加载器加载的,以便它们的依赖关系,不干涉Flink的依赖关系。
* 比如我们自己实现一个写s3、或者我们自己定义数据源 插件以JAR的形式存在
* 通过如下参数配置FLINK_PLUGINS_DIR、plugins
* 总结:FLINK_PLUGINS_DIR、plugins 两个插件配置
*/
PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(configuration);
/**
* 配置文件系统,configureFileSystems(configuration, pluginManager) 方法通常用于配置 Flink 集群中使用的文件系统(FileSystems)。
* Flink 支持多种类型的文件系统,如 Hadoop 的分布式文件系统(HDFS)、本地文件系统、云存储服务等,用于读取和写入数据。
* 初始化文件系统设置
* 本地:file
* hadoop:hdfs
*FileSystem类进行初始化
*/
configureFileSystems(configuration, pluginManager);
/**
* 初始化安全上下文环境 默认HadoopSecurityContext
* Hadoop安全上下文,使用先前初始化的UGI(UserGroupInformation)和适当的安全凭据。比如Kerberos
* 总结:初始化安全环境,创建安全环境的时候会做一系列的检查
* security.module.factory.classes:包含哪些SecurityModuleFactory,org.apache.flink.runtime.security.modules.HadoopModuleFactory
* 值为HadoopModuleFactory,JaasModuleFactory,ZookeeperModuleFactory
* security.context.factory.classes:包含哪些 SecurityContextFactory,HadoopSecurityContextFactory 或者 NoOpSecurityContextFactory
*/
SecurityContext securityContext = installSecurityContext(configuration);
/**
* cluster.uncaught-exception-handling 默认Log
* 根据配置为当前线程设置未捕获的异常处理程序。 默认为LOG打印
* 比如程序遇到了未捕获的异常则交给log 也就是打印出来
*/
ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
securityContext.runSecured(
(Callable<Void>)
() -> {
/** 真正组件初始化的地方 */
runCluster(configuration, pluginManager);
return null;
});
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
try {
// clean up any partial state
shutDownAsync(
ApplicationStatus.FAILED,
ShutdownBehaviour.GRACEFUL_SHUTDOWN,
ExceptionUtils.stringifyException(strippedThrowable),
false)
.get(
INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
strippedThrowable.addSuppressed(e);
}
throw new ClusterEntrypointException(
String.format(
"Failed to initialize the cluster entrypoint %s.",
getClass().getSimpleName()),
strippedThrowable);
}
}
版权归原作者 星&脉 所有, 如有侵权,请联系我们删除。