

Kafka 3.8 + ZooKeeper 3.9: automated cluster deployment, SASL + ACL configuration, and Kafka Connect deployment

1. Kafka & ZooKeeper cluster deployment

1.1. Kafka cluster management notes

1. A Kafka topic has multiple partitions, and each partition has multiple replicas. One replica is elected leader (leader election is an internal Kafka mechanism that considers factors such as replica sync progress and health) and handles all read and write requests for that partition.
2. The consumer group rebalance mechanism redistributes every partition of the subscribed topics among the group's consumers.
3. A consumer rebalance is triggered when:
a. The number of consumers in the group changes (a new consumer joins the group, or an existing consumer stops).
b. The number of subscribed topics changes (a consumer can subscribe to multiple topics; for example, the group subscribes to three topics and one of them is suddenly deleted).
c. The number of partitions of a subscribed topic changes (new partitions are added to a topic).
4. Producer partition-assignment strategy: round-robin. The producer sends keyless messages to the partitions in sequence (usually by partition number); for example, with three partitions, message 1 goes to partition 0, message 2 to partition 1, message 3 to partition 2, and then the cycle repeats. (Note: since Kafka 2.4 the default for keyless records is actually the sticky partitioner; round-robin must be configured explicitly.)
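The round-robin cycle described above can be sketched as a quick shell loop (the partition count and message numbers are illustrative, not taken from a real cluster):

```shell
# Round-robin assignment: message i lands on partition (i - 1) % NUM_PARTITIONS
NUM_PARTITIONS=3
for i in 1 2 3 4 5 6; do
  echo "message $i -> partition $(( (i - 1) % NUM_PARTITIONS ))"
done
```

With three partitions, message 4 wraps back around to partition 0, exactly as described above.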

1.2. Environment

Kafka & ZooKeeper are deployed on servers kafka01, kafka02, and kafka03, one node per server, as a three-node cluster. This layout is suitable for production; clients need the hostnames added to their local /etc/hosts.
Server name    Host IP address
kafka01        192.112.8.4
kafka02        192.112.8.5
kafka03        192.112.8.6
The deployment architecture diagram is as follows.

1.3. Create directories and upload files

As root, create the installation package directory on all three servers:

mkdir -p /test/kafka_soft

All of the following files just need to end up in /test/kafka_soft.
Because file transfer is only permitted as the test user, first create a tempfile folder under /test as the test user, then (still as the test user)
upload kafka_2.13-3.8.0.tgz, apache-zookeeper-3.9.2-bin.tar.gz, kafka_install.sh, and zk_install.sh to the /test/tempfile
directory on each of the three servers, and finally move them to /test/kafka_soft.

The content of zk_install.sh is:

#!/bin/bash
ZK_HOME="/test/kafka_soft"
ZK_DATA_DIR="/testlog/zookeeper/data"
ZK_LOG_DIR="/testlog/zookeeper/logs"

SERVERS=("kafka01" "kafka02" "kafka03")
SERVER_ID=$1
ip1="192.112.8.4"
ip2="192.112.8.5"
ip3="192.112.8.6"
HOSTNAME1="kafka01"
HOSTNAME2="kafka02"
HOSTNAME3="kafka03"
HOSTS_FILE="/etc/hosts"

if grep -q "$ip1 $HOSTNAME1" "$HOSTS_FILE"; then
    echo "IP $ip1 already exists in $HOSTS_FILE."
else
    echo "$ip1 $HOSTNAME1" >> "$HOSTS_FILE"
    echo "Added $ip1 to $HOSTS_FILE."
fi
if grep -q "$ip2 $HOSTNAME2" "$HOSTS_FILE"; then
    echo "IP $ip2 already exists in $HOSTS_FILE."
else
    echo "$ip2 $HOSTNAME2" >> "$HOSTS_FILE"
    echo "Added $ip2 to $HOSTS_FILE."
fi
if grep -q "$ip3 $HOSTNAME3" "$HOSTS_FILE"; then
    echo "IP $ip3 already exists in $HOSTS_FILE."
else
    echo "$ip3 $HOSTNAME3" >> "$HOSTS_FILE"
    echo "Added $ip3 to $HOSTS_FILE."
fi
mkdir -p  $ZK_DATA_DIR 
mkdir -p  $ZK_LOG_DIR

cd $ZK_HOME
tar -xzvf apache-zookeeper-3.9.2-bin.tar.gz

cat > $ZK_HOME/apache-zookeeper-3.9.2-bin/conf/zoo.cfg <<EOF
tickTime=2000
dataDir=$ZK_DATA_DIR
dataLogDir=$ZK_LOG_DIR
clientPort=2181
maxClientCnxns=60
initLimit=10
syncLimit=4
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888
EOF

echo $SERVER_ID > $ZK_DATA_DIR/myid

cd $ZK_HOME/apache-zookeeper-3.9.2-bin/bin
sed -i 's#ZOO_LOG_DIR="$ZOOKEEPER_PREFIX/logs"#ZOO_LOG_DIR="/testlog/zookeeper/logs"#' zkEnv.sh
sed -i 's#ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-1000}"#ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-8192}"#' zkEnv.sh
sed -i 's#ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"#ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-4096}"#' zkEnv.sh

chown -R kafka:kafka /test/kafka_soft
chmod -R 755 /test/kafka_soft
chown -R kafka:kafka /testlog/zookeeper
chmod -R 755 /testlog/zookeeper
#su -c " $ZK_HOME/apache-zookeeper-3.9.2-bin/bin/zkServer.sh start" -s /bin/bash kafka
echo "ZooKeeper configuration complete."
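The three nearly identical grep/append stanzas in the script can be collapsed into one idempotent helper; a minimal sketch (note that `grep -qxF` matches the whole line literally, which is stricter than the substring match used above):

```shell
#!/bin/bash
# Append a hosts entry only if that exact line is not already present
add_host() {
  local entry="$1" file="$2"
  if grep -qxF "$entry" "$file" 2>/dev/null; then
    echo "Entry '$entry' already exists in $file."
  else
    echo "$entry" >> "$file"
    echo "Added '$entry' to $file."
  fi
}

# Demonstrate idempotency against a scratch file
f=$(mktemp)
add_host "192.112.8.4 kafka01" "$f"
add_host "192.112.8.4 kafka01" "$f"   # second call is a no-op
add_host "192.112.8.5 kafka02" "$f"
wc -l < "$f"                          # 2 entries: the duplicate was skipped
rm -f "$f"
```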

The content of kafka_install.sh is:

#!/bin/bash
kafka_HOME="/test/kafka_soft"
kafka_DATA_DIR="/testlog/kafka/data"
kafka_LOG_DIR="/testlog/kafka/logs"

SERVERS=("kafka01" "kafka02" "kafka03")

SERVER_ID=$1
HOSTNAME=$2

mkdir -p  $kafka_DATA_DIR 
mkdir -p  $kafka_LOG_DIR

cd $kafka_HOME
tar -xzvf kafka_2.13-3.8.0.tgz

cd $kafka_HOME/kafka_2.13-3.8.0/config
mv server.properties server_back.properties

cat > $kafka_HOME/kafka_2.13-3.8.0/config/server.properties <<EOF
broker.id=$SERVER_ID
listeners=SASL_PLAINTEXT://$HOSTNAME:9092
advertised.listeners=SASL_PLAINTEXT://$HOSTNAME:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
log.dirs=/testlog/kafka/logs
max.poll.interval.ms=600000
session.timeout.ms=120000
fetch.min.bytes=2048576
num.partitions=3
log.retention.hours=2400
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
EOF

cat > $kafka_HOME/kafka_2.13-3.8.0/config/kafka_server_jaas.conf <<EOF
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_producer="test"
    user_consumer="test"

};
EOF

cat > $kafka_HOME/kafka_2.13-3.8.0/config/kafka_client_jaas.conf <<EOF
KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin";
};
EOF

cat > $kafka_HOME/kafka_2.13-3.8.0/config/client.properties <<EOF
# client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required  username="admin"     password="admin";
EOF

sed -i 's#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"#export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G -Djava.security.auth.login.config=/test/kafka_soft/kafka_2.13-3.8.0/config/kafka_server_jaas.conf "#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-server-start.sh

sed -i 's#export KAFKA_HEAP_OPTS="-Xmx512M"#export KAFKA_HEAP_OPTS="-Xmx2048M -Djava.security.auth.login.config=/test/kafka_soft/kafka_2.13-3.8.0/config/kafka_client_jaas.conf"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-console-producer.sh

sed -i 's#export KAFKA_HEAP_OPTS="-Xmx512M"#export KAFKA_HEAP_OPTS="-Xmx2048M -Djava.security.auth.login.config=/test/kafka_soft/kafka_2.13-3.8.0/config/kafka_client_jaas.conf"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-console-consumer.sh

sed -i 's#LOG_DIR="$base_dir/logs"#LOG_DIR="/testlog/kafka/logs"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-run-class.sh
sed -i 's#KAFKA_HEAP_OPTS="-Xmx256M"#KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"#'  /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-run-class.sh

chown -R kafka:kafka /test/kafka_soft
chmod -R 755 /test/kafka_soft
chown -R kafka:kafka /testlog/kafka
chmod -R 755 /testlog/kafka
echo "Kafka configuration complete."
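The sed substitutions in the script fail silently when the pattern does not match (for example, after a Kafka version bump changes the default heap line). A hedged sketch of verifying a substitution on a scratch copy before trusting it:

```shell
#!/bin/bash
# Stand-in for kafka-server-start.sh: one line with the default heap setting
f=$(mktemp)
echo 'export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"' > "$f"

# Same style of substitution as in kafka_install.sh
sed -i 's#-Xmx1G -Xms1G#-Xmx10G -Xms10G#' "$f"

# Confirm the replacement actually happened
if grep -q 'Xmx10G' "$f"; then
  echo "heap substitution applied"
else
  echo "pattern not found - check the script contents" >&2
fi
rm -f "$f"
```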

1.4. Install ZooKeeper

1.4.1. Run zk_install.sh

As root on kafka01, run zk_install.sh:

cd /test 
mv tempfile/* ./kafka_soft
cd /test/kafka_soft
chmod 755 zk_install.sh
./zk_install.sh 1

As root on kafka02, run zk_install.sh:

cd /test 
mv tempfile/* ./kafka_soft
cd /test/kafka_soft
chmod 755 zk_install.sh
./zk_install.sh  2

As root on kafka03, run zk_install.sh:

cd /test 
mv tempfile/* ./kafka_soft
cd /test/kafka_soft
chmod 755 zk_install.sh
./zk_install.sh  3

1.4.2. Start and verify ZooKeeper

Run the start command on all three servers:

su test
/test/kafka_soft/apache-zookeeper-3.9.2-bin/bin/zkServer.sh start

Run the status command on all three servers:

su test
/test/kafka_soft/apache-zookeeper-3.9.2-bin/bin/zkServer.sh status

As shown below, one node is the leader and the other two are followers.

1.5. Install Kafka

1.5.1. Run kafka_install.sh (a JDK must be installed separately)

As root on kafka01, run kafka_install.sh:

cd /test/kafka_soft
chmod 755 kafka_install.sh
./kafka_install.sh 1 kafka01

As root on kafka02, run kafka_install.sh:

cd /test/kafka_soft
chmod 755 kafka_install.sh
./kafka_install.sh  2 kafka02

As root on kafka03, run kafka_install.sh:

cd /test/kafka_soft
chmod 755 kafka_install.sh
./kafka_install.sh 3 kafka03

1.5.2. Start Kafka

Run the start command on all three servers:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-server-start.sh -daemon ../config/server.properties

1.5.3. Verify the Kafka cluster (skip su test if your SSH session is already the test user)

SSH into any Kafka server and create a topic via any broker:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-topics.sh --create --bootstrap-server kafka02:9092 --replication-factor 3 --partitions 3 --topic test1   --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties

SSH into any Kafka server and list the topics via any broker:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-topics.sh --list --bootstrap-server kafka01:9092 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties


SSH into any Kafka server and send messages as a producer:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-console-producer.sh --bootstrap-server  kafka01:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

SSH into any Kafka server and receive messages as a consumer:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-console-consumer.sh --bootstrap-server kafka02:9092  --topic test1 --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

Messages sent on the producer side are received normally by the consumer; verification passes.

After testing, delete the test data:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-topics.sh --bootstrap-server kafka02:9092 --delete --topic test1  --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties

1.6. Enable Kafka ACLs

1.6.1. Modify configuration

Edit the configuration file /test/kafka_soft/kafka_2.13-3.8.0/config/server.properties on every node
and append the following settings:

authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin

Restart each node:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-server-stop.sh
./kafka-server-start.sh -daemon ../config/server.properties

Before ACLs are enabled, every user can create topics, list topics, produce, and consume.
After the authorizer is enabled with allow.everyone.if.no.acl.found=false, only the admin super user retains full permissions; ordinary users have none until ACL rules grant them.

1.6.2. Verify permissions

Before the rules are added, the producer user can neither produce nor consume messages:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-console-producer.sh --bootstrap-server  kafka03:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
./kafka-console-consumer.sh --bootstrap-server kafka02:9092  --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

Grant the producer user write access to the topic test:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --add --allow-principal User:producer --operation Write --topic test

Grant the producer user read access to the topic test (for any consumer group):

./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --add --allow-principal User:producer --operation Read --topic test --group '*'

List the ACL rules for the topic test:

./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --list --topic test

After the ACL rules are added, produce and consume on the topic test:

./kafka-console-producer.sh --bootstrap-server kafka02:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

Remove an ACL rule:

./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --remove --allow-principal User:producer --operation Write --topic test

1.7. Other Kafka commands

Create a topic with a custom message retention time (retention.ms is in milliseconds):

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-topics.sh --create --bootstrap-server kafka02:9092 --replication-factor 3 --partitions 3 --topic test-half --config retention.ms=1800000 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties

View the topic configuration:

./kafka-configs.sh --describe --entity-type topics --entity-name test-half --bootstrap-server kafka02:9092 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties

Change the topic's message retention time:

./kafka-configs.sh --alter --entity-type topics --entity-name test-half --add-config retention.ms=1800000 --bootstrap-server kafka02:9092 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties
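Because retention.ms is in milliseconds, values are easy to misread; a quick conversion check confirms the setting above is 30 minutes:

```shell
# retention.ms -> minutes: divide by 1000 (to seconds), then by 60 (to minutes)
ms=1800000
echo "$(( ms / 1000 / 60 )) minutes"   # prints: 30 minutes
```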

1.8. Install Kafka Connect

Use the test user for everything below.

1.8.1. Standalone configuration

su test
mkdir -p /test/kafka_soft/plugins/Fileconnector
touch /test/kafka_soft/test.txt
touch /test/kafka_soft/test.sink.txt
cp /test/kafka_soft/kafka_2.13-3.8.0/libs/connect-file-3.8.0.jar  /test/kafka_soft/plugins/Fileconnector/

1. On kafka01, edit connect-standalone.properties.
In /test/kafka_soft/kafka_2.13-3.8.0/config/connect-standalone.properties, change

bootstrap.servers=localhost:9092

to

bootstrap.servers=kafka01:9092

Then append the following to the end of /test/kafka_soft/kafka_2.13-3.8.0/config/connect-standalone.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required  username="admin"     password="adminpassword";

producer.bootstrap.servers=kafka01:9092
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required  username="admin"     password="adminpassword";

consumer.bootstrap.servers=kafka01:9092
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required  username="admin"     password="adminpassword";

offset.flush.interval.ms=10000
listeners=HTTP://:8083
plugin.path=/test/kafka_soft/plugins

2. On kafka01, edit connect-file-source.properties.
In /test/kafka_soft/kafka_2.13-3.8.0/config/connect-file-source.properties, change

file=test.txt
topic=connect-test

to

file=/test/kafka_soft/test.txt
topic=test

3. On kafka01, edit connect-file-sink.properties.
In /test/kafka_soft/kafka_2.13-3.8.0/config/connect-file-sink.properties, change

file=test.sink.txt
topics=connect-test

to

file=/test/kafka_soft/test.sink.txt
topics=test

4. Start Kafka Connect in standalone mode:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0
bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-source.properties  config/connect-file-sink.properties

5. Verify.
If the verifying user is not admin, read/write ACLs must be granted first.
Open a new SSH window on any of the three servers:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

Open a new SSH window on kafka01:

cd /test/kafka_soft
echo "20241030-01" >>test.txt
echo "20241030-02" >>test.txt

Check whether the consumer receives the messages.

Check whether the messages arrive in test.sink.txt:

cd /test/kafka_soft
cat test.sink.txt

1.8.2. Distributed (cluster) configuration

1. Modify the configuration; run on all three nodes:

su test
mkdir -p /test/kafka_soft/plugins/Fileconnector
touch /test/kafka_soft/test.txt
touch /test/kafka_soft/test.sink.txt
cp /test/kafka_soft/kafka_2.13-3.8.0/libs/connect-file-3.8.0.jar  /test/kafka_soft/plugins/Fileconnector/
cd /test/kafka_soft/kafka_2.13-3.8.0/config
mv connect-distributed.properties connect-distributed_20241030.properties
cat > /test/kafka_soft/kafka_2.13-3.8.0/config/connect-distributed.properties <<EOF
bootstrap.servers=192.112.8.4:9092,192.112.8.5:9092,192.112.8.6:9092
group.id=connect-cluster
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required  username="admin"     password="adminpassword";
producer.bootstrap.servers=192.112.8.4:9092,192.112.8.5:9092,192.112.8.6:9092
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required  username="admin"     password="adminpassword";
consumer.bootstrap.servers=192.112.8.4:9092,192.112.8.5:9092,192.112.8.6:9092
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required  username="admin"     password="adminpassword";
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
listeners=HTTP://:8083
plugin.path=/test/kafka_soft/plugins
EOF

2. Start the Kafka Connect cluster.
Run on all three nodes:

su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./connect-distributed.sh -daemon ../config/connect-distributed.properties

3. Verify:

curl http://192.112.8.4:8083
curl http://192.112.8.5:8083
curl http://192.112.8.6:8083

4. Create a connector:

cat > /test/kafka_soft/file-source-connector-test.json <<EOF
{
    "name": "file-source-connector",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "file": "/test/kafka_soft/test.txt",
        "topic": "test"
    }
}
EOF
cd /test/kafka_soft
curl -X POST -H "Content-Type: application/json"  --data @file-source-connector-test.json   http://192.112.8.4:8083/connectors
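Malformed JSON (a stray trailing space in the topic name, a missing brace) makes the REST call fail with an unhelpful error, so it can be worth validating the file locally before POSTing. A minimal sketch, assuming python3 is on the PATH:

```shell
#!/bin/bash
# Validate connector JSON locally before sending it to the Connect REST API
f=$(mktemp)
cat > "$f" <<'EOF'
{
  "name": "file-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/test/kafka_soft/test.txt",
    "topic": "test"
  }
}
EOF
if python3 -m json.tool "$f" > /dev/null; then
  echo "JSON OK"
else
  echo "invalid JSON - fix before POSTing" >&2
fi
rm -f "$f"
```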

5. List the connectors:

  curl -X GET http://192.112.8.4:8083/connectors

6. Check connector status:

  curl -X GET http://192.112.8.4:8083/connectors/file-source-connector/status
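The status response is JSON; if jq is not installed, the connector state can still be pulled out with grep and cut. A rough sketch against a sample payload (the field layout is assumed from typical Connect responses, not captured from this cluster):

```shell
# Extract the first "state" field from a Connect status payload
status='{"name":"file-source-connector","connector":{"state":"RUNNING","worker_id":"192.112.8.4:8083"},"tasks":[{"id":0,"state":"RUNNING"}]}'
echo "$status" | grep -o '"state":"[A-Z]*"' | head -1 | cut -d'"' -f4   # prints: RUNNING
```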

7. Delete the connector:

  curl -X DELETE http://192.112.8.4:8083/connectors/file-source-connector

Reprinted from: https://blog.csdn.net/pamela394829274/article/details/143734542
Copyright belongs to the original author 逆风而行ゝ. If there is any infringement, please contact us for removal.
