Configuring Dynamic SASL_SCRAM Authentication in Kafka

Kafka needs authentication enabled while still allowing new users to be added on the fly, and SASL/SCRAM supports exactly that. This article is a reorganized re-recording of the content at https://blog.csdn.net/qq_38616503/article/details/117529690.

1. Start ZooKeeper and Kafka

As a first step, start Kafka and ZooKeeper with no security configured at all. If you need to install Kafka from scratch, see the standalone and cluster installation guides for Kafka.
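For reference, a typical unauthenticated startup with the scripts shipped in the Kafka distribution looks like this (a sketch assuming the default config directory):

# Start ZooKeeper using the sample config shipped with Kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# Start the Kafka broker
bin/kafka-server-start.sh -daemon config/server.properties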

2. Create SCRAM credentials

(1) Create the inter-broker communication user admin (this user must exist before SASL is enabled, otherwise the broker fails to start):

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-sec],SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin

(2) Create the producer user: producer

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer

(3) Create the consumer user: consumer

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consumer

SCRAM-SHA-256 and SCRAM-SHA-512 are the hashing mechanisms used to protect the passwords; configuring either one of them is enough.
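Because these credentials are stored in ZooKeeper, additional users can be created at any time without restarting the brokers, which is what makes this setup dynamic. A sketch, where the user name newuser and password new-sec are purely illustrative:

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[password=new-sec],SCRAM-SHA-512=[password=new-sec]' --entity-type users --entity-name newuser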

3. Maintain SCRAM credentials

3.1 View SCRAM credentials

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --describe --entity-type users --entity-name consumer
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --describe --entity-type users --entity-name producer

3.2 Delete SCRAM credentials

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer

4. Broker-side configuration

With the user credentials in place, configure the Kafka broker side.
(1) Create the JAAS file:

cat > kafka_server_jaas.conf <<EOF
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-sec";
};
EOF

(2) Pass the JAAS file location to each Kafka broker as a JVM parameter by adding -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf to the final exec line in bin/kafka-server-start.sh:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf kafka.Kafka "$@"
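Alternatively, instead of editing the startup script, the same flag can be injected through the KAFKA_OPTS environment variable, which kafka-run-class.sh picks up automatically; a sketch:

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf"
bin/kafka-server-start.sh -daemon config/server.properties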

(3) Configure server.properties (config/server.properties):

# Authentication
listeners=SASL_PLAINTEXT://IP:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
advertised.listeners=SASL_PLAINTEXT://IP:9092

# ACLs
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

Depending on your needs you can choose SASL_SSL or SASL_PLAINTEXT; PLAINTEXT transmits data unencrypted and performs somewhat better. Once the configuration is done, restart Kafka and ZooKeeper.
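As an aside, the broker's SCRAM login can also be declared inline in server.properties per listener (supported in the 2.4.x line used here), which removes the need for the separate kafka_server_jaas.conf; a sketch matching the SASL_PLAINTEXT listener above:

listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-sec";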

5. Client configuration

(1) Create one JAAS file for each of the three users:
kafka_client_scram_admin_jaas.conf
kafka_client_scram_producer_jaas.conf
kafka_client_scram_consumer_jaas.conf

cat > kafka_client_scram_admin_jaas.conf <<EOF
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-sec";
};
EOF

cat > kafka_client_scram_producer_jaas.conf <<EOF
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="producer"
    password="prod-sec";
};
EOF

cat > kafka_client_scram_consumer_jaas.conf <<EOF
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="consumer"
    password="cons-sec";
};
EOF

(2) Modify the console client startup scripts so their exec lines load the JAAS files:

# Producer: bin/kafka-console-producer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_client_scram_producer_jaas.conf kafka.tools.ConsoleProducer "$@"

# Consumer: bin/kafka-console-consumer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_client_scram_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"

(3) Add the following settings to both consumer.properties and producer.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
bootstrap.servers=192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092
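If you would rather not patch the shell scripts in step (2), kafka-clients (0.10.2 and later) also accepts the JAAS entry directly as a client property, so producer.properties could instead carry, for example:

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="prod-sec";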

(4) Create a topic:

bin/kafka-topics.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --create --topic topictest --partitions 3 --replication-factor 1
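Note that this command talks to ZooKeeper directly and therefore bypasses SASL entirely. On broker versions that support it (2.2+, including the 2.4.1 used here), the topic can also be created through the brokers with admin credentials; a sketch, where config/admin.properties is a hypothetical file holding the admin user's security.protocol, sasl.mechanism, and sasl.jaas.config settings:

bin/kafka-topics.sh --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --command-config config/admin.properties --create --topic topictest --partitions 3 --replication-factor 1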

(5) Start a producer (sending will be rejected until the Write ACL from the next step is in place, since allow.everyone.if.no.acl.found is false):

bin/kafka-console-producer.sh --broker-list 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest --producer.config config/producer.properties

(6) Grant the producer write permission:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:producer --operation Write --topic topictest

(7) List the ACLs:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --list

(8) Grant the consumer read permission on the topic:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --operation Read --topic topictest

(9) Grant the consumer read permission on its consumer group (test-consumer-group matches the group.id shipped in config/consumer.properties):

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --operation Read --group test-consumer-group
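For reference, kafka-acls.sh also has convenience flags that bundle the usual producer and consumer ACLs into one call each; a sketch:

# Producer side: grants the usual producer ACLs (Write, Describe, Create) in one call
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:producer --producer --topic topictest

# Consumer side: grants Read/Describe on the topic plus Read on the group in one call
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --consumer --topic topictest --group test-consumer-group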

(10) Start a consumer:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest --from-beginning --consumer.config config/consumer.properties

6. Testing with Java code

6.1 Producer

Maven pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
    <!-- <version>0.10.2.0</version> -->
</dependency>

kafka_client_scram_producer_jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="producer"
    password="prod-sec";
};

Code:

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MySaslScramProducer {

    private static volatile MySaslScramProducer ins;
    private final Producer<String, String> producer;

    private MySaslScramProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // props.put("compression.type", "gzip");
        // props.put("max.request.size", "5242880");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The JAAS file is resolved relative to the classpath root of the project.
        System.setProperty("java.security.auth.login.config",
                MySaslScramProducer.class.getResource("/").getPath() + "kafka_client_scram_producer_jaas.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        producer = new KafkaProducer<>(props);
    }

    // Lazily initialized singleton with double-checked locking.
    public static MySaslScramProducer getIns() {
        if (ins == null) {
            synchronized (MySaslScramProducer.class) {
                if (ins == null) {
                    ins = new MySaslScramProducer();
                }
            }
        }
        return ins;
    }

    // Asynchronous send; failed sends are printed so they can be checked later.
    public Future<RecordMetadata> send(String topic, final String valueStr) {
        return producer.send(new ProducerRecord<String, String>(topic, valueStr), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.out.println("send failed --->>> " + valueStr);
                } else {
                    System.out.println("topic:" + metadata.topic() + " ,partition:" + metadata.partition()
                            + " ,offset:" + metadata.offset() + " -> " + valueStr);
                }
            }
        });
    }

    public void close() {
        if (producer != null) {
            producer.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String valueStr = "{\"metric\":\"host.mem.pused\",\"value\":\"97.781098\",\"tags\":{\"resCi\":\"TA_RES_PHYSICALHOST\",\"dataType\":0,\"ip\":\"132.121.93.69\",\"cmd\":\"\",\"resId\":\"auto217A77657DDC70403B949090D3EA5543\",\"itemKey\":\"vm.memory.size[pavailable]\"},\"timestamp\":\"1617673320000\"}";
        MySaslScramProducer.getIns().send("topictest", valueStr);
        MySaslScramProducer.getIns().close();
    }
}
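Instead of pointing java.security.auth.login.config at a file, the login module can also be embedded straight into the client properties (supported by kafka-clients since 0.10.2), which avoids process-wide JAAS state; a sketch:

// Alternative to System.setProperty(...): inline JAAS configuration
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"producer\" password=\"prod-sec\";");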

6.2 Consumer

kafka_client_scram_consumer_jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="consumer"
    password="cons-sec";
};

Code:

package cn.gzsendi;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslScramTopicTest {

    public static boolean stop = false;
    private static Logger logger = LoggerFactory.getLogger(SaslScramTopicTest.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
        props.put("group.id", "liutest");
        props.put("enable.auto.commit", "true"); // commit offsets automatically
        props.put("auto.offset.reset", "latest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "300000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The JAAS file is resolved relative to the classpath root of the project.
        System.setProperty("java.security.auth.login.config",
                SaslScramTopicTest.class.getResource("/").getPath() + "kafka_client_scram_consumer_jaas.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topictest"));
        while (!stop) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    String valueStr = record.value();
                    try {
                        logger.info(valueStr);
                        logger.info("topic:" + record.topic() + " ,partition:" + record.partition()
                                + " ,offset:" + record.offset() + " -> " + record.value());
                    } catch (Exception e) {
                        System.out.println("error ------->>> " + valueStr);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        consumer.close();
    }

    /**
     * Skip historical data and start consuming from the latest offsets.
     */
    public static void assignOffset(KafkaConsumer<String, String> consumer) {
        if (consumer == null) {
            return;
        }
        consumer.poll(Duration.ofMillis(100)); // first poll triggers partition assignment
        Set<TopicPartition> assignment = consumer.assignment();
        consumer.seekToEnd(assignment);
        // consumer.seekToBeginning(assignment); // use this instead to replay from the start
        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (TopicPartition topicPartition : assignment) {
            offsetMap.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition)));
        }
        consumer.commitSync(offsetMap);
    }
}

Reposted from https://blog.csdn.net/jxlhljh/article/details/127286603; copyright belongs to the original author 冰之杍.
