kafka-configs 脚本是用来设置主题级别参数的。其实,它的功能还有很多。比如在这个例子中,我们使用它来创建 SASL/SCRAM 认证中的用户信息。可以使用下列命令来查看刚才创建的用户数据。
./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users
#(可以单独指定某个用户 --entity-name producer,如下)
./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer
ZK客户端命令行查看:
./zkCli.sh -server localhost:2181
ls /config/users
3. 配置kafka jaas文件
配置了用户之后,我们需要为 Broker 创建一个对应的 JAAS 文件。在实际场景中,需要为每台单独的物理 Broker 机器都创建一份 JAAS 文件。
Kafka 的 jaas认证配置文件,配置的是登录类,超管密码和管理的帐号密码列表
vim kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username ="admin"
password="admin123"
user_admin="admin123"
user_producer="producer123"
user_consumer="consumer123";
};
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin123"
user_producer="producer123"
user_consumer="consumer123";
};
Client {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="kafka"
password="kafka123";
};
KafkaServer中usename配置的是kafka服务端使用的账号和密码,后面的user_xxx事预设的普通帐号认证信息。
中间部分配置的是PLAIN认证方式的账户和密码,其中producer1是账户名,producer123是密码。
Client配置了broker到Zookeeper的连接用户名密码,这里要和前面zookeeper配置中的zk_jaas.conf.conf 中 user_kafka 的账号和密码相同。
关于这个文件内容,需要注意以下两点:
1)不要忘记最后一行和倒数第二行结尾处的分号;
2)JAAS 文件中不需要任何空格键。
4. kafka 配置文件启用SASL认证
Kafka 服务配置文件 server.propertis,配置认证协议及认证实现类
cat server.properties其它内容都注释掉,然后追加如下内容:
broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
delete.topic.enable=true
auto.create.topics.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
Host.name=43.138.0.199
5. kafka 启动脚本添加认证文件路径的环境变量
Kafka 安全认证可以直接通过环境变量 -Djava.security.auth.login.config 设置,修改 Kafka 启动脚本 kafka-start-server.sh 文件最后一行,增加一个参数指向 jaas 配置文件的绝对路径
vi kafka-server-start.sh
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/lighthouse/kafka_2.12-2.2.1/config/kafka_server_jaas.conf kafka.Kafka "$@"
6. kafka客户端配置
- 配置consumer.properties和producer.properties,都要加入以下配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
- 生产者配置
使用kafka-console-producer.sh脚本测试生产者,由于开启安全认证和授权,此时使用console-producer脚本来尝试发送消息,那么消息会发送失败,原因是没有指定合法的认证用户,因此客户端需要做相应的配置,需要创建一个名为producer.conf的配置文件给producer程序使用。
config目录下创建一个producer.conf的文件,cat producer.conf文件内容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.Scra
mLoginModule required username="producer" password="producer123";
注意:Topic设置写权限
- 消费者配置
使用kafka-console-consumer.sh脚本测试生产者,由于开启安全认证和授权,因此客户端需要做相应的配置。需要为 consumer 用户创建consumer.conf给消费者程序,同时设置对topic的读权限。
config目录下创建一个consumer.conf的文件,cat consumer.conf文件内容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.Scra
mLoginModule required username="consumer" password="consumer123";
注意:Topic设置读权限。
- 在生产者和消费者启动脚本中引入JAAS文件
vim bin/kafka-console-producer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
# 添加这行
export KAFKA_OPTS="-Djava.security.auth.login.config=../config/kafka_server_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
# vim bin/kafka-console-consumer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
# 添加这行
export KAFKA_OPTS="-Djava.security.auth.login.config=../config/kafka_server_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
启动kafka
nohup kafka-server-start.sh /path-to-kafka/config/server.properties &
我自己写了一个脚本,同时启动zookeeper和kafka
文件名称叫startup.sh内容如下:
cd /home/lighthouse/zk-3.4.14/zookeeper-3.4.14/bin
./zkServer.sh start
cd /home/lighthouse/kafka_2.12-2.2.1/
nohup bin/kafka-server-start.sh config/server.properties > output.txt &
zookeeper没有启动成功,我找下原因。把整个过程重新整理了一遍,发现zookeeper、kafka启动成功了。
检查验证
./zkCli.sh -server localhost:2181
ls /brokers/ids
1. 生产者测试
1) 创建一个测试主题test_topic
./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test_topic
2)查看创建的Topic
./kafka-topics.sh --list --zookeeper localhost:2181 test_topic
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
注:kafka开启认证后,当生产者往Topic写数据需要为主题配置权限(write),即对生产者赋予写的权限。
这里使用producer用户认证授权,通过ACL为producer 用户分配操作test_topic权限:
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --operation Write --topic 'test_topic'
写在最后
在结束之际,我想重申的是,学习并非如攀登险峻高峰,而是如滴水穿石般的持久累积。尤其当我们步入工作岗位之后,持之以恒的学习变得愈发不易,如同在茫茫大海中独自划舟,稍有松懈便可能被巨浪吞噬。然而,对于我们程序员而言,学习是生存之本,是我们在激烈市场竞争中立于不败之地的关键。一旦停止学习,我们便如同逆水行舟,不进则退,终将被时代的洪流所淘汰。因此,不断汲取新知识,不仅是对自己的提升,更是对自己的一份珍贵投资。让我们不断磨砺自己,与时代共同进步,书写属于我们的辉煌篇章。
需要完整版PDF学习资源私我
加入社区》https://bbs.csdn.net/forums/4304bb5a486d4c3ab8389e65ecb71ac0
版权归原作者 2401_86640535 所有, 如有侵权,请联系我们删除。