Configuring the Flink Kafka connector and the Kafka client with Kerberos authentication

I. flink-connector-kafka

1. Kafka configuration file

The Kafka JAAS configuration is mandatory. If it is missing, the following error is reported.

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

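With Flink, the JAAS file can only be supplied by setting the java.security.auth.login.config system property.

JAAS configuration

1.1 Option 1:

Set the system property with System.setProperty: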

System.setProperty("java.security.auth.login.config","D:\\configs\\kafka_client_jaas_keytab.conf");

The content of kafka_client_jaas_keytab.conf is as follows:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    useTicketCache=false
    storeKey=true
    keyTab="D://configs//xxx.keytab"
    principal="[email protected]"
    serviceName="kafka";
};
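
Before wiring the file into a job, it can help to confirm that the JDK's own JAAS parser accepts it and finds the KafkaClient entry. The following is a minimal sketch that uses only the JDK (no Kafka or Flink classes). The class name JaasFileCheck, the temporary sample file, and the placeholder keytab path and principal are assumptions made for illustration. It only parses the file and does not attempt a Kerberos login.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasFileCheck {

    /** Parses the JAAS file and returns the first entry of the given context, or null if it is absent. */
    public static AppConfigurationEntry findEntry(Path jaasFile, String contextName) {
        try {
            // "JavaLoginConfig" is the JDK's standard file-based JAAS configuration type.
            Configuration config =
                    Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toUri()));
            AppConfigurationEntry[] entries = config.getAppConfigurationEntry(contextName);
            return (entries == null || entries.length == 0) ? null : entries[0];
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Could not load JAAS file " + jaasFile, e);
        }
    }

    /** Writes a sample JAAS file (placeholder keytab path and principal) to a temporary location. */
    public static Path writeSample() {
        String content = String.join("\n",
                "KafkaClient {",
                "    com.sun.security.auth.module.Krb5LoginModule required",
                "    useKeyTab=true",
                "    useTicketCache=false",
                "    storeKey=true",
                "    keyTab=\"D:/configs/xxx.keytab\"",
                "    principal=\"user@EXAMPLE.COM\"",   // hypothetical principal
                "    serviceName=\"kafka\";",
                "};");
        try {
            Path file = Files.createTempFile("kafka_client_jaas", ".conf");
            Files.write(file, content.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Pass the real JAAS file as the first argument, or fall back to the sample.
        Path file = args.length > 0 ? Path.of(args[0]) : writeSample();
        AppConfigurationEntry entry = findEntry(file, "KafkaClient");
        if (entry == null) {
            System.out.println("No 'KafkaClient' entry found in " + file);
        } else {
            System.out.println(entry.getLoginModuleName() + " " + entry.getOptions());
        }
    }
}
```

If the entry is reported as missing here, the Kafka client would be expected to fail with the "Could not find a 'KafkaClient' entry" error quoted earlier.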

1.2 Option 2: add a JVM argument in IDEA:

-Djava.security.auth.login.config=D:\\configs\\kafka_client_jaas_keytab.conf

Note: adding this parameter to the Kafka properties is wrong. For example:

Properties properties = new Properties();
properties.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, simpleStringSchema, properties);

2. Configuring Kerberos for Flink

2.1 Configuring JVM arguments in IDEA

IDEA configuration:

-Dsecurity.kerberos.krb5-conf.path=D:\configs\krb5.conf -Dsecurity.kerberos.login.keytab=D:\configs\xxx.keytab [email protected]

2.2 Passing the configuration to the stream environment

Pass the parameters directly to Flink's StreamExecutionEnvironment:

Properties flinkProps = new Properties();
flinkProps.setProperty("security.kerberos.krb5-conf.path", "D:\\configs\\krb5.conf");
flinkProps.setProperty("security.kerberos.login.keytab", "D:\\configs\\xxx.keytab");
flinkProps.setProperty("security.kerberos.login.principal", "[email protected]");
flinkProps.setProperty("security.kerberos.login.contexts", "Client,KafkaClient");
flinkProps.setProperty("state.backend", "hashmap");
// Configuration flinkConfig = ConfigUtils.getFlinkConfig();
Configuration flinkConfig = new Configuration();
// Copy the properties into the Flink configuration. Configuration.addAllToProperties(props)
// copies in the opposite direction (configuration -> props), so it is not used here.
for (String key : flinkProps.stringPropertyNames()) {
    flinkConfig.setString(key, flinkProps.getProperty(key));
}
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);

3. Checking whether the connection succeeded

When the Kafka connection succeeds, log output like the following appears:

09:38:26.473 [Sink: Unnamed (6/8)#0] INFO  org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
... ...
09:38:27.534 [kafka-producer-network-thread | producer-3] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-3] Cluster ID: vj0AfElIS12S0Cp0WDBU7Q
... ...
09:38:27.618 [[email protected]] WARN  org.apache.kafka.common.security.kerberos.KerberosLogin - [[email protected]]: TGT renewal thread has been interrupted and will exit.

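4. Configuring the ticket cache does not work

Note: a JAAS file in the ticket-cache format shown below does not work, even though Flink has already been given the Kerberos principal and keytab.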

System.setProperty("java.security.auth.login.config","D:\\configs\\kafka_client_jaas_cache.conf");

Content of kafka_client_jaas_cache.conf:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};

The following error is reported:

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user

Full code:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.junit.Test;

@Test
public void testWrite() throws Exception {
    // JVM arguments: -Dsecurity.kerberos.krb5-conf.path=D:\configs\krb5.conf -Dsecurity.kerberos.login.keytab=D:\configs\xxx.keytab [email protected]
    // System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
    Properties flinkProps = new Properties();
    flinkProps.setProperty("security.kerberos.krb5-conf.path", "D:\\configs\\krb5.conf");
    flinkProps.setProperty("security.kerberos.login.keytab", "D:\\configs\\xxx.keytab");
    flinkProps.setProperty("security.kerberos.login.principal", "[email protected]");
    flinkProps.setProperty("security.kerberos.login.contexts", "Client,KafkaClient");
    flinkProps.setProperty("state.backend", "hashmap");
    // Configuration flinkConfig = ConfigUtils.getFlinkConfig();
    Configuration flinkConfig = new Configuration();
    // Copy the properties into the Flink configuration. Configuration.addAllToProperties(props)
    // copies in the opposite direction (configuration -> props), so it is not used here.
    for (String key : flinkProps.stringPropertyNames()) {
        flinkConfig.setString(key, flinkProps.getProperty(key));
    }
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);

    // Kafka client settings for SASL/GSSAPI (Kerberos).
    Properties properties = new Properties();
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667");
    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI");
    properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
    // The wrong way to configure JAAS through the flink-connector-kafka API:
    // properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format(JAAS_CONFIG_KEYTAB_TEMPLATE, "D:\\configs\\xxx.keytab", "[email protected]"));

    String topic = "flinkcdc";
    SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, simpleStringSchema, properties);
    senv.fromElements("hello world", "coming again").addSink(producer);
    senv.execute("test");
}

II. The kafka-client approach

1. Kafka JAAS configuration

Either Java's java.security.auth.login.config or Kafka's sasl.jaas.config can be used.
Note the precedence of the two JAAS settings, however:
sasl.jaas.config > java.security.auth.login.config
So if sasl.jaas.config is set, java.security.auth.login.config is ignored.
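
The precedence rule can be written down as a small lookup. The sketch below is not Kafka's code; JaasSource and effectiveSource are hypothetical names for a helper that simply mirrors the order described above, so that a test can report which of the two settings a given Properties object would end up using.

```java
import java.util.Properties;

public class JaasSource {

    static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
    static final String LOGIN_CONFIG_PROPERTY = "java.security.auth.login.config";

    /**
     * Returns the name of the setting that takes effect under the precedence described
     * in the text: sasl.jaas.config first, then the JVM system property, else "none".
     */
    public static String effectiveSource(Properties clientProps) {
        String inline = clientProps.getProperty(SASL_JAAS_CONFIG);
        if (inline != null && !inline.trim().isEmpty()) {
            return SASL_JAAS_CONFIG;
        }
        String file = System.getProperty(LOGIN_CONFIG_PROPERTY);
        if (file != null && !file.trim().isEmpty()) {
            return LOGIN_CONFIG_PROPERTY;
        }
        return "none";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(effectiveSource(props));
    }
}
```

Logging the result of effectiveSource(properties) just before creating the producer is one way to notice that a leftover sasl.jaas.config entry is shadowing the JAAS file.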

Now the code.
First, note that the path separator inside sasl.jaas.config must not be \\ ; it must be /.

Wrong:
D:\\configs\\kafka_client_jaas_keytab.conf
Correct:
D:/configs/kafka_client_jaas_keytab.conf
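
A likely reason for the separator rule is that a backslash inside a double-quoted JAAS value is read as an escape character. The sketch below illustrates this with java.io.StreamTokenizer, which the JDK's JAAS file parser is built on; that Kafka tokenizes sasl.jaas.config the same way is an assumption here. JaasPathDemo, readQuoted and toJaasPath are hypothetical names introduced only for this illustration.

```java
import java.io.IOException;
import java.io.StreamTokenizer;
import java.io.StringReader;
import java.io.UncheckedIOException;

public class JaasPathDemo {

    /** Reads one double-quoted value the way StreamTokenizer does, escape handling included. */
    public static String readQuoted(String quoted) {
        StreamTokenizer tokenizer = new StreamTokenizer(new StringReader(quoted));
        tokenizer.quoteChar('"');
        try {
            if (tokenizer.nextToken() != '"') {
                throw new IllegalArgumentException("Expected a quoted value: " + quoted);
            }
            return tokenizer.sval;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Converts a Windows path to the forward-slash form that survives tokenizing. */
    public static String toJaasPath(String windowsPath) {
        return windowsPath.replace('\\', '/');
    }

    public static void main(String[] args) {
        // Java literal "D:\\configs\\xxx.keytab" is the runtime string D:\configs\xxx.keytab
        String windowsPath = "D:\\configs\\xxx.keytab";
        System.out.println(readQuoted('"' + windowsPath + '"'));
        System.out.println(readQuoted('"' + toJaasPath(windowsPath) + '"'));
    }
}
```

The first printed value no longer contains the backslashes, so it no longer points at the keytab, while the forward-slash form comes back unchanged.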

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

// Inline JAAS entry; the two %s placeholders are the keytab path and the principal.
private static final String JAAS_CONFIG_KEYTAB_TEMPLATE =
        "com.sun.security.auth.module.Krb5LoginModule required\n"
                + "debug=true\n"
                + "doNotPrompt=true\n"
                + "storeKey=true\n"
                + "useKeyTab=true\n"
                + "keyTab=\"%s\"\n"
                + "principal=\"%s\";";

@Test
public void testKafkaWrite() {
    Properties properties = new Properties();
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667");
    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI");
    properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Either one of the following two settings is enough.
    System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
    // properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format(JAAS_CONFIG_KEYTAB_TEMPLATE, "D:/configs/xxx.keytab", "[email protected]"));

    // try-with-resources closes the producer even if a send fails.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
        ProducerRecord<String, String> record1 = new ProducerRecord<>("flinkcdc", "hello kafka");
        ProducerRecord<String, String> record2 = new ProducerRecord<>("flinkcdc", "coming soon");
        Future<RecordMetadata> f1 = producer.send(record1);
        Future<RecordMetadata> f2 = producer.send(record2);
        producer.flush();

        List<Future<RecordMetadata>> fs = new ArrayList<>();
        fs.add(f1);
        fs.add(f2);
        // Block until each send is acknowledged and print its metadata.
        for (Future<RecordMetadata> future : fs) {
            RecordMetadata metadata = future.get();
            System.out.println(metadata.toString());
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

The content of kafka_client_jaas_keytab.conf is the same as in the flink-connector-kafka section.

III. Kafka console commands

Command to start the console producer:

bin/kafka-console-producer.sh --bootstrap-server xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667 --topic flinkcdc --producer-property security.protocol=SASL_PLAINTEXT

Command to start the console consumer:

bin/kafka-console-consumer.sh --bootstrap-server xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667 --topic flinkcdc --from-beginning  --consumer-property security.protocol=SASL_PLAINTEXT --group tester

This article is reposted from: https://blog.csdn.net/lisacumt/article/details/132097093
Copyright belongs to the original author, lisacumt.