

Adding user authentication to ZooKeeper and Kafka

Background

By default, the ZooKeeper root directory can be accessed without any permission check, so we add authentication and ACLs.

Steps

  1. Create the ZooKeeper JAAS file in the Kafka `config` directory:

```shell
vi config/zookeeper_jaas.conf
```
  2. Add the following to `zookeeper_jaas.conf`. Each entry of the form `user_{username}="{password}"` defines a user that clients may authenticate as:

```
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345"
    user_admin="12345";
};
```
  3. Append the following to `zookeeper.properties`:

```properties
# auth
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
```
  4. In `zookeeper-server-start.sh`, add `-Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/zookeeper_jaas.conf` (adjust the path to your installation):

```shell
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/zookeeper_jaas.conf"
fi
```
  5. Start ZooKeeper:

```shell
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
```
  6. Now configure the ACLs.

     1. Log in to ZooKeeper at `192.168.6.42:2181` (replace the IP address with your own):

```shell
./zookeeper-shell.sh 192.168.6.42:2181
```

     2. Add the users:

```
addauth digest admin:12345
addauth digest kafka:12345
```

     3. Set the ACLs. In `ip:192.168.4.235:cdrwa`, substitute your own IP address:

```
setAcl / ip:192.168.4.235:cdrwa,ip:127.0.0.1:cdrwa,auth:kafka:cdrwa,auth:admin:cdrwa
setAcl /consumers ip:192.168.4.235:cdrwa,ip:127.0.0.1:cdrwa,auth:kafka:cdrwa,auth:admin:cdrwa
```

The letters in `cdrwa` mean:
- create: you may create child nodes.
- read: you may read node data and list the node's children.
- write: you may set node data.
- delete: you may delete child nodes.
- admin: you may set permissions on the node.

     4. Check that the configuration is correct:

```
getAcl /
getAcl /consumers
```
  7. Add the Kafka JAAS configuration:

```shell
vim config/kafka_server_jaas.conf
```
  8. Add the following content:

```
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345"
    user_admin="12345";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345";
};
```
  9. Edit `config/server.properties`, and point `zookeeper.connect` at your own ZooKeeper address:

```properties
# AUTH
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://:9092
zookeeper.connect=192.168.6.42:2181
```
  10. Adjust the Kafka start script `kafka-server-start.sh`, adding `-Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_server_jaas.conf` (adjust the path to your installation):

```shell
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_server_jaas.conf"
fi
```
  11. Start Kafka:

```shell
./kafka-server-start.sh -daemon ../config/server.properties
```
  12. To test, go to the Kafka directory, create a `kafka_client_jaas.conf` file under `config`, and write the following content (the password must match the server's `user_admin` entry):

```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345";
};
```
  13. Configure producer authentication by editing the producer start script, `vi bin/kafka-console-producer.sh`:

```shell
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_client_jaas.conf"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
```
  14. Start the producer:

```shell
./kafka-console-producer.sh --broker-list 192.168.4.235:9092 --topic testTopic --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
```
  15. Configure consumer authentication by editing the consumer start script, `vi bin/kafka-console-consumer.sh`:

```shell
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/opt/kafka_2.13-3.5.1/config/kafka_client_jaas.conf"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
```
  16. Start the consumer. If messages are printed, the setup works:

```shell
./kafka-console-consumer.sh --bootstrap-server 192.168.4.235:9092 --topic testTopic --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
```
  17. Spring Boot configuration (`bootstrap-servers` takes a plain `host:port` list):

```yaml
spring:
  kafka:
    # docker http://192.168.2.202:8080
    bootstrap-servers: 192.168.6.42:9092
    # username/password authentication settings
    producer:
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
    consumer:
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
```
  18. Add the corresponding settings in your `KafkaConfig` class:

```java
@Bean
public KafkaTemplate kafkaTemplate() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
    configs.put(ProducerConfig.RETRIES_CONFIG, pro_retry_config);
    configs.put(ProducerConfig.BATCH_SIZE_CONFIG, batch_size_config);
    configs.put(ProducerConfig.ACKS_CONFIG, acks_config);
    configs.put(ProducerConfig.LINGER_MS_CONFIG, linger_ms_config);
    configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, buffer_memory_config);
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, key_serializer_config);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, value_serializer_config);
    configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression_type_config);
    if (Boolean.valueOf(auth_enabled)) {
        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name());
        configs.put(SaslConfigs.SASL_MECHANISM, sasl_mechanism);
    }
    DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configs);
    return new KafkaTemplate(producerFactory);
}
```
  19. Finally, add one more option to the application's start script:

```shell
-Djava.security.auth.login.config=<location of the client login file>
# e.g. -Djava.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf
```
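As a quick reference for the `setAcl` commands in step 6, the `cdrwa` string packs the five ZooKeeper permissions into single letters. The helper below is an illustrative sketch (not part of the setup, and the class name is my own) that expands such a string into the permission names listed above:

```java
import java.util.ArrayList;
import java.util.List;

// Decodes a ZooKeeper ACL permission string such as "cdrwa" into
// the names of the permissions it grants.
public class ZkPerms {
    public static List<String> expand(String perms) {
        List<String> names = new ArrayList<>();
        for (char c : perms.toCharArray()) {
            switch (c) {
                case 'c': names.add("create"); break; // create child nodes
                case 'd': names.add("delete"); break; // delete child nodes
                case 'r': names.add("read");   break; // read data / list children
                case 'w': names.add("write");  break; // set node data
                case 'a': names.add("admin");  break; // set node ACLs
                default: throw new IllegalArgumentException("unknown permission: " + c);
            }
        }
        return names;
    }
}
```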
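The JAAS stanzas from steps 8 and 12 can also be passed per-client through Kafka's `sasl.jaas.config` property instead of a separate file plus `-D` flag. A minimal sketch of building that one-line value (the helper class is hypothetical; the login-module class name is the one used throughout this article):

```java
// Builds a sasl.jaas.config value for SASL/PLAIN, equivalent to the
// KafkaClient stanza in the JAAS file.
public class JaasConfig {
    public static String plainLogin(String username, String password) {
        return String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"%s\";",
            username, password);
    }
}
```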
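In the step-18 bean, the `if` branch guarded by `auth_enabled` is the piece worth reusing for consumer factories as well. Below is a sketch of it as a standalone helper; it uses the literal string forms of the `CommonClientConfigs`/`SaslConfigs` constants so the snippet needs no Kafka dependency, and the class name is my own:

```java
import java.util.HashMap;
import java.util.Map;

// Returns a copy of the given client configs with SASL/PLAIN security
// settings added when authentication is enabled.
public class KafkaSecurity {
    public static Map<String, Object> withSasl(Map<String, Object> configs, boolean authEnabled) {
        Map<String, Object> result = new HashMap<>(configs);
        if (authEnabled) {
            result.put("security.protocol", "SASL_PLAINTEXT"); // CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
            result.put("sasl.mechanism", "PLAIN");             // SaslConfigs.SASL_MECHANISM
        }
        return result;
    }
}
```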
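The `-D` flag from step 19 can equivalently be set from application code, as long as it runs before the first Kafka client is constructed (the JAAS configuration is read on first login). A sketch, where the path argument stands in for your own client JAAS file location:

```java
// Sets the JAAS login file location programmatically, as an alternative
// to passing -Djava.security.auth.login.config on the command line.
public class JaasSystemProperty {
    public static void configure(String jaasPath) {
        System.setProperty("java.security.auth.login.config", jaasPath);
    }
}
```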
    

Reposted from: https://blog.csdn.net/weixin_43964408/article/details/139587040
Copyright belongs to the original author, 遨游DATA. In case of infringement, please contact us for removal.
