版本
操作系统:linux
kafka:kafka_2.13-3.3.2
zookeeper:apache-zookeeper-3.7.1-bin
部署
1.下载zookeeper和kafka安装包
cd /home
wget https://downloads.apache.org/kafka/3.3.2/kafka_2.12-3.3.2.tgz
wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
2.解压
tar -zxvf kafka_2.12-3.3.2.tgz
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz
3.修改zookeeper配置文件并启动
cd apache-zookeeper-3.7.1-bin/config
cp zoo_sample.cfg zoo.cfg
修改zoo.cfg文件,增加以下命令
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
创建zk_server_jaas.conf文件(开启kafka sasl认证使用),文件内容
Server {
# 认证方式为DigestLoginModule
org.apache.zookeeper.server.auth.DigestLoginModule required
# zk集群使用的账号
username="kafka"
# zk集群使用的密码
password="kafka"
# kafka连接使用的账号和密码,写法为user_账号=“密码”
user_kafka="kafka";
};
此外,认证过程需要导入kafka的依赖类,在下载的kafka/libs目录找到以下四个jar包,在zookeeper目录下创建zk_sasl_dependency目录(具体目录可根据实际情况进行修改,后续在配置文件中指定到实际目录就可以)
kafka-clients-3.3.2.jar
lz4-java-1.8.0.jar
slf4j-api-1.7.36.jar
slf4j-reload4j-1.7.36.jar
snappy-java-1.1.8.4.jar
修改bin/zkEnv.sh文件,将zk的sasl认证信息加载到jvm参数中,在zk服务启动时加载认证的jar包和认证信息
for i in /home/apache-zookeeper-3.7.1-bin/zk_sasl_dependency/*.jar;
do
CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/home/apache-zookeeper-3.7.1-bin/config/zk_server_jaas.conf "
进入bin目录启动zk
./zkServer.sh start
4.修改kafka配置并启动
新增kafka_server_jaas.conf文件
cd /home/kafka_2.13-3.3.2/config
创建kafka_server_jaas.conf
KafkaServer {
# 指定认证方法为PLAIN
org.apache.kafka.common.security.plain.PlainLoginModule required
# kafka多个broker认证的账号
username="admin"
# kafka多个broker认证的密码
password="admin"
# 定义一个用户账号为admin,密码为admin
user_admin="admin"
# 定义一个用户账号为kafka,密码为kafka,需要和zk中定义的账号密码保持一致
user_kafka="kafka";
};
# 猜测可能是集群环境下当前服务作为client的配置,但未进行实践,单机模式下可不用
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafka"
password="kafka";
};
开启SASL认证,修改server.properties配置文件,增加以下内容
# 允许外部端口连接
listeners=SASL_PLAINTEXT://192.168.0.106:9092
# 认证方式
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true
# 对外提供服务的代理地址
advertised.listeners=SASL_PLAINTEXT://192.168.0.106:9092
修改bin目录下kafka启动脚本,在jvm参数中增加认证信息
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G“ 修改为
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_2.13-3.3.2/config/kafka_server_jaas.conf"
启动服务
./kafka-server-start.sh ../config/server.properties
验证
使用kafka自带的客户端脚本进行测试验证
1.添加客户端认证信息
config目录下新建kafka_client_jaas.conf,添加以下内容
KafkaClient {
# 指定连接方式
org.apache.kafka.common.security.plain.PlainLoginModule required
# 客户端用户名,与kafka_server_jaas.conf中账号密码保持一致
username="kafka"
# 客户端密码
password="kafka";
};
2.修改config目录下producer.properties和consumer.properties,增加以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
3.修改客户端和服务端的启动脚本kafka-console-producer.sh和kafka-console-consumer.sh,将kafka_client_jaas.conf认证信息添加至启动参数中
export KAFKA_HEAP_OPTS="-Xmx512M"
修改为
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_2.13-3.3.2/config/kafka_client_jaas.conf"
4.kafka-topics.sh添加认证信息,确保能使用命令行创建/删除topic
config目录下添加sasl_client.conf文件,指定命令执行时的认证方式,文件内容:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
修改kafka-topics.sh启动脚本,使用kafka_client_jaas.conf客户端文件中的验证参数
新增
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_2.13-3.3.2/config/kafka_client_jaas.conf"
创建名称为test、只有一个分区的topic
bin/kafka-topics.sh --create --topic test --partitions 1 --bootstrap-server 192.168.0.106:9092 --command-config config/sasl_client.conf
5.启动生产者和消费者服务
启动生产者
./bin/kafka-console-producer.sh --broker-list 192.168.0.106:9092 --topic test --producer.config config/producer.properties
启动消费者
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.106:9092 --topic test --from-beginning --consumer.config config/consumer.properties
生产者发送消息后消费者可正常收到,整体验证结束
常用命令
topic
1.查看所有topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.0.106:9092 --command-config config/sasl_client.conf
2.查看名称为test的topic
bin/kafka-topics.sh --topic test --describe --bootstrap-server 192.168.0.106:9092 --command-config config/sasl_client.conf
3.创建test的topic,并且创建一个分区
bin/kafka-topics.sh --create --topic test --partitions 1 --bootstrap-server 192.168.0.106:9092 --command-config config/sasl_client.conf
4.删除名称为test的topic
bin/kafka-topics.sh --delete --topic test --bootstrap-server 192.168.0.106:9092 --command-config config/sasl_client.conf
groups
使用kafka-consumer-groups.sh脚本之前,需要参照kafka-topics.sh脚本先进行修改,将kafka_client_jaas.conf文件中的配置信息导入到环境变量中
1.查看所有的groups信息
bin/kafka-consumer-groups.sh --list --bootstrap-server 192.168.0.106:9092 --command-config config/sasl_client.conf
2.查看指定的groups信息
bin/kafka-consumer-groups.sh --group test --describe --bootstrap-server 192.168.0.106:9092 --command-config config/sasl_client.conf
3.查看指定分组的offset和描述
bin/kafka-consumer-groups.sh --group 'test' --offsets --describe --bootstrap-server 192.168.0.106:9092 --command-config config/sasl_client.conf
producer
生产消息
bin/kafka-console-producer.sh --broker-list 192.168.0.106:9092 --topic test --producer.config config/producer.properties
consumer
消费消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.106:9092 --topic test --from-beginning --consumer.config config/consumer.properties
相关网址
参考博客
https://www.cnblogs.com/ilovena/p/10123516.html
https://blog.csdn.net/small_tu/article/details/109534634
kafka官网
https://kafka.apache.org/
kafka中文文档
https://kafka.apachecn.org/
版权归原作者 JMH741 所有, 如有侵权,请联系我们删除。