Kafka 认证模式命令使用示例
创建Topic
指定用户创建
[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.11:9092 --create --topic fkaaa35 --replication-factor 3 --partitions 3 --command-config /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
Created topic fkaaa35.
创建Topic详细信息
[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.11:9092 --describe --command-config /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
Topic: kafka35a TopicId: JfCNAbxdRj2RmqCCaakOng PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: kafka35a Partition: 0 Leader: 6 Replicas: 6 Isr: 6
Topic: fkaaa35 TopicId: cf4esxrdTwGMHGq1uWntmA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: fkaaa35 Partition: 0 Leader: 2 Replicas: 2,4,6 Isr: 2,4,6
Topic: fkaaa35 Partition: 1 Leader: 4 Replicas: 4,6,2 Isr: 4,6,2
Topic: fkaaa35 Partition: 2 Leader: 6 Replicas: 6,2,4 Isr: 6,2,4
查看Topic列表
[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.11:9092 --list --command-config /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
__consumer_offsets
fffka35
ffka35
fka35
fkaaa35
kafka35
kafka35a
改变Topic分区数量
[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.11:9092 --topic fka35 --alter --partitions 4 --command-config /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
再次查询验证分区数量
[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.11:9092 --describe --command-config /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
Topic: kafka35a TopicId: JfCNAbxdRj2RmqCCaakOng PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: kafka35a Partition: 0 Leader: 6 Replicas: 6 Isr: 6
Topic: fka35 TopicId: VSEfw7yMTmybd1U7hcRWwg PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: fka35 Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: fka35 Partition: 1 Leader: 6 Replicas: 6 Isr: 6
Topic: fka35 Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: fka35 Partition: 3 Leader: 4 Replicas: 4 Isr: 4
生产数据
必须使用–bootstrap-server方式生产,必须携带认证文件
/usr/local/kafka3.5-sasl-data/bin/kafka-console-producer.sh --bootstrap-server x.x.x.11:9092 --topic fkaaa35 --producer.config /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
消费数据
[root@kafka18 bin]# /usr/local/kafka3.5-sasl-data/bin/kafka-console-consumer.sh --bootstrap-server x.x.x.18:9092 --topic fkaaa35 --from-beginning --consumer.config /usr/local/kafka3.5-sasl-data/config/kraft/userb_consumer.properties
查看消费组名称
[root@kafka18 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.18:9092 --list --command-config /usr/local/kafka3.5-sasl-data/config/kraft/userb_consumer.properties
console-consumer-94931
console-consumer-22274
查看某消费组消费消息
[root@kafka18 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.18:9092 --group console-consumer-22274 --describe --command-config /usr/local/kafka3.5-sasl-data/config/kraft/userb_consumer.properties
查看Topic情况
[root@kafka01 kafkasaslbroker]# /usr/local/kafka3.5-sasl-data/bin/kafka-topics.sh --bootstrap-server x.x.x.18:9092 --describe --command-config /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
Topic: n35 TopicId: IbBjY51sTQGqASkZrk8WxQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: n35 Partition: 0 Leader: 6 Replicas: 6,2,4 Isr: 6,2,4
Topic: n35 Partition: 1 Leader: 2 Replicas: 2,4,6 Isr: 2,4,6
Topic: n35 Partition: 2 Leader: 4 Replicas: 4,6,2 Isr: 4,6,2
Topic: __consumer_offsets TopicId: Kbp3tGJ6QGyFhZK9_OeWww PartitionCount: 50 ReplicationFactor: 2 Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
Topic: __consumer_offsets Partition: 0 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: __consumer_offsets Partition: 1 Leader: 4 Replicas: 4,6 Isr: 4,6
Topic: __consumer_offsets Partition: 2 Leader: 6 Replicas: 6,2 Isr: 6,2
Topic: __consumer_offsets Partition: 3 Leader: 2 Replicas: 2,6 Isr: 2,6
Topic: __consumer_offsets Partition: 4 Leader: 6 Replicas: 6,4 Isr: 6,4
Topic: __consumer_offsets Partition: 5 Leader: 4 Replicas: 4,2 Isr: 4,2
Topic: __consumer_offsets Partition: 6 Leader: 2 Replicas: 2,6 Isr: 2,6
Topic: __consumer_offsets Partition: 7 Leader: 6 Replicas: 6,4 Isr: 6,4
Topic: __consumer_offsets Partition: 8 Leader: 4 Replicas: 4,2 Isr: 4,2
Topic: __consumer_offsets Partition: 9 Leader: 4 Replicas: 4,6 Isr: 4,6
Topic: __consumer_offsets Partition: 10 Leader: 6 Replicas: 6,2 Isr: 6,2
Topic: __consumer_offsets Partition: 11 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: __consumer_offsets Partition: 12 Leader: 6 Replicas: 6,4 Isr: 6,4
Topic: __consumer_offsets Partition: 13 Leader: 4 Replicas: 4,2 Isr: 4,2
Topic: __consumer_offsets Partition: 14 Leader: 2 Replicas: 2,6 Isr: 2,6
Topic: __consumer_offsets Partition: 15 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: __consumer_offsets Partition: 16 Leader: 4 Replicas: 4,6 Isr: 4,6
Topic: __consumer_offsets Partition: 17 Leader: 6 Replicas: 6,2 Isr: 6,2
Topic: __consumer_offsets Partition: 18 Leader: 6 Replicas: 6,4 Isr: 6,4
Topic: __consumer_offsets Partition: 19 Leader: 4 Replicas: 4,2 Isr: 4,2
Topic: __consumer_offsets Partition: 20 Leader: 2 Replicas: 2,6 Isr: 2,6
Topic: __consumer_offsets Partition: 21 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: __consumer_offsets Partition: 22 Leader: 4 Replicas: 4,6 Isr: 4,6
Topic: __consumer_offsets Partition: 23 Leader: 6 Replicas: 6,2 Isr: 6,2
Topic: __consumer_offsets Partition: 24 Leader: 6 Replicas: 6,4 Isr: 6,4
Topic: __consumer_offsets Partition: 25 Leader: 4 Replicas: 4,2 Isr: 4,2
Topic: __consumer_offsets Partition: 26 Leader: 2 Replicas: 2,6 Isr: 2,6
Topic: __consumer_offsets Partition: 27 Leader: 4 Replicas: 4,6 Isr: 4,6
Topic: __consumer_offsets Partition: 28 Leader: 6 Replicas: 6,2 Isr: 6,2
Topic: __consumer_offsets Partition: 29 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: __consumer_offsets Partition: 30 Leader: 4 Replicas: 4,6 Isr: 4,6
Topic: __consumer_offsets Partition: 31 Leader: 6 Replicas: 6,2 Isr: 6,2
Topic: __consumer_offsets Partition: 32 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: __consumer_offsets Partition: 33 Leader: 6 Replicas: 6,2 Isr: 6,2
Topic: __consumer_offsets Partition: 34 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: __consumer_offsets Partition: 35 Leader: 4 Replicas: 4,6 Isr: 4,6
Topic: __consumer_offsets Partition: 36 Leader: 4 Replicas: 4,2 Isr: 4,2
Topic: __consumer_offsets Partition: 37 Leader: 2 Replicas: 2,6 Isr: 2,6
Topic: __consumer_offsets Partition: 38 Leader: 6 Replicas: 6,4 Isr: 6,4
Topic: __consumer_offsets Partition: 39 Leader: 6 Replicas: 6,4 Isr: 6,4
Topic: __consumer_offsets Partition: 40 Leader: 4 Replicas: 4,2 Isr: 4,2
Topic: __consumer_offsets Partition: 41 Leader: 2 Replicas: 2,6 Isr: 2,6
Topic: __consumer_offsets Partition: 42 Leader: 4 Replicas: 4,6 Isr: 4,6
Topic: __consumer_offsets Partition: 43 Leader: 6 Replicas: 6,2 Isr: 6,2
Topic: __consumer_offsets Partition: 44 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: __consumer_offsets Partition: 45 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: __consumer_offsets Partition: 46 Leader: 4 Replicas: 4,6 Isr: 4,6
Topic: __consumer_offsets Partition: 47 Leader: 6 Replicas: 6,2 Isr: 6,2
Topic: __consumer_offsets Partition: 48 Leader: 4 Replicas: 4,6 Isr: 4,6
Topic: __consumer_offsets Partition: 49 Leader: 6 Replicas: 6,2 Isr: 6,2
主机节点架构
地址主机名角色x.x.x.11:9092kafka01brokerx.x.x.14:9092kafka14brokerx.x.x.18:9092kafka18brokerx.x.x.11:9093kafka01controllerx.x.x.14:9093kafka14controllerx.x.x.18:9093kafka18controller
主机名设置
确保每台主机名不重复,执行如
hostnamectl set-hostname kafka01 &&bash
hostnamectl set-hostname kafka14 &&bash
hostnamectl set-hostname kafka18 &&bash
设置hosts文件解析
cat>>/etc/hosts <<EOF
x.x.x.11 kafka01
x.x.x.14 kafka14
x.x.x.18 kafka18
EOF
Kafka配置文件说明
部署路径如下:
Controller角色部署路径:/usr/local/kafka3.5-sasl-controller
Broker角色部署路径:/usr/local/kafka3.5-sasl-data
Controller 节点关键配置说明:
在Controller节点上需要配置/usr/local/kafka3.5-sasl-controller/config/kraft/server.properties文件
listeners=CONTROLLER://x.x.x.x:9091 # 控制器监听器地址# SASL认证配置
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
log.dirs=/kafkasaslcontroller # 数据目录 # 控制器存储配置
controller.socket.timeout.ms=30000
controller.metadata.max.age.ms=300000
controller.listener.names=CONTROLLER
# 控制器选举配置
controller.election.type=kraft #用于指定控制器选举的类型。Kafka 支持两种类型的控制器选举机制:zk_sync:基于 ZooKeeper 的同步控制器选举。kraft:KRaft 模式下的控制器选举,这是 Kafka 未来版本中的一个新特性,它不依赖于 ZooKeeper。
controller.metadata.storage.topic=kafka_controller_metadata #用于指定存储控制器元数据快照的Kafka内部主题。这个参数是在Kafka 0.10版本中引入的,用于替代旧版本中的Zookeeper。控制器是Kafka集群中负责负载均衡、分区领导者选举和集群范围内的变更(如新的broker加入或离开)的broker。控制器元数据快照包含了集群的所有元数据信息,例如broker列表、主题分区分配和副本集。默认情况下,Kafka使用_kafka_metadata这个内部主题来存储控制器的元数据快照。如果你需要修改这个参数,确保新指定的主题满足以下要求:这个主题必须是分区数为1,副本因子为(controller.broker.count+1)/ 2的事务主题。
这个主题的清理策略必须设置为delete,以便快照可以被删除。如果你需要修改这个参数,你可以在Kafka配置文件中设置新的主题名称,并确保新主题满足上述要求。然后,你需要创建这个主题,并设置合适的配置。例如,使用Kafka命令行工具:
kafka-topics.sh --create --topic my_custom_metadata_topic --partitions 1 --replication-factor 2 --config cleanup.policy=delete
controller.metadata.storage.replication.factor=3
controller.metadata.storage.min.insync.replicas=2# 动态配置更新
sasl.mechanism.controller.protocol=PLAIN #集群间认证时用的认证方式
super.users=User:admin #设置超级管理员
Broker节点关键配置说明:
在broker节点上需要配置/usr/local/kafka3.5-sasl-broker/config/kraft/server.properties文件。以下是一些关键的配置项:
listeners=SASL_PLAINTEXT://x.x.x.x:9092 #监听器配置# SASL 认证配置
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
log.dirs=/kafkasaslbroker #数据目录
controller.quorum.voters=1@controller-ip:9091 #控制器连接配置# 动态配置更新
dynamic.config.topic=kafka_config
# 其他配置
group.initial.rebalance.delay.ms=0
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
min.insync.replicas=2
设置SASL认证
编辑认证jaas文件
分别在controller和broker节点设置,文件内容相同
controller节点:
vim /usr/local/kafka3.5-sasl-controller/config/kraft/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"password="password"user_admin="password"user_test="test"user_producer="prod-sec"user_consumer="cons-sec";};
Broker节点:
vim /usr/local/kafka3.5-sasl-data/config/kraft/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"password="password"user_admin="password"user_test="test"user_producer="prod-sec"user_consumer="cons-sec";
};
编辑脚本变量
controller节点:
vim /usr/local/kafka3.5-sasl-controller/bin/kafka-run-class.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka3.5-sasl-controller/config/kraft/kafka_server_jaas.conf"
Broker节点:
vim /usr/local/kafka3.5-sasl-data/bin/kafka-run-class.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka3.5-sasl-data/config/kraft/kafka_server_jaas.conf"
生成集群id
在任意一个kafka节点上执行即可,初始化集群数据目录,首先生成存储目录唯一 ID。生成后保存生成的字符串。这个集群ID事实上是一个长度16位的字符串通过Base64编码后得来的,因此也可以不使用上述命令,直接自定义一个16位长度的纯英文和数字组成的字符串,然后将这个字符串编码为Base64格式作为这个集群ID也可以。可以使用相关工具Base64编码工具。
生成集群id:
[root@kafka18 kafkacontroller]# /usr/local/kafka3.5-sasl-controller/bin/kafka-storage.sh random-uuid
0awG6LDDRRSS0nYDN6LUEw
格式化所有kafka节点数据目录
然后分别在每个kafka进程执行下面命令,用该 ID 格式化 kafka 存储目录。完成集群元数据配置,-t指定刚才生成的字符串。
本部署方案为三controller节点,三broker节点分离部署方案,那么6个kafka进程都要执行格式化,一共执行6次。
首先格式化3个controller:
每个controller节点必须执行,一共执行3次。
[root@kafka01 kraft]# /usr/local/kafka3.5-sasl-controller/bin/kafka-storage.sh format -t 0awG6LDDRRSS0nYDN6LUEw -c /usr/local/kafka3.5-sasl-controller/config/kraft/server.properties
Formatting /kafkasaslcontroller with metadata.version 3.5-IV2.
执行后kafka的controller节点数据目录会生成2个文件
[root@kafka14 kafkasaslcontroller]# ll
total 8
-rw-r--r-- 1 root root 249 Sep 7 01:58 bootstrap.checkpoint
-rw-r--r-- 1 root root 86 Sep 7 01:58 meta.properties
其次格式化3个broker:
每个broker节点必须执行,一共执行3次。
[root@kafka18 kraft]# /usr/local/kafka3.5-sasl-data/bin/kafka-storage.sh format -t 0awG6LDDRRSS0nYDN6LUEw -c /usr/local/kafka3.5-sasl-data/config/kraft/server.properties
Formatting /kafkasaslbroker with metadata.version 3.5-IV2.
启动kafka kraft集群
启动方式与传统模式启动方法一样。首先启动3个controller节点,最后启动3个broker节点
首先启动controller节点:
/usr/local/kafka3.5-sasl-controller/bin/kafka-server-start.sh -daemon /usr/local/kafka3.5-sasl-controller/config/kraft/server.properties
其次启动broker节点:
/usr/local/kafka3.5-sasl-data/bin/kafka-server-start.sh -daemon /usr/local/kafka3.5-sasl-data/config/kraft/server.properties
关闭时首先关闭broker节点,最后关闭controller节点
编辑认证携带文件
在kafka的broker节点编辑设置。因为开启了安全认证,所以执行命令需要携带含有认证用户信息的认证文件。认证文件路径自定义即可,本方案放在 /usr/local/kafka3.5-sasl-data/config/kraft/路径下。
编写生产者用户的认证文件
vim /usr/local/kafka3.5-sasl-data/config/kraft/usera_producer.properties
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \username="producer"\password="prod-sec";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
usera_producer.properties的使用方法是通过--producer.config参数携带。
编写消费者用户userb的认证文件
如果后续指定此配置文件无法消费,需要先查出消费者组名称,然后在文件第一行添加group.id参数,并指定消费者组。
vim /usr/local/kafka3.5-sasl-data/config/kraft/userb_consumer.properties
#group.id=console-consumer-94652
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \username="consumer"\password="cons-sec";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
userb_consumer.properties的使用方法是通过--consumer.config参数携带。
3.3 编写生产者用户usera的客户端认证文件
vim /usr/local/kafka3.5-sasl-data/config/kraft/usera-writer-jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"password="prod-sec";};
编写消费者用户userb的客户端认证文件
vim /usr/local/kafka3.5-sasl-data/config/kraft/userb-read-jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"password="cons-sec";};
上述usera-writer-jaas.conf和userb-read-jaas.conf文件的使用方法均需通过--command-config参数携带。
分别修改生产者脚本和消费者脚本变量
修改broker端脚本,放在脚本第一行即可
vim /usr/local/kafka3.5-sasl-data/bin/kafka-console-producer.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka3.5-sasl-data/config/kraft/usera-writer-jaas.conf"vim /usr/local/kafka3.5-sasl-data/bin/kafka-console-consumer.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka3.5-sasl-data/config/kraft/userb-read-jaas.conf"
重启kafka broker节点
执行/usr/local/kafka3.5-sasl-data/bin/kafka-server-stop.sh,关闭kakfa broker进程后,启动controller、broker节点:
首先启动controller节点:
/usr/local/kafka3.5-sasl-controller/bin/kafka-server-start.sh -daemon /usr/local/kafka3.5-sasl-controller/config/kraft/server.properties
其次启动broker节点:
/usr/local/kafka3.5-sasl-data/bin/kafka-server-start.sh -daemon /usr/local/kafka3.5-sasl-data/config/kraft/server.properties
Kafka-controller和kafka-broker节点配置文件示例
kafka-controller节点配置示例
process.roles=controller
node.id=1# 每个主机的id必须不同
[email protected]:9093,[email protected]:9093,[email protected]:9093
listeners=CONTROLLER://x.x.x.11:9093 # 每个主机的ip必须不同,填写当前主机ip
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
controller.election.type=kraft
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=209715200
sasl.login.timeout.ms=1500000
sasl.connection.timeout.ms=30000000
sasl.mechanism.controller.protocol=PLAIN
super.users=User:admin
sasl.mechanism=PLAIN
log.dirs=/kafkasaslcontroller
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
kafka-broker节点配置示例
process.roles=broker
node.id=2# 每个主机的id必须不同
[email protected]:9093,[email protected]:9093,[email protected]:9093
listeners=SASL_PLAINTEXT://x.x.x.11:9092 # 每个主机的ip必须不同,填写当前主机ip
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=209715200
sasl.login.timeout.ms=15000000
sasl.connection.timeout.ms=30000000
sasl.mechanism.controller.protocol=PLAIN
super.users=User:admin
sasl.mechanism=PLAIN
log.dirs=/kafkasaslbroker
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
以上就是Kafka Kraft模式下配置SASL的配置过程,有哪里不懂可以下面评论~
文档持续更新中~
版权归原作者 斯普信专业组 所有, 如有侵权,请联系我们删除。