Configuring Dynamic SASL/SCRAM Authentication in Kafka


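Kafka needs authentication, and users must be addable dynamically without restarting the brokers. SASL/SCRAM supports both requirements.
This article reorganizes and re-records the content of https://blog.csdn.net/qq_38616503/article/details/117529690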

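1. Start Zookeeper and Kafka

First, start Kafka and Zookeeper with no security settings configured. If you need to install Kafka from scratch, see the guides on single-node and cluster installation of Kafka.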

2. Create SCRAM credentials

(1) Create the user for inter-broker communication: admin (it must be created before SASL is enabled, otherwise the broker fails at startup)

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-sec],SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin

(2) Create the producer user: producer

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer

(3) Create the consumer user: consumer

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consumer

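SCRAM-SHA-256 and SCRAM-SHA-512 name the hash algorithm applied to the password. Creating the credential for either one is sufficient, as long as it matches the mechanism enabled on the broker (SCRAM-SHA-256 in this article).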
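To make the role of the hash algorithm and the iterations parameter more concrete, the following is a minimal Java sketch of the salted-password step that SCRAM defines in RFC 5802: PBKDF2 using HMAC over the chosen hash. It is not Kafka's own code. The class name ScramSaltedPasswordSketch, the helper names, and the fixed demo salt are hypothetical; a real server uses a random salt per user, and the sketch assumes an ASCII password so that no SASLprep normalization is needed.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

public class ScramSaltedPasswordSketch {

    // SaltedPassword := Hi(password, salt, iterations) from RFC 5802.
    // Hi() is PBKDF2 with HMAC as the PRF and an output as long as the hash (32 bytes for SHA-256).
    static byte[] saltedPassword(String password, byte[] salt, int iterations) {
        try {
            PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, 256);
            SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
            return factory.generateSecret(spec).getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("PBKDF2WithHmacSHA256 is not available", e);
        }
    }

    // Lower-case hex encoding, used only for printing.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical fixed salt, for demonstration only.
        byte[] salt = "demo-salt".getBytes(StandardCharsets.UTF_8);
        // Same password and salt, different iteration counts: the results differ.
        System.out.println(hex(saltedPassword("prod-sec", salt, 8192)));
        System.out.println(hex(saltedPassword("prod-sec", salt, 4096)));
    }
}
```

A higher iterations value makes each password guess more expensive for an attacker who obtains the stored credential, at the cost of slightly more work per authentication.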

3. Maintain SCRAM credentials

3.1 View SCRAM credentials

bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --describe --entity-type users --entity-name consumer
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --describe --entity-type users --entity-name producer

3.2 Delete SCRAM credentials

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer

4. Server-side configuration

Once the user credentials have been created, configure the Kafka brokers.
(1) Create the JAAS file:

cat > kafka_server_jaas.conf <<EOF
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};
EOF

(2) Pass the location of the JAAS file to each Kafka broker as a JVM parameter: in bin/kafka-server-start.sh, add -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf to the exec line

-Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf kafka.Kafka "$@"

(3) Configure server.properties (config/server.properties)

# Authentication settings
listeners=SASL_PLAINTEXT://IP:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
advertised.listeners=SASL_PLAINTEXT://IP:9092
# ACL settings
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

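Choose SASL_SSL or SASL_PLAINTEXT according to your needs. SASL_PLAINTEXT sends traffic unencrypted, which performs slightly better. After the configuration is done, restart Kafka and Zookeeper.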

5. Client-side configuration

(1) Create three JAAS files, one for each of the three users:
kafka_client_scram_admin_jaas.conf
kafka_client_scram_producer_jaas.conf
kafka_client_scram_consumer_jaas.conf

cat > kafka_client_scram_admin_jaas.conf <<EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};
EOF

cat > kafka_client_scram_producer_jaas.conf <<EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="producer"
password="prod-sec";
};
EOF

cat > kafka_client_scram_consumer_jaas.conf <<EOF
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="consumer"
password="cons-sec";
};
EOF

(2) Modify the startup scripts to load the JAAS files

### Producer: bin/kafka-console-producer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_client_scram_producer_jaas.conf kafka.tools.ConsoleProducer "$@"

### Consumer: bin/kafka-console-consumer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_client_scram_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"

(3) Configure consumer.properties and producer.properties, adding the following settings to both

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
bootstrap.servers=192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092
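As a possible alternative to a separate JAAS file per user, Kafka clients from 0.10.2 onward also accept the JAAS entry inline through the sasl.jaas.config client property. The sketch below only assembles such a property set with java.util.Properties, so it runs without the Kafka client on the classpath. The class name ScramClientPropsSketch and its helper methods are hypothetical, and the sketch assumes the username and password contain no double quotes.

```java
import java.util.Properties;

public class ScramClientPropsSketch {

    // Builds the value of the sasl.jaas.config client property for a SCRAM user.
    // Assumption: username and password contain no double quotes, so no escaping is done.
    static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" "
                + "password=\"" + password + "\";";
    }

    // Same three settings as in consumer.properties/producer.properties, plus the inline JAAS entry.
    static Properties scramClientProps(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        Properties props = scramClientProps(
                "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092",
                "producer", "prod-sec");
        // Print the resulting settings (demo only; avoid logging real passwords).
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object could be passed to a KafkaProducer or KafkaConsumer together with the serializer settings, which lets one JVM hold clients for different users without sharing a single java.security.auth.login.config system property.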

(4) Create the topic

bin/kafka-topics.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --create --topic topictest --partitions 3 --replication-factor 1

(5) Start the producer

bin/kafka-console-producer.sh --broker-list 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest --producer.config config/producer.properties

(6) Grant the producer write permission

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:producer --operation Write --topic topictest

(7) View the permissions

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --list

(8) Grant the consumer read permission

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --operation Read --topic topictest

(9) Grant the consumer permission on the consumer group

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --operation Read --group test-consumer-group

(10) Start the consumer

bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest --from-beginning --consumer.config config/consumer.properties

6. Testing with Java code

6.1 Producer

Maven pom.xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
    <!--<version>0.10.2.0</version>-->
</dependency>

kafka_client_scram_producer_jaas.conf

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="producer"
password="prod-sec";
};

Code:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MySaslScramProducer {

    // volatile is needed for the double-checked locking in getIns() to be safe
    public static volatile MySaslScramProducer ins;

    private Producer<String, String> producer;

    private MySaslScramProducer() {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //props.put("compression.type","gzip");
        //props.put("max.request.size","5242880");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The JAAS file is resolved relative to the project's classpath root;
        // the settings below enable SASL authentication
        String jaasPath = MySaslScramProducer.class.getResource("/").getPath() + "kafka_client_scram_producer_jaas.conf";
        System.out.println(jaasPath);
        System.setProperty("java.security.auth.login.config", jaasPath);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");

        producer = new KafkaProducer<>(props);
    }

    // Lazily creates the singleton instance
    public static MySaslScramProducer getIns() {
        if (ins == null) {
            synchronized (MySaslScramProducer.class) {
                if (ins == null) {
                    ins = new MySaslScramProducer();
                }
            }
        }
        return ins;
    }

    public Future<RecordMetadata> send(String topic, String valueStr) {
        // Send asynchronously; on failure, print the failed message so it can be checked later
        Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>(topic, valueStr), new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // Print messages that failed to send
                    System.out.println("send failed--->>> " + valueStr);
                } else {
                    System.out.println("topic:" + metadata.topic() + " ,partition:" + metadata.partition() + " , offset:" + metadata.offset() + " -> " + valueStr);
                }
            }
        });
        return meta;
    }

    public void close() {
        if (producer != null) producer.close();
    }

    public static void main(String[] args) throws InterruptedException {

        String valueStr = "{\"metric\":\"host.mem.pused\",\"value\":\"97.781098\",\"tags\":{\"resCi\":\"TA_RES_PHYSICALHOST\",\"dataType\":0,\"ip\":\"132.121.93.69\",\"cmd\":\"\",\"resId\":\"auto217A77657DDC70403B949090D3EA5543\",\"itemKey\":\"vm.memory.size[pavailable]\"},\"timestamp\":\"1617673320000\"}";
        MySaslScramProducer.getIns().send("topictest", valueStr);
        // close() waits for pending sends to complete before returning
        MySaslScramProducer.getIns().close();
    }
}

6.2 Consumer

kafka_client_scram_consumer_jaas.conf

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="consumer"
password="cons-sec";
};

Code:

package cn.gzsendi;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslScramTopicTest {

    // volatile so that a change made by another thread is seen by the polling loop
    public static volatile boolean stop = false;

    private static Logger logger = LoggerFactory.getLogger(SaslScramTopicTest.class);

    public static void main(String[] args) {

        KafkaConsumer<String, String> consumer = null;
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
        // Note: the ACL in step (9) was granted for test-consumer-group;
        // the group used here needs Read permission as well
        props.put("group.id", "liutest");
        props.put("enable.auto.commit", "true"); // commit offsets automatically
        props.put("auto.offset.reset", "latest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "300000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Load the consumer's JAAS file from the classpath root and enable SASL/SCRAM
        System.setProperty("java.security.auth.login.config", SaslScramTopicTest.class.getResource("/").getPath() + "kafka_client_scram_consumer_jaas.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");

        consumer = new KafkaConsumer<>(props);

        String topicName = "topictest";
        consumer.subscribe(Arrays.asList(topicName));
        while (!stop) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String valueStr = record.value();

                    try {
                        logger.info(valueStr);
                        logger.info("topic:" + record.topic() + " ,partition:" + record.partition() + " ,offset:" + record.offset() + "  -> " + record.value());
                    } catch (Exception e) {
                        System.out.println("error------->>> " + valueStr);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (consumer != null)
            consumer.close();
    }

    /**
     * Skip historical data and start consuming from the latest data.
     *
     * @param consumer the consumer whose assigned partitions are moved to the end
     */
    public static void assignOffset(KafkaConsumer<String, String> consumer) {
        if (consumer == null) {
            return;
        }

        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<TopicPartition, OffsetAndMetadata>();
        // Poll once so that partitions get assigned to this consumer
        consumer.poll(100);
        Set<TopicPartition> assignment = consumer.assignment();
        consumer.seekToEnd(assignment);
        //consumer.seekToBeginning(assignment);
        for (TopicPartition topicPartition : assignment) {
            long position = consumer.position(topicPartition);
            offsetMap.put(topicPartition, new OffsetAndMetadata(position));
            consumer.commitSync(offsetMap);
        }
    }
}

Reposted from: https://blog.csdn.net/jxlhljh/article/details/127286603
Copyright belongs to the original author, 冰之杍.
