1.Flink开启kerberos认证遇到的问题
在配置flink任务时,不能通过对单个任务进行kerberos验证,只能在flink-conf文件中进行认证,这样遇到的麻烦就是,每次启动不同任务的时候,都需要进行依赖不同的conf文件
2.解决办法
通过在flink github项目中查看,发现有pr提交了代码,可以在flink任务启动之初进行,conf文件加载
通过修改 flink/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java文件中的main方法
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
int retCode = 31;
try {
// 更改当前代码内容,即可完成对conf文件的加载,进行conf文件的认证
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
CommandLine commandLine =
cli.getCommandLine(
new Options(), Arrays.copyOfRange(args, 1, args.length), true);
Configuration securityConfig = new Configuration(cli.configuration);
DynamicPropertiesUtil.encodeDynamicProperties(commandLine, securityConfig);
SecurityUtils.install(new SecurityConfiguration(securityConfig));
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
} finally {
System.exit(retCode);
}
}
这样在任务提交通过:
flink run -t yarn-per-job -d \
-Dsecurity.kerberos.login.keytab=/data2/home/zhu.hh/kafka3u1.keytab
-Dsecurity.kerberos.login.principal=kafka3u1@BELLE.COM
-Dsecurity.kerberos.login.contexts=KafkaClient \
即可完成认证
版权归原作者 zhu.hh 所有, 如有侵权,请联系我们删除。