0


Kafka安全认证机制详解之SASL_SCRAM

SASL/SCRAM验证可以动态新增用户并分配权限。
SASL/SCRAM 通过将认证用户信息保存在 ZooKeeper 的方式,避免了动态修改需要重启 Broker 的弊端。在实际使用过程中,可以使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群。因此,如果打算使用 SASL/PLAIN,不妨改用 SASL/SCRAM 试试。不过要注意的是,后者是 0.10.2 版本引入的。
kafka官方文档:
https://kafka.apache.org/documentation/#security_sasl_scram

一、配置

配置SCRAM证书

下面命令创建了一个证书:tly 密码是:123456

kafka-configs.sh --zookeeper localhost:2181 --alter --add-config SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456] --entity-type users --entity-name tly

创建admin证书,密码也是admin

kafka-configs.sh --zookeeper localhost:2181 --alter --add-config SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin] --entity-type users --entity-name admin

查看证书

查看指定证书加上–entity-name tly 不加则查看所有证书

kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name tly

删除证书

kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config SCRAM-SHA-512 --delete-config SCRAM-SHA-256 --entity-type users --entity-name tly

image.png

二、服务端配置

创建配置文件

在kafka的config文件夹下创建kafka-server-jaas.conf文件

KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="admin"
        password="admin"
        user_admin="admin"
        password="admin";
};

分发脚本

scp kafka-server-jaas.conf root@kafka2-82373:/opt/kafka_2.13-2.6.0/config

scp kafka-server-jaas.conf root@kafka3-82373:/opt/kafka_2.13-2.6.0/config

测试权限功能

生成三个证书

admin是用来broker之间相互认证的
producer是用来做生产者的
consumer是用来做消费者的

kafka-configs.sh --zookeeper kafka1-33529:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
kafka-configs.sh --zookeeper kafka1-33529:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name producer
kafka-configs.sh --zookeeper kafka1-33529:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name consumer

查看证书清单:

kafka-configs.sh --zookeeper kafka1-33529:2181 --describe --entity-type users

image.png

创建kafka_server_jaas.conf配置文件

这个配置文件是用来在broker之间相互认证用的,用户名密码对应的就是上一步创建admin证书时指定的用户名密码。

vim kafka_server_jaas.conf

# 添加如下内容
KafkaServer {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"password="admin";};

分发此配置文件到另外两个节点上,由于我是直接在/root路径下创建的,因此也同步到对应的路径下;如果实在config路径下创建的,则同步的时候也同步到对应的路径下

scp kafka_server_jaas.conf root@kafka2-33529:/root/kafka_server_jaas.conf
scp kafka_server_jaas.conf root@kafka3-33529:/root/kafka_server_jaas.conf

修改kafak启动脚本

编辑kafka-server-start.sh 脚本,将我们的jaas文件添加到对应的环境中;

# 编辑启动脚本vim /opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh

# 注释掉这一行(最后一行)#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"# 添加下面的代码exec$base_dir/kafka-run-class.sh $EXTRA_ARGS-Djava.security.auth.login.config=/root/kafka_server_jaas.conf kafka.Kafka "$@"

分发编辑好的脚本到其它节点:

scp /opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh root@kafka2-33529:/opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh

scp /opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh root@kafka3-33529:/opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh

修改server.properties文件

修改server.conf配置文件,启用认证并指定认证方式,指定超级管理员,启用acl控制权限等设置

# 编辑配置文件vim /opt/kafka_2.13-2.6.0/config/server.properties

# 添加下面的额内容,注意:每台节点标记的时候对应的kafkaIP需要变更listeners=SASL_PLAINTEXT://kafka3-33529:9092
advertised.listeners=SASL_PLAINTEXT://kafka3-33529:9092
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

重启服务

每个节点都要重启

# 重启zk
zkServer.sh restart

# 重启kafka
kafka-server-stop.sh
kafka-server-start.sh -daemon /opt/kafka_2.13-2.6.0/config/server.properties

重启后多次执行jps命令,确保kafka服务没有挂掉
image.png

修改生产者和消费者配置文件

修改生产者和消费者配置文件,增加权限认证

# 修改生产者配置文件vim /opt/kafka_2.13-2.6.0/config/producer.properties

# 添加以下内容security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

# 分发配置文件到其它节点scp /opt/kafka_2.13-2.6.0/config/producer.properties root@kafka2-33529:/opt/kafka_2.13-2.6.0/config/producer.properties
scp /opt/kafka_2.13-2.6.0/config/producer.properties root@kafka3-33529:/opt/kafka_2.13-2.6.0/config/producer.properties

# 修改消费者配置文件vim /opt/kafka_2.13-2.6.0/config/consumer.properties

# 添加以下内容security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

# 分发配置文件到其它节点scp /opt/kafka_2.13-2.6.0/config/consumer.properties root@kafka2-33529:/opt/kafka_2.13-2.6.0/config/consumer.properties
scp /opt/kafka_2.13-2.6.0/config/consumer.properties root@kafka3-33529:/opt/kafka_2.13-2.6.0/config/consumer.properties

配置生产者和消费者jaas文件

# 新建生产者jaas文件vim kafka_producer_jaas.conf

# 增加以下内容,用户名密码为第一步在创建证书时创建的用户名密码
KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
    username="producer"password="123456";};# 分发到其它节点scp kafka_producer_jaas.conf root@kafka2-33529:/root/kafka_producer_jaas.conf
scp kafka_producer_jaas.conf root@kafka3-33529:/root/kafka_producer_jaas.conf

# 新建生产者jaas文件vim kafka_consumer_jaas.conf

# 增加以下内容,用户名密码为第一步在创建证书时创建的用户名密码
KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
    username="consumer"password="123456";};# 分发到其它节点scp kafka_consumer_jaas.conf root@kafka2-33529:/root/kafka_consumer_jaas.conf
scp kafka_consumer_jaas.conf root@kafka3-33529:/root/kafka_consumer_jaas.conf

配置生产者jaas文件到启动脚本中

# 编辑配置文件vim /opt/kafka_2.13-2.6.0/bin/kafka-console-producer.sh

# 注释掉下面内容# exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"# 新增下面内容exec$(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/root/kafka_producer_jaas.conf kafka.tools.ConsoleProducer "$@"

配置消费者jaas文件到启动脚本中

# 编辑配置文件vim /opt/kafka_2.13-2.6.0/bin/kafka-console-consumer.sh

# 注释掉下面内容#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"# 新增下面内容exec$(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/root/kafka_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"

创建topic测试

创建名为test1的topic用来测试

kafka-topics.sh -create--zookeeper kafka1-33529:2181 -replication-factor 3--partitions3--topic test1

生产消息

kafka-console-producer.sh --bootstrap-server kafka1-33529:9092 --topic test1 --producer.config /opt/kafka_2.13-2.6.0/config/producer.properties

提示没有权限对topic1进行操作:
image.png
对生产者证书进行acl授权:

kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=kafka1-33529:2181 \--add --allow-principal User:producer --operation Write --topic test1 

再次生产消息,可以正常生产消息了:
image.png
消费消息

kafka-console-consumer.sh --bootstrap-server kafka1-33529:9092 --topic test1 --from-beginning --consumer.config /opt/kafka_2.13-2.6.0/config/consumer.properties

提示没权限消费消息:
image.png
对消费者证书进行acl授权,一定要增加–group,否则当前消费者组没有权限,会报Not authorized to access group: test-consumer-group错误

kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka1-33529:2181 \--add --allow-principal User:consumer --operation Read --topic test1  --group test-consumer-group

再次消费消息即可获取到消息:
image.png

标签: kafka 安全 大数据

本文转载自: https://blog.csdn.net/weixin_43929753/article/details/135401035
版权归原作者 4935同学 所有, 如有侵权,请联系我们删除。

“Kafka安全认证机制详解之SASL_SCRAM”的评论:

还没有评论