目录
1 Security
在Kafka的官方文档中介绍了如下几种安全方式,以满足用户不同场景对安全的需求。
● SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0。主要是为 Kerberos 使用,如果当前已有 Kerberos 认证,只需要为集群中每个 Broker 和访问用户申请 Principle ,然后在 Kafka 配置文件中开启 Kerberos 的支持即可。官方文档详见 Authentication using SASL/Kerberos。
● SASL/PLAIN - starting at version 0.10.0.0。一个简单的用户名和密码身份认证机制,通常与 TLS/SSL 一起用于加密,以实现身份验证。是一种比较容易使用的方式,但是也有一个很明显的缺点,这种方式会把用户账户文件配置到静态文件中,每次想要添加新的账户都需要重启 Kafka 去加载静态文件,才能生效,十分不方便。官方文档详见 Authentication using SASL/PLAIN。
● SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 - starting at version 0.10.2.0。通过将认证信息保存在 ZooKeeper 里面,从而动态的获取用户信息,相当于把 ZK 用作一个认证中心使用。这种认证可以在使用过程中,使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群,十分方便。官方文档详见 Authentication using SASL/SCRAM 。
● SASL/OAUTHBEARER - starting at version 2.0。 Kafka 引入的新认证机制,主要是为了实现与 OAuth2 框架的集成,Kafka 不提倡单纯使用 OAUTHBEARER,因为它生成的不安全 Json Web Token,必须配以 SSL 加密才能在生产环境中使用。官方文档详见 Authentication using SASL/OAUTHBEARER 。
2 SASL+ACL实现用户及权限认证
以 Kafka 3.3.1 为例。
2.1 下载
wget https://repo.huaweicloud.com/apache/kafka/3.3.1/kafka_2.12-3.3.1.tgz -P /data
cd /data
tar -zxf kafka_2.12-3.3.1.tgz
ln -s kafka_2.12-3.3.1 kafka
cd /data/kafka
2.2 Kafka服务配置
Kafka server 的 jaas 文件如下
/data/kafka/config/jaas/kafka_server_jaas.conf
。这里创建五个用户,密码分别如下,后会会对其进行分别授权。
用户密码adminadmin123kwriterkwriter123kreaderkreader123shargasharga123shargbshargb123
mkdir /data/kafka/config/jaas/
vim /data/kafka/config/jaas/kafka_server_jaas.conf
配置如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"password="admin123"user_admin="admin123"user_kwriter="kwriter123"user_kreader="kreader123"user_sharga="sharga123"user_shargb="shargb123";};
2.3 修改Kafka 服务启动脚本
1 复制一份作为带安全的启动脚本
cp /data/kafka/bin/kafka-server-start.sh /data/kafka/bin/sasl_kafka-server-start.sh
2 修改大概 44 行为如下,引入上面的 jaas.conf 文件,修改为如下
exportLOG_DIR=/var/log/kafka-logs
exportKAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/jaas/kafka_server_jaas.conf"exec$base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
2.4 配置server.properties
拷贝一份
cp /data/kafka/config/server.properties /data/kafka/config/jaas/server.properties
,配置如下:
broker.id=0listeners=SASL_PLAINTEXT://192.168.179.3:9092
advertised.listeners=SASL_PLAINTEXT://192.168.179.3:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# ACL 入口类
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 设置 admin 用户为超级管理员
super.users=User:admin
auto.create.topics.enable=true
log.dirs=/data/kafka/data/kafka-logs
zookeeper.connect=192.168.179.3:2181/kafka
创建需要的目录
mkdir -p /data/kafka/data/{kafka-logs,zookeeper}
2.5 启动Zookeeper
ZK服务地址可以使用Kafka自带的,也可以使用已部署在的ZK。如果已有ZK可以跳过这一步。这里以Kafka自带的为例。
配置 zookeeper.properties,
vim /data/kafka/config/zookeeper.properties
:
dataDir=/data/kafka/data/zookeeper
clientPort=2181maxClientCnxns=0
admin.enableServer=false
启动ZK
/data/kafka/bin/zookeeper-server-start.sh -daemon /data/kafka/config/zookeeper.properties
查看日志:
tail -f /var/log/zookeeper/server.log
可以通过如下命令进一步查看 znode 信息
/data/kafka/bin/zookeeper-shell.sh 192.168.179.3:2181
2.6 启动Kafka 集群
# 1 启动 Kafka
/data/kafka/bin/sasl_kafka-server-start.sh -daemon /data/kafka/config/jaas/server.properties
# 2 查看启动日志tail -f /var/log/kafka-logs/server.log
关闭的命令如下:
/data/kafka/bin/kafka-server-stop.sh
/data/kafka/bin/zookeeper-server-stop.sh
如果需要调整Kafka JVM参数,可以如下进行修改,调整期堆大小,或者其它参数
vim /data/kafka/bin/sasl_kafka-server-start.sh +28
2.7 ACL
接下来我们对用户进行授权,大概按照如下进行设置。
用户密码权限说明adminadmin123All超级管理员用户kwriterkwriter123producer生产者类权限,Writer、Create、Describekreaderkreader123consumer消费者类权限,Read、Describeshargasharga123All仅对sharga-* 类Topic有所有权shargbshargb123All仅对shargb-* 类Topic有所有权
# 1 授予kwriter用户为生产者权限
/data/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.179.3:2181/kafka \
--add --allow-principal User:kwriter --producer --topic "*"# 2 授予kreader 用户为消费者权限
/data/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.179.3:2181/kafka \
--add --allow-principal User:kreader --consumer --topic "*" --group "*"# 3 对sharga用户进行授权,前缀模式
/data/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.179.3:2181/kafka \
--add --allow-principal User:sharga --group "*" --topic "sharga" --resource-pattern-type prefixed
# 4 对shargb用户进行授权,前缀模式
/data/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.179.3:2181/kafka \
--add --allow-principal User:shargb --group "*" --topic "shargb" --resource-pattern-type prefixed
# 5 查看赋权
/data/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.179.3:2181/kafka --list
小的测试验证(在没有使用正确用户时,执行如下会失败)
# 1 创建 Topic(客户端没有使用认证是,创建topic会失败)
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 \
--partitions 1 --replication-factor 1 --topic test_sasl
# 2 查看 topic(客户端没有使用认证是,获取topic会失败)
/data/kafka/bin/kafka-topics.sh --list --bootstrap-server node01:9092 |greptest# 3 查看 topic详情(客户端没有使用认证是,获取topic详情会失败)
/data/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.179.3:9092 \
--describe --topic test_sasl
# 4 生产者(此时会因为权限问题,而报错,下面来解决这个问题)
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.179.3:9092 \
--topic test_sasl
# 5 消费者(此时会因为权限问题,而报错,下面来解决这个问题)
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 \
--from-beginning --topic test_sasl
2.7.1 admin
新增admin配置属性文件
vim /data/kafka/config/jaas/admin.properties
,内容如下:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin"password="admin123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
以 admin 身份执行如下命令
# 1 再次创建 topic,此时可以成功创建
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 \
--command-config /data/kafka/config/jaas/admin.properties \
--partitions 1 --replication-factor 1 --topic test_sasl
# 2 list
/data/kafka/bin/kafka-topics.sh --list --bootstrap-server node01:9092 \
--command-config /data/kafka/config/jaas/admin.properties
# 3 topic详新信息
/data/kafka/bin/kafka-topics.sh --bootstrap-server node01:9092 \
--command-config /data/kafka/config/jaas/admin.properties \
--describe --topic test_sasl
如果需要删除Topic,可使用如下命令
/data/kafka/bin/kafka-topics.sh --bootstrap-server node01:9092 \
--command-config /data/kafka/config/jaas/admin.properties \
--delete --topic test_sasl
其它(可以执行完下面生产者或消费者命令后再执行查看)
# 1 查看当前的消费者组
/data/kafka/bin/kafka-consumer-groups.sh --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/admin.properties --list
# 2 查看消费者消费进度
/data/kafka/bin/kafka-consumer-groups.sh --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/admin.properties --describe --group test-consumer-group
# 3 查看 Kafka Topic 的 offset 信息
/data/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server node01:9092 \
--command-config /data/kafka/config/jaas/admin.properties --time -1 --topic test_sasl
2.7.2 生产者
新增生产者kwrite用户的配置属性文件
vim /data/kafka/config/jaas/kwriter.properties
,内容如下:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kwriter"password="kwriter123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
使用客户端命令行测试
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server node01:9092 \
--producer.config /data/kafka/config/jaas/kwriter.properties --topic test_sasl
发送如下的数据:
{"id":100,"name":"one","age":1}{"id":101,"name":"two","age":2}{"id":102,"name":"three","age":3}
2.7.3 消费者
新增生产者kreader用户的配置属性文件
vim /data/kafka/config/jaas/kreader.properties
,内容如下:
group.id=test-consumer-group
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kreader"password="kreader123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
消费者测试:
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 \
--consumer.config /data/kafka/config/jaas/kreader.properties --from-beginning --topic test_sasl
2.7.4 sharga用户
新增生产者sharga用户的配置属性文件
vim /data/kafka/config/jaas/sharga.properties
,内容如下:
group.id=test-sharga-group
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="sharga"password="sharga123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
执行如下命令:
# 1 创建自己的topic## sharga-t01
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/sharga.properties --partitions 1 --replication-factor 1 --topic sharga-t01
# 2 生产者
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server node01:9092 --producer.config \
/data/kafka/config/jaas/sharga.properties --topic sharga-t01
向sharga-t01的Topic发送如下消息:
{"id":100,"name":"one","age":1}{"id":101,"name":"two","age":2}{"id":102,"name":"three","age":3}
#3 同样创建 sharga-t02 Topic
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/sharga.properties --partitions 1 --replication-factor 1 --topic sharga-t02
# 4 生产者
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server node01:9092 --producer.config \
/data/kafka/config/jaas/sharga.properties --topic sharga-t02
向sharga-t02的Topic发送如下消息:
{"id":100,"sex":"man"}{"id":101,"sex":"man"}{"id":102,"sex":"woman"}
# 5 查看当前的 topic 列表
/data/kafka/bin/kafka-topics.sh --list --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/sharga.properties
sharga用户仅能查看到自己的Topic
2.7.5 shargb用户
新增生产者shargb用户的配置属性文件
vim /data/kafka/config/jaas/shargb.properties
,内容如下:
group.id=test-shargb-group
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="shargb"password="shargb123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
执行如下命令:
# 1 创建自己的topic## shargb-t01
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/shargb.properties --partitions 1 --replication-factor 1 --topic shargb-t01
这里如果用 sharga用户创建 shargb-t01 就会包错(没有权限)
# 2 生产者
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server node01:9092 --producer.config \
/data/kafka/config/jaas/shargb.properties --topic shargb-t01
向shargb-t01的Topic发送如下消息:
{"id":200,"name":"four","age":4}{"id":201,"name":"five","age":5}
#3 同样创建 shargb-t02 Topic
/data/kafka/bin/kafka-topics.sh --create --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/shargb.properties --partitions 1 --replication-factor 1 --topic shargb-t02
# 4 生产者
/data/kafka/bin/kafka-console-producer.sh --bootstrap-server node01:9092 --producer.config \
/data/kafka/config/jaas/shargb.properties --topic shargb-t02
向shargb-t02的Topic发送如下消息:
{"id":200,"addr":"北京"}{"id":201,"addr":"杭州"}
# 5 查看当前的 topic 列表
/data/kafka/bin/kafka-topics.sh --list --bootstrap-server node01:9092 --command-config \
/data/kafka/config/jaas/shargb.properties
shargb用户同样只能查看到自己的Topic
2.7.6 说明
这种方式是将用户和密码相关的信息保存在了服务器本地文件中了,经过前面的配置,基本涉及到安全认证的配置都放置到了
/data/kafka/config/jaas
目录下,因此如果需要在真实服务上使用时,这个目录需要经读写权限详细的分配给系统的每个用户,对于不允许使用此用户配置信息的其他用户,这个文件的读写权限可以不授予给这个用户。
2.8 生产者客户端代码设置
Properties props =newProperties();//...
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='kwriter' password='kwriter123';");
2.9 消费者客户端代码设置
Properties props =newProperties();//...
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='kreader' password='kreader123';");
2.10 Spring-kafka需配置如下
spring:#……kafka:bootstrap-servers: node01:9092#指定kafka server的地址,集群配多个,中间,逗号隔开producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:# sasl 安全认证security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='kwriter' password='kwriter123';
consumer:group-id: app_consumer_group #群组IDenable-auto-commit:trueauto-commit-interval: 1000s
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='kreader' password='kreader123';
2.11 Flink SQL
createtable kafka_sharga_t01(
id int,
name string,
age int)with('format'='json','properties.bootstrap.servers'='192.168.179.3:9092','connector'='kafka','topic'='sharga-t01','properties.group.id'='group1','scan.startup.mode'='earliest-offset','properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="sharga" password="sharga123";','properties.security.protocol'='SASL_PLAINTEXT','properties.sasl.mechanism'='PLAIN');
版权归原作者 Yore Yuen 所有, 如有侵权,请联系我们删除。