ufw disabled #关闭防火墙 或者 放开指定端口
vim /etc/hosts #配置ip host映射关系
10.3.1.96 node1
10.3.1.97 node2
#1.所有机器安装jdk
apt install openjdk-8-jdk -y
java -version #export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_202
#2.部署zookeeper集群
cd /usr/local
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz
mv apache-zookeeper-3.8.4-bin zookeeper-3.8.4
cd /usr/local/zookeeper-3.8.4/conf && cp zoo_sample.cfg zoo.cfg
mkdir -p /usr/local/zookeeper-3.8.4/logs
mkdir -p /usr/local/zookeeper-3.8.4/data
vim zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-3.8.4/data
dataLogDir=/usr/local/zookeeper-3.8.4/logs
clientPort=2181
#客户端访问zk的端口
server.1=node1:2888:3888
server.2=node2:2888:3888
#说明:2888为组成zookeeper服务器之间的通信端口,3888为用来选举leader的端口,server后面的数字与后面的myid相对应
#注意前后不要有空格 否则报错Invalid config, exiting abnormally 可以通过:set list显示隐藏字符来处理
vim /etc/profile
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.8.4
export PATH=$ZOOKEEPER_HOME/bin:$PATH
source /etc/profile
#node1节点执行
cd /usr/local/zookeeper-3.8.4/data && echo "1" > myid
#node2节点执行
cd /usr/local/zookeeper-3.8.4/data && echo "2" > myid
#所有机器启动zk
cd /usr/local/zookeeper-3.8.4/bin
./zkServer.sh start
./zkServer.sh status
#3.部署kafka 并修改配置文件(不同节点参数需修改)
cd /usr/local
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz
tar -zxvf kafka_2.12-3.7.0.tgz
mkdir -p /usr/local/kafka_2.12-3.7.0/logs
#node1进行如下操作。 node2同理:只是把id改为2 node1改为node2即可
vim /usr/local/kafka_2.12-3.7.0/config/server.properties
#id不重复 修改下面三行
broker.id=1
listeners=PLAINTEXT://node1:9092
advertised.listeners=PLAINTEXT://node1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
#建议开启 新增此行
delete.topic.enable=true
#修改此行
log.dirs=/usr/local/kafka_2.12-3.7.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#修改此行
zookeeper.connect=node1:2181,node2:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
vim /usr/local/kafka_2.12-3.7.0/config/consumer.properties
bootstrap.servers=node1:9092
group.id=test-consumer-group
vim /usr/local/kafka_2.12-3.7.0/config/producer.properties
bootstrap.servers=node1:9092
compression.type=none
#所有机器启动
cd /usr/local/kafka_2.12-3.7.0/bin
./kafka-server-start.sh -daemon ../config/server.properties
#配置systemctl 开机自启动 zookeeper和 kafka
vim /lib/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=forking
User=root
Group=root
ExecStart=/usr/local/zookeeper-3.8.4/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper-3.8.4/bin/zkServer.sh stop
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
vim /lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service
[Service]
Type=forking
User=root
Group=root
Environment="JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64"
ExecStart=/usr/local/kafka_2.12-3.7.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.7.0/config/server.properties
ExecStop=/usr/local/kafka_2.12-3.7.0/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
#which java; #然后ls -l 直到没有软链则为java安装路径
systemctl daemon-reload #刷新配置
systemctl start zookeeper.service
systemctl enable zookeeper.service
systemctl start kafka.service
systemctl enable kafka.service
注意:systemctl启动若不成功,可以修改Type 然后再尝试
命令行测试
#测试创建topic
./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 3 --partitions 3 --topic test #老版本 --zookeeper node1:2181
./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic test
./kafka-topics.sh -list --bootstrap-server node1:9092
./kafka-console-producer.sh --broker-list node1:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server node:9092 --topic test --from-beginning
#旧版Kafka,用的是zookeeper地址而非bootstrap.servers
#主要有两个目的/动机:一是优化元数据管理,原来的zk方案,极端情况下可能会造成数据不一致;二是简化部署和配置。
#bootstrap.servers只是用于客户端启动(bootstrap)的时候有一个可以热启动的一个连接者,一旦启动完毕客户端就应该可以得知当前集群的所有节点的信息,日后集群扩展的时候客户端也能够自动实时的得到新节点的信息,即使bootstrap.servers里面的挂掉了也应该是能正常运行的,除非节点挂掉后客户端也重启了
#服务器是Kafka,生产者(发送数据的)和消费者(接收数据的)是客户端
#删除topic
#1.kafka启动之前,在server.properties配置delete.topic.enable=true
#2.执行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181或者使用kafka-manager集群管理工具删除
#注意:如果没有设置 delete.topic.enable=true,则调用kafka 的delete命令无法真正将topic删除,而是显示(marked for deletion)
版权归原作者 李庆政370 所有, 如有侵权,请联系我们删除。