0


Kafka服务端SASL/PLAIN+ACL认证授权安装操作

Kafka+zookeeper安全认证技术介绍

Kafka 目前支持多种认证方式,生产环境常见应用的SASL有以下几种:

  • SSL/TLS认证:本认证是基于SSL/TLS的加密方式,使用SSL/TLS证书对客户端和服务器进行身份验证,确保在传输层上安全保证数据安全可靠。
  • SASL/PLAIN认证:含义是简单身份验证和授权层应用程序接口,PLAIN认证是其中一种最简单的用户名、密码认证方式,生产环境使用维护简单易用。可用于Kafka和其他应用程序之间的认证。
  • SASL/SCRAM认证:SCRAM-SHA-256、SCRAM-SHA-512方式认证,本认证需要客户端、服务器共同协同完成认证过程,使用和维护上较为复杂。优势是可动态增加用户,而不必重启kafka组件服务端。
  • SASL/GSSAPI 认证:Kerberos认证,本认证适用于大型公司企业生产环境,通常结合Kerberos协议使用。使用Kerberos认证可集成目录服务,比如AD。通过本认证机制可实现优秀的安全性和良好的用户体验。
  • Kafka 自带ACL访问控制方式:Kafka提供了ACL(Access Control Lists)来控制用户对特定Topic或Topic Partition的访问权限,实现业务数据的安全性。

企业在选择认证方式时,需要结合业务特点,考虑到企业数据安全性,又要考虑部署后的性能和部署复杂度等因素。在生产实际部署时,可以根据企业安全政策、网络因素、对性能的考量等因素来正确选择合适的认证方式,确保选型的认证机制符合本企业需求。

环境配置

本方案需要测试环境主机裸金属环境为3台主机,

IPx.x.x.15-17
Zookeeper版本:apache-zookeeper-3.6.4-bin
Kafka版本:kafka_2.13-2.7.1

由于登录Kafka、zookeeper认证会严重依赖服务器主机名,所以需先配置/etc/hosts。
设置每个主机的主机名,并确保每台主机名不重复,执行

  1. hostnamectl set-hostname kafka1 &&bash
  1. cat>>/etc/hosts <<EOF
  2. x.x.x.15 kafka1
  3. x.x.x.16 kafka2
  4. x.x.x.17 kafka3
  5. EOF

配置SASL/PLAIN+ACL认证

kafka认证配置

目前需要编辑两个文件,一个是server.properties,另一个是kafka_server_jaas.conf。server.properties是kafka的主要配置文件,里面有很多参数可以调整。kafka_server_jaas.conf是sasl认证的配置文件,里面定义了认证的方式和用户信息。这两个文件都在kafka的config目录下。或者把kafka_server_jaas.conf设置在单独路径,但必须在kafka脚本里添加该路径变量。

Kafka的broker端配置:
若要开启SASL机制,需要在broker服务端进行三个方面的设置:
第一:配置“server.properties”的文件内容
第二:编写JAAS配置文件kafka_server_jaas.conf
第三:修改broker启动脚本等各种脚本,并指定JAAS的配置文件kafka_server_jaas.conf存放路径

编写broker服务端server.properties的SASL+ACL配置

分别在3台kafka节点修改,下面参数为SASL相关参数,其他参数正常配置。

  1. vim server.properties
  2. listeners=SASL_PLAINTEXT://x.x.x.15:9092 security.inter.broker.protocol=SASL_PLAINTEXT
  3. sasl.mechanism.inter.broker.protocol=PLAIN
  4. sasl.enabled.mechanisms=PLAIN
  5. authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  6. super.users=User:sdx

参数解释:

Listeners:SASL端口,sasl_plaintext监听器,表示使用sasl和明文传输
security.inter.broker.protocol: 表示Broker间通信使用SASL协议认证
sasl.enabled.mechanisms:认证参数。kafka启用的sasl认证方式
sasl.mechanism.inter.broker.protocol:表示Broker间通信启用PLAIN机制
authorizer.class.name:启用ACL机制,设置kafka的ACL的授权类
super.users:指定超级用户,超级用户不需要进行ACL鉴权,拥有最高权限

全配置文件示例如下:

  1. broker.id=11
  2. zookeeper.request.timeout=30000listeners=SASL_PLAINTEXT://x.x.x.15:9092
  3. security.inter.broker.protocol=SASL_PLAINTEXT
  4. sasl.enabled.mechanisms=PLAIN
  5. sasl.mechanism.inter.broker.protocol=PLAIN
  6. authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  7. super.users=User:sdx
  8. delete.topic.enable=true
  9. log.index.size.max.bytes=1024000
  10. num.network.threads=3
  11. num.io.threads=8
  12. socket.send.buffer.bytes=102400
  13. socket.receive.buffer.bytes=102400
  14. socket.request.max.bytes=104857600
  15. log.dirs=/data1/kafka
  16. log.index.size.max.bytes=1024000
  17. log.index.interval.bytes=4000000
  18. num.recovery.threads.per.data.dir=1
  19. offsets.topic.replication.factor=3
  20. transaction.state.log.replication.factor=1
  21. transaction.state.log.min.isr=1
  22. log.retention.hours=168
  23. log.segment.bytes=1073741824
  24. log.retention.check.interval.ms=300000
  25. zookeeper.connect=x.x.x.15:2181,x.x.x.16:2181,x.x.x.17:2181/kafka
  26. zookeeper.connection.timeout.ms=180000
  27. zookeeper.session.timeout.ms=60000
  28. zookeeper.sync.time.ms =2000
  29. group.initial.rebalance.delay.ms=0

编写broker端JAAS配置文件kafka_server_jaas.conf

创建包含所有客户端连接kafka认证用户信息的JAAS文件,认证用户可根据业务需要任意增加用户数量。
创建存放各种jaas配置文件目录

  1. mkdir /usr/local/kafka_2.13-2.7.1/else_config

文件内容如下:

  1. vim /usr/local/kafka_2.13-2.7.1/else_config/kafka_server_jaas.conf
  2. KafkaServer {
  3. org.apache.kafka.common.security.plain.PlainLoginModule required
  4. username="sdx"#kafka超级管理员,为broker间通信使用password="sdx123"#kafka超级管理员密码user_pj="sdx123"#客户端连kafka认证使用的用户sdx,密码sdx123user_usera="usera1"#客户端连kafka认证使用的用户usera,密码usera1。user_userb="userb2";#用户userb,密码userb2。末尾有分号,别漏掉!};#末尾有分号,别漏掉
  5. Client {# Clientbroker作为客户端连接zk服务端的认证
  6. org.apache.kafka.common.security.plain.PlainLoginModule required
  7. username="kz"# broker连接zookeeper认证的用户password="kz123";#broker连接zk认证的用户密码。末尾有分号别漏掉};#末尾有分号,别漏掉
  8. 注:
  9. 1)、user_name语句,是用来配置客户端连接Broker时使用的用户名和密码,也就是新建用户的用户名都以user_开头,等号后面是密码,密码可以是明文且没有复杂度要求。完整语句是user_USERNAME="PASSWORD"
  10. 2)、规划用户usera给生产者使用,用户userb给消费者使用。
  11. 3.1.3 修改broker启动脚本,并指定JAAS的配置文件存放路径
  12. 下面黄色标识为新增变量,位置如下
  13. vim /usr/local/kafka_2.13-2.7.1/bin/kafka-run-class.sh
  14. (( CYGWIN ))&&CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")KAFKA_OPTS='-Djava.security.auth.login.config=/usr/local/kafka_2.13-2.7.1/else_config/kafka_server_jaas.conf'

编辑认证携带文件

因为开启了安全认证,所以执行命令需要携带含有认证用户信息的认证文件。认证文件仍可放在/usr/local/kafka_2.13-2.7.1/else_config路径下。
编写生产者用户usera的认证文件:

  1. vim usera_producer.properties
  2. sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \username="usera"\password="usera1";
  3. security.protocol=SASL_PLAINTEXT
  4. sasl.mechanism=PLAIN

usera_producer.properties的使用方法是通过–producer.config参数携带。

编写消费者用户userb的认证文件:
如果后续指定此配置文件无法消费,需要先查出消费者组名称,然后在文件第一行添加group.id参数,并指定消费者组。

  1. vim userb_consumer.properties
  2. #group.id=console-consumer-94652
  3. sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \username="userb"\password="userb2";
  4. security.protocol=SASL_PLAINTEXT
  5. sasl.mechanism=PLAIN
  6. userb_consumer.properties的使用方法是通过--consumer.config参数携带。
  7. 编写生产者用户usera的认证文件:
  8. vim usera-writer-jaas.conf
  9. KafkaClient {
  10. org.apache.kafka.common.security.plain.PlainLoginModule required
  11. username="usera"password="usera1";};
  12. 编写消费者用户userb的认证文件:
  13. vim userb-read-jaas.conf
  14. KafkaClient {
  15. org.apache.kafka.common.security.plain.PlainLoginModule required
  16. username="userb"password="userb2";};

上述usera-writer-jaas.conf和userb-read-jaas.conf文件的使用方法均需通过–command-config参数携带。

分别修改生产者脚本和消费者脚本

  1. vim /usr/local/kafka_2.13-2.7.1/bin/kafka-console-producer.sh
  2. KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.13-2.7.1/else_config/usera-writer-jaas.conf"vim /usr/local/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh
  3. KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.13-2.7.1/else_config/userb-read-jaas.conf"

重启kafka

如果重启kafka会无法启动,会报与zookeeper认证失败。我们继续完善zookeeper认证策略设置。最后重启kakfa。

zookeeper认证配置

配置认证文件

编辑zookeeper服务端认证文件,Zookeeper服务端启用sasl认证,添加下面3行认证参数,其他参数正常设置。

  1. vim /usr/local/apache-zookeeper-3.6.4-bin/conf/zoo.cfg
  2. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider #authProvider.1表示开启认证功能,zookeeper配置SASL支持requireClientAuthScheme=sasl #开启SASL的认证方式
  3. zookeeper.sasl.client=true #开启SASL身份验证
  4. Zookeeper配置文件全文如下:
  5. tickTime=2000initLimit=10syncLimit=5dataDir=/data1/zookeeper/snapshot
  6. dataLogDir=/data1/zookeeper/datalogdir
  7. 4lw.commands.whitelist=*
  8. clientPort=2181
  9. server.11=x.x.x.15:2888:3888
  10. server.22=x.x.x.16:2888:3888
  11. server.33=x.x.x.17:2888:3888
  12. zookeeper.session.timeout.ms=60000
  13. zookeeper.connection.timeout.ms=60000
  14. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  15. requireClientAuthScheme=sasl
  16. zookeeper.sasl.client=true
  17. jaas用于定义认证身份信息,我们使用LoginModule实现,举例如org.apache.zookeeper.server.auth.DigestLoginModule,它是基于明文用户、密码的形式进行身份认证。
  18. vim /usr/local/apache-zookeeper-3.6.4-bin/conf/zk_jaas.conf
  19. Server {
  20. org.apache.kafka.common.security.plain.PlainLoginModule required
  21. username="sdx"#pj用户是zk集群之间认证使用的password="sdx123"#pj用户密码user_sdx="sdx123"user_kz="kz123"#用户kz是客户端broker(或zkCli.sh)与zk之间使用的,密码是kz123要与kafka_server_jaas.conf的Clinet里的用户密码相同。user_usera="usera1"#生产用户,供生产者程序认证使用user_userb="userb2";#消费用户,供消费者程序认证使用};

导入kafka认证的相关jar到zookeeper

Zookeeper的认证机制是使用插件,所以需要导入Kafka相关jar包,kafka-clients的相关jar包,在kafka服务下的lib目录中可以找到,因kafka版本不同,相关jar包版本会有所变化。
创建存放kafk的jar包目录

  1. /usr/local/apache-zookeeper-3.6.4-bin/lib

从kafka/lib目录下复制以下5个jar包到每个zookeeper主机的lib目录下,包名分别如下:

  1. kafka-clients-2.3.0.jar
  2. lz4-java-1.6.0.jar
  3. slf4j-api-1.7.25.jar
  4. slf4j-log4j12-1.7.25.jar
  5. snappy-java-1.1.7.3.jar

修改zookeeper的zkEnv.sh脚本文件

添加jaas配置文件添加到zookeeper的环境变量,添加如下新增变量SERVER_JVMFLAGS。黄色标识为新增变量。

  1. vim /usr/local/apache-zookeeper-3.6.4-bin/bin/zkEnv.sh
  2. ZOOBINDIR="${ZOOBINDIR:-/usr/bin}"ZOOKEEPER_PREFIX="${ZOOBINDIR}/.."exportSERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/apache-zookeeper-3.6.4-bin/conf/zk_jaas.conf"#check to see if the conf dir is given as an optional argument

重启组件

先使用脚本关闭3台kafka进程,再关闭3台zookeeper组件进程,确认进程停止后,最后启动zookeeper进程,再启动3台kakfa进程。

进入zookeeper客户端查看ACL节点

发现会多出与ACL认证授权相关的节点

  1. # /usr/local/apache-zookeeper-3.6.4-bin/bin/zkCli.sh[zk: localhost:2181(CONNECTED)0]ls /
  2. [kafka, zookeeper][zk: localhost:2181(CONNECTED)1]ls /kafka
  3. [admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, kafka-acl, kafka-acl-changes, kafka-acl-extended, kafka-acl-extended-changes, latest_producer_id_block, log_dir_event_notification]

注:kafka-acl节点存储ACL信息。Kafka-acl-changes节点存储ACL变更信息

kafka命令使用方法举例

创建topic和删除topic

需要使用(必须使用)–zookeeper方式创建:
如果使用–bootstrap-server x.x.x.15:9092方式创建,则kafka日志会报Failed authentication

  1. 不指定用户创建
  1. ./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --create --topic abc --replication-factor 3 --partitions 3
  1. 指定用户创建
  1. ./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --create --topic abc --replication-factor 3 --partitions 3 --command-config /usr/local/kafka_2.13-2.7.1/else_config/usera_producer.properties
  1. 删除topic操作
  1. ./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --delete --topic dsc

查询topic

需要使用–zookeeper方式查询

  1. ./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --list
  2. ./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --describe

改topic分区数量

需要使用 --zookeeper方式

  1. ./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --topic abc --alter --partitions 4

输出如下

  1. WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
  2. Adding partitions succeeded!

ACL授权操作

对生产类用户授权

生产用户(如usera)对某topic授权某操作(如生产、写数据)权限:(acl授权过程也可通过业务端java代码实现,也可通过运维手动授权实现)

  1. # ./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka --add --allow-principal User:usera --operation Write --topic abc

对消费类用户授权

消费用户(如userb)对某topic、某主机授权某操作(如消费、读数据)权限:(acl授权过程也可通过业务端java代码实现,也可通过运维手动授权实现)

  1. ./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka --add --allow-principal User:userb --operation Read --topic abc

查看授权情况

  1. ./bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=x.x.x.15:2181/kkafka
  2. Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=topic-new, patternType=LITERAL)`:(principal=User:usera, host=*, operation=WRITE, permissionType=ALLOW)(principal=User:userb, host=*, operation=READ, permissionType=ALLOW)

对消费者组授权

对消费者组进行acl授权:对消费者组console-consumer-48035认证授权消费topic abc

如果对所有topic和grpup授权,可以用星号* , 如–topic=* --group=*

  1. ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka --allow-principal User:userb --consumer --topic=abc --group=console-consumer-48035 --add

查看acl授权信息

也可–topic指定topic,–group指定消费组

  1. ./bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka

生产和消费

生产

必须使用–bootstrap-server方式生产,必须携带认证文件

  1. ./bin/kafka-console-producer.sh --bootstrap-server x.x.x.15:9092 --topic abc --producer.config /usr/local/kafka_2.13-2.7.1/else_config/usera_producer.properties

消费

必须使用–bootstrap-server方式消费,必须携带认证文件

  1. ./bin/kafka-console-consumer.sh --bootstrap-server x.x.x.15:9092 --topic abc --from-beginning --consumer.config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties

如果消费报错如 Not authorized to access group: console-consumer-48035,需要对console-consumer-48035消费者组认证。
方式如下:只添加group.id参数即可

  1. vim /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties
  2. group.id=console-consumer-48035
  3. sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \username="userb"\password="userb";
  4. security.protocol=SASL_PLAINTEXT
  5. sasl.mechanism=PLAIN

消费者组相关操作

查询消费组–list

查询消费者组(粗查询),需要使用–bootstrap-server

  1. ./bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.15:9092 --list --command-config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties

查询消费组–describe

查询消费者组(细查询–group指定消费组 或–all-groups查所有组),需要使用–bootstrap-server查询。

  1. ./bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.15:9092 --describe --all-groups --command-config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties

查看消费组消费偏移量

需要使用–bootstrap-server查询。

  1. ./bin/kafka-consumer-groups.sh --describe --bootstrap-server x.x.x.15:9092 --group console-consumer-48035 --command-config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties

查看log日志文件内容

查看Log文件基本数据信息

  1. # /usr/local/kafka_2.13-2.7.1/bin/kafka-dump-log.sh --files /data1/kafka/pctest-1/00000000000000000001.log --print-data-log
  2. Dumping /data1/kafka/pctest-1/00000000000000000001.log
  3. Starting offset: 1
  4. baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1715322009004 size: 72 magic: 2 compresscodec: NONE crc: 3420315536 isvalid: true| offset: 1 CreateTime: 1715322009004 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: test

查看Log文件具体数据信息

  1. # /usr/local/kafka_2.13-2.7.1/bin/kafka-dump-log.sh --files /data1/kafka/aaa-0/00000000000000000000.log --print-data-log
  2. Dumping /data1/kafka/aaa-0/00000000000000000000.log
  3. Starting offset: 0
  4. baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 0 CreateTime: 1718268803238 size: 200 magic: 2 compresscodec: NONE crc: 710512664 isvalid: true| offset: 0 CreateTime: 1718268803238 keysize: -1 valuesize: 130 sequence: -1 headerKeys: [] payload: ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZ

注:payload参数后为实际数据

查看index文件具体内容

  1. ./kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.index

offset为索引值,position为具体位置,可以看到大概每隔600条消息,就建立一个索引。配置项为log.index.size.max.bytes,来控制创建索引的大小;

  1. offset: 972865 position: 25163202
  2. offset: 973495 position: 25179579
  3. offset: 974125 position: 25195956
  4. offset: 974755 position: 25212333
  5. offset: 975385 position: 25228710
  6. offset: 976015 position: 25245087
  7. offset: 976645 position: 25261464
  8. offset: 977275 position: 25277841

查看timeindex文件具体内容

  1. ./kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.timeindex

输出如下:

  1. timestamp: 1691292274425 offset: 475709
  2. timestamp: 1691292274426 offset: 476947
  3. timestamp: 1691292274427 offset: 478255
  4. timestamp: 1691292274428 offset: 479543
  5. timestamp: 1691292274429 offset: 480848
  6. timestamp: 1691292274430 offset: 481767
  7. timestamp: 1691292274431 offset: 483209
  8. timestamp: 1691292274432 offset: 484869
  9. timestamp: 1691292274433 offset: 486408

哪里不懂可以在评论中评论~
文档持续更新中~


本文转载自: https://blog.csdn.net/qq_40477248/article/details/142813401
版权归原作者 斯普信专业组 所有, 如有侵权,请联系我们删除。

“Kafka服务端SASL/PLAIN+ACL认证授权安装操作”的评论:

还没有评论