(1)Zookeeper安装
https://downloads.apache.org/zookeeper/
(1)更新系统的包管理器
sudo yum update
(2)安装JDK
sudo yum install java-1.8.0-openjdk-devel
(3)下载ZooKeeper
cd /usr/local/
wget https://downloads.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
(4)解压ZooKeeper
tar -xvf apache-zookeeper-3.7.1-bin.tar.gz
(5)重命名为”zookeeper”
mv apache-zookeeper-3.7.1-bin zookeeper
(6)创建ZooKeeper数据目录
mkdir /usr/local/zookeeper/data
mkdir /usr/local/zookeeper/logs
(7)创建ZooKeeper配置文件:
ZooKeeper的滴答时间(以毫秒为单位)、ZooKeeper存储数据的数据目录、ZooKeeper监听的客户端端口
vim /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
(8)启动ZooKeeper
权限不足解决方案:su root、chmod a+xwr zkServer.sh
/usr/local/zookeeper/bin/zkServer.sh start
(9)使用如下命令检查ZooKeeper是否正在运行:
/usr/local/zookeeper/bin/zkServer.sh status
(2)Kafka安装
权限不足解决方案:
chmod a+xwr kafka-topics.sh
chmod a+xwr kafka-console-producer.sh
chmod a+xwr kafka-console-consumer.sh
chmod a+xwr kafka-consumer-groups.sh
(1)安装Kafka
cd /usr/local/
tar -zxvf /usr/local/kafka_2.11-2.4.0.tgz
mv kafka_2.11-2.4.0 kafka
(2)配置Kafka
vim /usr/local/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.19.131:9092
log.dirs=/usr/local/kafka/data/kafka-logs
zookeeper.connect=192.168.19.131:2181
(3)启动Kafka
cd /usr/local/kafka/bin
./kafka-server-start.sh -daemon ../config/server.properties
ps -aux | grep server.properties
(4)使用Kafka
创建主题
./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 1 --topic test
查看主题
./kafka-topics.sh --list --zookeeper 192.168.19.131:2181
发送消息
./kafka-console-producer.sh --broker-list 192.168.19.131:9092 --topic test
接收消息方式一:从最后一条消息的偏移量+1开始消费
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --topic test
接收消息方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --from-beginning --topic test
(5)单播消息:一个消费组只有一个消费者能消费
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test
(6)多播消息:不同的消费者处于不同的消费组
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group2 --topic test
(7)查看消费组
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --list
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group1
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group2
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
group1 test 0 22 22 0 consumer-group1
Current-offset:当前消费组已经消费的偏移量
Log-end-offset:主题对应分区消息的结束偏移量(HW)
Lag:当前消费组未消费的消息数
(8)主题分区
./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 2 --topic topic1
./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic1
cd /usr/local/kafka/data/kafka-logs/topic1-0
cd /usr/local/kafka/data/kafka-logs/topic1-1
说明:定期将自己消费分区的offset提交给kafka内部topic、key是consumerGroupId+topic+分区、value是当前offset值
说明:kafka会定期清理topic里的消息、默认保存7天、7天后消息会被删除
说明:通过此公式可以选出consumer消费的offset要提交到哪个分区:hash(consumerGroupId)%__consumer_offsets主题分区数
__consumer_offsets-0
__consumer_offsets-49
(3)Kafka集群
(1)Kafka集群、3个broker
3个server.properties
vim server0.properties
broker.id=0
listeners=PLAINTEXT://192.168.19.131:9092
log.dirs=/usr/local/kafka/data/kafka-logs-0
vim server1.properties
broker.id=1
listeners=PLAINTEXT://192.168.19.131:9093
log.dirs=/usr/local/kafka/data/kafka-logs-1
vim server2.properties
broker.id=2
listeners=PLAINTEXT://192.168.19.131:9094
log.dirs=/usr/local/kafka/data/kafka-logs-2
./kafka-server-start.sh -daemon ../config/server0.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
(2)副本:1个主题、2个分区、3个副本
./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 3 --partitions 2 --topic topic2
./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic2
Topic: topic2 PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: topic2 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: topic2 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Leader:
写和读的操作都在Leader上、Leader负责把数据同步到follower、当leader挂了、经过主从选举、从多个follower中选举产生一个新Leader
Follower:
接收leader的同步的数据
Isr:
可以同步的broker节点和已同步的broker节点、存放在isr集合中、如果isr节点中的性能较差、会被踢出isr集合
总结:broker、主题、分区、副本
[root@web-server data]# ls
kafka-logs kafka-logs-0 kafka-logs-1 kafka-logs-2
[root@web-server kafka-logs-1]# ls
cleaner-offset-checkpoint log-start-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint topic2-0 topic2-1
[root@web-server kafka-logs-2]# ls
cleaner-offset-checkpoint log-start-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint topic2-0 topic2-1
(3)Kafka集群消息的发送
./kafka-console-producer.sh --broker-list 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --topic topic2
(4)Kafka集群消息的发送
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group1 --from-beginning --topic topic2
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group2 --from-beginning --topic topic2
难点:一个Partition只能被一个组中的一个Consumer消费、一个Consumer可以消费多个Partition。
注意:Kafka只在Partition分区的范围内保证消息消费的局部顺序性、不能在同一个topic主题中的多个Partition中保证总的消费顺序性。
(4)Kafka-eagle监控
(1)安装JDK
yum install java-1.8.0-openjdk-devel
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64
java -version
(2)解压
tar -zxvf kafka-eagle-bin-3.0.1.tar.gz
tar -zxvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1 efak-web
mv efak-web ../
cd /usr/local/efak-web
(3)配置环境变量
vim /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64
export KE_HOME=/usr/local/efak-web
export PATH=$PATH:$JAVA_HOME/bin:$KE_HOME/bin
source /etc/profile
(4)kafka-eagle内部配置问
vim /usr/local/efak-web/conf/system-config.properties
efak.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.19.131:2181
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://192.168.3.53:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root
(5)启动
./ke.sh start
admin/123456
windows版本启动问题解决方案:
例如:如果原来希望输入的命令为:
C:\Program Files\Java\jdk-11.0.12\bin\java.exe -jar xxx.jar
1
现在应改为:
"C:\Program Files\Java\jdk-11.0.12\bin\java.exe" -jar xxx.jar
(5)案例:zookeeper+kafka
一、安装docker
1、Docker 要求 CentOS 系统的内核版本高于 3.10 ,查看本页面的前提条件来验证你的CentOS 版本是否支持 Docker 。
通过 uname -r 命令查看你当前的内核版本
$ uname -r
2、使用 root 权限登录 Centos。确保 yum 包更新到最新。
$ sudo yum update
3、卸载旧版本(如果安装过旧版本的话)
$ sudo yum remove docker docker-common docker-selinux docker-engine
4、安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2
5、设置yum源
$ sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
6、可以查看所有仓库中所有docker版本,并选择特定版本安装
$ yum list docker-ce --showduplicates | sort -r
7、安装docker
#$ sudo yum install docker-ce #没有版本默认安装最新版本、由于repo中默认只开启stable仓库,故这里安装的是最新稳定版17.12.0
$ sudo yum install <FQPN> # 例如:sudo yum install docker-ce-17.12.0.ce
8、启动并加入开机启动
$ sudo systemctl start docker
$ sudo systemctl enable docker
9、验证安装是否成功(有client和service两部分表示docker安装启动都成功了)
$ docker version
##############################################################################################
拉取zookeeker
docker pull wurstmeister/zookeeper
拉取kafka版本为2.12-2.2.0,不填写版本好则安装最新,但是个别系统会报错
docker pull wurstmeister/kafka:2.12-2.2.0
启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
启动kafka
docker run --name kafka01 \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=127.0.0.1:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-d wurstmeister/kafka
进入Kafka容器类kafka01是容器名称,也可以填写成容器ID
docker exec -it kafka01 /bin/bash
创建my_log topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.3.191:2181 --replication-factor 1 --partitions 1 --topic my_log
查询创建的主题
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.3.191:2181
版权归原作者 JAVA灯塔 所有, 如有侵权,请联系我们删除。