1. Kafka & ZooKeeper Cluster Deployment
1.1. Kafka Cluster Management Notes
1. A Kafka topic has multiple partitions, and each partition has multiple replicas. One replica is elected leader (leader election is an internal Kafka mechanism that considers factors such as replica sync progress and health) and handles all read and write requests for the partition.
2. The consumer group rebalance mechanism redistributes the partitions of the subscribed topics among the group's consumers.
3. Rebalance triggers:
a. The number of consumers in the group changes (a new consumer joins the group, or an existing consumer stops).
b. The number of subscribed topics changes (a consumer group can subscribe to multiple topics; for example, the group subscribes to three topics and one of them is deleted).
c. The number of partitions of a subscribed topic changes (new partitions are added to a topic).
4. Producer partition-write strategy: round-robin (the classic default for keyless messages; newer client versions default to a sticky variant). The producer sends messages to partitions in order of partition number. For example, with three partitions, message 1 goes to partition 0, message 2 to partition 1, message 3 to partition 2, and the cycle then repeats.
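The round-robin assignment described above can be sketched in a few lines of shell (illustrative only; the real assignment happens inside the producer client):

```shell
# Illustrative sketch of round-robin partition assignment:
# message i goes to partition (i mod number_of_partitions).
partitions=3
i=0
for msg in msg1 msg2 msg3 msg4 msg5 msg6; do
  echo "$msg -> partition $((i % partitions))"
  i=$((i + 1))
done
```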
1.2. Environment
Kafka & ZooKeeper are deployed on servers kafka01, kafka02, and kafka03, one node per server, forming a three-node cluster. This layout is suitable for production. Clients must map the broker hostnames in their local /etc/hosts.

Server name    Host IP address
kafka01        192.112.8.4
kafka02        192.112.8.5
kafka03        192.112.8.6

Deployment architecture diagram: (figure not reproduced here)
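Client machines outside the cluster need the same name resolution; the entries below mirror the table above (append them to the client's /etc/hosts):

```
192.112.8.4 kafka01
192.112.8.5 kafka02
192.112.8.6 kafka03
```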
1.3. Create Directories and Upload Files
As root, create the package directory on all three servers:
mkdir -p /test/kafka_soft
Only the following files need to end up in the /test/kafka_soft directory. Since files can only be transferred as the test user, first create a tempfile folder under /test as the test user, then upload kafka_2.13-3.8.0.tgz, apache-zookeeper-3.9.2-bin.tar.gz, kafka_install.sh, and zk_install.sh to /test/tempfile on each of the three servers, and finally move them to /test/kafka_soft.
Contents of zk_install.sh:
#!/bin/bash
ZK_HOME="/test/kafka_soft"
ZK_DATA_DIR="/testlog/zookeeper/data"
ZK_LOG_DIR="/testlog/zookeeper/logs"
SERVERS=("kafka01" "kafka02" "kafka03")
SERVER_ID=$1
ip1="192.112.8.4"
ip2="192.112.8.5"
ip3="192.112.8.6"
HOSTNAME1="kafka01"
HOSTNAME2="kafka02"
HOSTNAME3="kafka03"
HOSTS_FILE="/etc/hosts"
if grep -q "$ip1 $HOSTNAME1" "$HOSTS_FILE"; then
echo "IP $ip1 already exists in $HOSTS_FILE."
else
echo "$ip1 $HOSTNAME1" >> "$HOSTS_FILE"
echo "Added $ip1 to $HOSTS_FILE."
fi
if grep -q "$ip2 $HOSTNAME2" "$HOSTS_FILE"; then
echo "IP $ip2 already exists in $HOSTS_FILE."
else
echo "$ip2 $HOSTNAME2" >> "$HOSTS_FILE"
echo "Added $ip2 to $HOSTS_FILE."
fi
if grep -q "$ip3 $HOSTNAME3" "$HOSTS_FILE"; then
echo "IP $ip3 already exists in $HOSTS_FILE."
else
echo "$ip3 $HOSTNAME3" >> "$HOSTS_FILE"
echo "Added $ip3 to $HOSTS_FILE."
fi
mkdir -p $ZK_DATA_DIR
mkdir -p $ZK_LOG_DIR
cd $ZK_HOME
tar -xzvf apache-zookeeper-3.9.2-bin.tar.gz
cat > $ZK_HOME/apache-zookeeper-3.9.2-bin/conf/zoo.cfg <<EOF
tickTime=2000
dataDir=$ZK_DATA_DIR
dataLogDir=$ZK_LOG_DIR
clientPort=2181
maxClientCnxns=60
initLimit=10
syncLimit=4
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888
EOF
echo $SERVER_ID > $ZK_DATA_DIR/myid
cd $ZK_HOME/apache-zookeeper-3.9.2-bin/bin
sed -i 's#ZOO_LOG_DIR="$ZOOKEEPER_PREFIX/logs"#ZOO_LOG_DIR="/testlog/zookeeper/logs"#' zkEnv.sh
sed -i 's#ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-1000}"#ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-8192}"#' zkEnv.sh
sed -i 's#ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"#ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-4096}"#' zkEnv.sh
chown -R test:test /test/kafka_soft
chmod -R 755 /test/kafka_soft
chown -R test:test /testlog/zookeeper
chmod -R 755 /testlog/zookeeper
#su -c " $ZK_HOME/apache-zookeeper-3.9.2-bin/bin/zkServer.sh start" -s /bin/bash test
echo "ZooKeeper configuration complete."
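The three repeated hosts checks in zk_install.sh can be collapsed into one loop. A sketch with the same skip-if-present behavior (it writes to a local demo file here; point HOSTS_FILE at /etc/hosts on the real servers):

```shell
# Same idempotent append as the three if/else blocks above, as one loop.
HOSTS_FILE="./hosts.demo"   # use HOSTS_FILE="/etc/hosts" on the real servers
: > "$HOSTS_FILE"           # demo only: start from an empty file
for entry in "192.112.8.4 kafka01" "192.112.8.5 kafka02" "192.112.8.6 kafka03"; do
  if grep -q "$entry" "$HOSTS_FILE"; then
    echo "$entry already exists in $HOSTS_FILE."
  else
    echo "$entry" >> "$HOSTS_FILE"
    echo "Added $entry to $HOSTS_FILE."
  fi
done
```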
Contents of kafka_install.sh:
#!/bin/bash
kafka_HOME="/test/kafka_soft"
kafka_DATA_DIR="/testlog/kafka/data"
kafka_LOG_DIR="/testlog/kafka/logs"
SERVERS=("kafka01" "kafka02" "kafka03")
SERVER_ID=$1
HOSTNAME=$2
mkdir -p $kafka_DATA_DIR
mkdir -p $kafka_LOG_DIR
cd $kafka_HOME
tar -xzvf kafka_2.13-3.8.0.tgz
cd $kafka_HOME/kafka_2.13-3.8.0/config
mv server.properties server_back.properties
cat > $kafka_HOME/kafka_2.13-3.8.0/config/server.properties <<EOF
broker.id=$SERVER_ID
listeners=SASL_PLAINTEXT://$HOSTNAME:9092
advertised.listeners=SASL_PLAINTEXT://$HOSTNAME:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
log.dirs=/testlog/kafka/logs
max.poll.interval.ms=600000
session.timeout.ms=120000
fetch.min.bytes=2048576
num.partitions=3
log.retention.hours=2400
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
EOF
cat > $kafka_HOME/kafka_2.13-3.8.0/config/kafka_server_jaas.conf <<EOF
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_producer="test"
user_consumer="test"
};
EOF
cat > $kafka_HOME/kafka_2.13-3.8.0/config/kafka_client_jaas.conf <<EOF
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
EOF
cat > $kafka_HOME/kafka_2.13-3.8.0/config/client.properties <<EOF
# client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
EOF
sed -i 's#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"#export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G -Djava.security.auth.login.config=/test/kafka_soft/kafka_2.13-3.8.0/config/kafka_server_jaas.conf "#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-server-start.sh
sed -i 's#export KAFKA_HEAP_OPTS="-Xmx512M"#export KAFKA_HEAP_OPTS="-Xmx2048M -Djava.security.auth.login.config=/test/kafka_soft/kafka_2.13-3.8.0/config/kafka_client_jaas.conf"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-console-producer.sh
sed -i 's#export KAFKA_HEAP_OPTS="-Xmx512M"#export KAFKA_HEAP_OPTS="-Xmx2048M -Djava.security.auth.login.config=/test/kafka_soft/kafka_2.13-3.8.0/config/kafka_client_jaas.conf"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-console-consumer.sh
sed -i 's#LOG_DIR="$base_dir/logs"#LOG_DIR="/testlog/kafka/logs"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-run-class.sh
sed -i 's#KAFKA_HEAP_OPTS="-Xmx256M"#KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-run-class.sh
chown -R test:test /test/kafka_soft
chmod -R 755 /test/kafka_soft
chown -R test:test /testlog/kafka
chmod -R 755 /testlog/kafka
echo "Kafka configuration complete."
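The JAAS file above also defines a producer user (user_producer="test"). To authenticate as that user instead of admin, a client can point --command-config at a properties file like the following (client-producer.properties is a hypothetical name; the script does not create this file):

```
# client-producer.properties (hypothetical): authenticate as the "producer" user
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="test";
```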
1.4. Install ZooKeeper
1.4.1. Install ZooKeeper
As root on kafka01, run zk_install.sh:
cd /test
mv tempfile/* ./kafka_soft
cd /test/kafka_soft
chmod 755 zk_install.sh
./zk_install.sh 1
As root on kafka02, run zk_install.sh:
cd /test
mv tempfile/* ./kafka_soft
cd /test/kafka_soft
chmod 755 zk_install.sh
./zk_install.sh 2
As root on kafka03, run zk_install.sh:
cd /test
mv tempfile/* ./kafka_soft
cd /test/kafka_soft
chmod 755 zk_install.sh
./zk_install.sh 3
1.4.2. Start and Verify ZooKeeper
Run the start command on all three servers:
su test
/test/kafka_soft/apache-zookeeper-3.9.2-bin/bin/zkServer.sh start
Check node status on all three servers:
su test
/test/kafka_soft/apache-zookeeper-3.9.2-bin/bin/zkServer.sh status
One node should report Mode: leader and the other two Mode: follower.
1.5. Install Kafka
1.5.1. Install Kafka (a JDK must be installed separately)
As root on kafka01, run kafka_install.sh:
cd /test/kafka_soft
chmod 755 kafka_install.sh
./kafka_install.sh 1 kafka01
As root on kafka02, run kafka_install.sh:
cd /test/kafka_soft
chmod 755 kafka_install.sh
./kafka_install.sh 2 kafka02
As root on kafka03, run kafka_install.sh:
cd /test/kafka_soft
chmod 755 kafka_install.sh
./kafka_install.sh 3 kafka03
1.5.2. Start Kafka
Run the start command on all three servers:
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-server-start.sh -daemon ../config/server.properties
1.5.3. Verify the Kafka Cluster (skip su test if your SSH session is already the test user)
SSH into any Kafka server and create a topic against any broker:
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-topics.sh --create --bootstrap-server kafka02:9092 --replication-factor 3 --partitions 3 --topic test1 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties
SSH into any Kafka server and list topics against any broker:
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-topics.sh --list --bootstrap-server kafka01:9092 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties
SSH into any Kafka server and produce messages against any broker:
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-console-producer.sh --bootstrap-server kafka01:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
SSH into any Kafka server and consume messages from any broker:
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test1 --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
Messages sent by the producer are received by the consumer; verification passes.
After testing, delete the test data:
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-topics.sh --bootstrap-server kafka02:9092 --delete --topic test1 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties
1.6. Enable Kafka ACLs
1.6.1. Modify Configuration
Edit the configuration file /test/kafka_soft/kafka_2.13-3.8.0/config/server.properties
and add the following settings to it on all nodes:
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin
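A sketch of appending these settings (run from the config directory on each broker; assumes ./server.properties in the current directory is the file being edited):

```shell
# Append the three ACL settings to server.properties in the current directory.
# Sketch: on the brokers, cd /test/kafka_soft/kafka_2.13-3.8.0/config first.
cat >> ./server.properties <<'EOF'
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin
EOF
```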
Restart each node:
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-server-stop.sh
./kafka-server-start.sh -daemon ../config/server.properties
Before ACLs are enabled, every user can create topics, list topics, produce, and consume.
After ACLs are enabled, only the admin user (the super user) has full permissions; ordinary users have none until rules are granted.
1.6.2. Verify Permissions
Before any rules are added, the producer user cannot produce or consume:
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-console-producer.sh --bootstrap-server kafka03:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
Grant the producer user write access to the topic (test):
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --add --allow-principal User:producer --operation Write --topic test
Grant the producer user read access to the topic (test):
./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --add --allow-principal User:producer --operation Read --topic test --group '*'
List the ACL rules for the topic (test):
./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --list --topic test
After the rules are added, produce and consume on topic test:
./kafka-console-producer.sh --bootstrap-server kafka02:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
Remove the ACL rule:
./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --remove --allow-principal User:producer --operation Write --topic test
1.7. Other Kafka Commands
Create a topic with a custom message retention time (retention.ms is in milliseconds):
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-topics.sh --create --bootstrap-server kafka02:9092 --replication-factor 3 --partitions 3 --topic test-half --config retention.ms=1800000 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties
View the topic's configuration:
./kafka-configs.sh --describe --entity-type topics --entity-name test-half --bootstrap-server kafka02:9092 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties
Change the topic's message retention time:
./kafka-configs.sh --alter --entity-type topics --entity-name test-half --add-config retention.ms=1800000 --bootstrap-server kafka02:9092 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties
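The value retention.ms=1800000 used above corresponds to 30 minutes; a quick sanity check of the arithmetic:

```shell
# 30 minutes expressed in milliseconds, as used for retention.ms above.
minutes=30
retention_ms=$((minutes * 60 * 1000))
echo "$retention_ms"   # 1800000
```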
1.8. Install Kafka Connect
All steps use the test user by default.
1.8.1. Standalone Configuration
su test
mkdir -p /test/kafka_soft/plugins/Fileconnector
touch /test/kafka_soft/test.txt
touch /test/kafka_soft/test.sink.txt
cp /test/kafka_soft/kafka_2.13-3.8.0/libs/connect-file-3.8.0.jar /test/kafka_soft/plugins/Fileconnector/
1. On kafka01, edit the connect-standalone.properties configuration
In /test/kafka_soft/kafka_2.13-3.8.0/config/connect-standalone.properties, change
bootstrap.servers=localhost:9092
to
bootstrap.servers=kafka01:9092
Then append the following to /test/kafka_soft/kafka_2.13-3.8.0/config/connect-standalone.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
producer.bootstrap.servers=kafka01:9092
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
consumer.bootstrap.servers=kafka01:9092
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
offset.flush.interval.ms=10000
listeners=HTTP://:8083
plugin.path=/test/kafka_soft/plugins
2. On kafka01, edit the connect-file-source.properties configuration
In /test/kafka_soft/kafka_2.13-3.8.0/config/connect-file-source.properties, change
file=test.txt
topic=connect-test
to
file=/test/kafka_soft/test.txt
topic=test
3. On kafka01, edit the connect-file-sink.properties configuration
In /test/kafka_soft/kafka_2.13-3.8.0/config/connect-file-sink.properties, change
file=test.sink.txt
topics=test
to
file=/test/kafka_soft/test.sink.txt
topics=test
4. Start Kafka Connect in standalone mode
su test
cd /test/kafka_soft/kafka_2.13-3.8.0
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
5. Verify
If verifying as a non-admin user, grant read and write permissions first.
Open a new SSH window on any of the three servers:
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
Open a new SSH window on kafka01:
cd /test/kafka_soft
echo "20241030-01" >>test.txt
echo "20241030-02" >>test.txt
Check that the consumer receives the messages.
Check that the messages arrive in test.sink.txt:
cd /test/kafka_soft
cat test.sink.txt
1.8.2. Cluster (Distributed) Configuration
1. Modify the configuration; run on all three nodes:
su test
mkdir -p /test/kafka_soft/plugins/Fileconnector
touch /test/kafka_soft/test.txt
touch /test/kafka_soft/test.sink.txt
cp /test/kafka_soft/kafka_2.13-3.8.0/libs/connect-file-3.8.0.jar /test/kafka_soft/plugins/Fileconnector/
cd /test/kafka_soft/kafka_2.13-3.8.0/config
mv connect-distributed.properties connect-distributed_20241030.properties
cat > /test/kafka_soft/kafka_2.13-3.8.0/config/connect-distributed.properties <<EOF
bootstrap.servers=192.112.8.4:9092,192.112.8.5:9092,192.112.8.6:9092
group.id=connect-cluster
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
producer.bootstrap.servers=192.112.8.4:9092,192.112.8.5:9092,192.112.8.6:9092
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
consumer.bootstrap.servers=192.112.8.4:9092,192.112.8.5:9092,192.112.8.6:9092
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
listeners=HTTP://:8083
plugin.path=/test/kafka_soft/plugins
EOF
2. Start the Kafka Connect cluster
Run on all three nodes:
su test
cd /test/kafka_soft/kafka_2.13-3.8.0/bin
./connect-distributed.sh -daemon ../config/connect-distributed.properties
3. Verify
curl http://192.112.8.4:8083
curl http://192.112.8.5:8083
curl http://192.112.8.6:8083
4. Create a connector
cat > /test/kafka_soft/file-source-connector-test.json <<EOF
{
"name": "file-source-connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"file": "/test/kafka_soft/test.txt",
"topic": "test"
}
}
EOF
cd /test/kafka_soft
curl -X POST -H "Content-Type: application/json" --data @file-source-connector-test.json http://192.112.8.4:8083/connectors
5. List connectors
curl -X GET http://192.112.8.4:8083/connectors
6. Check connector status
curl -X GET http://192.112.8.4:8083/connectors/file-source-connector/status
7. Delete the connector
curl -X DELETE http://192.112.8.4:8083/connectors/file-source-connector
Copyright belongs to the original author 逆风而行ゝ; in case of infringement, please contact us for removal.