0


zookeeper

zookeeper是什么?

分部式系统管理框架,主要来解决分布式应用集群中应用系统的一致性问题:相当于各种分布式应用的 注册中心+文件系统+通知机制

用于注册各种分布式应用,存储和管理这些分布式应用的元数据,如果应用或服务本身状态发送变化就会通知客户端

zookeeper选举机制

第一次leader选举:

比较服务器节点的myid,谁的myid最大就获取其他节点的选票,当选票超过服务器节点数量的半数则当选leader,其他节点为follower,即使以后再有其他myid更大的节点加入集群也不会影响之前的选举结果。

非第一次leader选举:

  • 如果是非leader故障,直接替换新的节点,继续做follower,与现存的leader节点建立连接并同步数据,
  • 如果是leader节点故障,则需要重新选举新的leader,先比较每个存活节点的epoch(参与选举的次数),如有最大的则直接当选leader- 若epoch有相同的节点,再比较zxid(写操作的事务id),如有最大的节点则直接当选leader- 若zxid也有相同的节点,继续比较sid(等同于myid),由最大的节点当选leader

中间件

实现应用解耦、异步处理

web应用型的中间件:

nginx、haproxy、tomcat、php

消息队列型(MQ):

reids、kafka、rabbitMQ、rocketMQ、activeMQ

消息队列的好处:

应用解藕、异步处理、数据缓冲、流量削峰、可恢复等

消息队列的模式:

点对点模式:一对一,一个消息只能由一个

部署 Zookeeper 集群

//准备 3 台服务器做 Zookeeper 集群 192.168.116.22 192.168.116.23 192.168.116.24

1.安装前准备
  1. //关闭防火墙
  2. systemctl stop firewalld
  3. systemctl disable firewalld
  4. setenforce 0
  5. //安装 JDK
  6. yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
  7. java -version
2.安装zookeeper
  1. cd /opt
  2. 上传安装包
  3. 解压
  4. tar xf apache-zookeeper-3.6.4-bin.tar.gz
  5. 移动
  6. mv apache-zookeeper-3.6.4-bin /usr/local/zookeeper-3.6.4
  1. //修改配置文件
  2. cd /usr/local/zookeeper-3.6.4/conf
  3. 备份
  4. cp zoo_sample.cfg zoo.cfg
  5. vim zoo.cfg
  6. tickTime=2000
  7. #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
  8. initLimit=10
  9. #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
  10. syncLimit=5
  11. #Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
  12. dataDir=/usr/local/zookeeper-3.6.4/data
  13. ●修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
  14. dataLogDir=/usr/local/zookeeper-3.6.4/logs
  15. ●添加,指定存放日志的目录,目录需要单独创建
  16. clientPort=2181
  17. #客户端连接端口
  18. #添加集群信息
  19. server.1=192.168.116.22:3188:3288
  20. server.2=192.168.116.23:3188:3288
  21. server.3=192.168.116.24:3188:3288
  22. #server.A=B:C:D
  23. A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfgdataDir指定的目录下创建一个文件myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server
  24. B是这个服务器的地址。
  25. C是这个服务器Follower与集群中的Leader服务器交换信息的端口。
  26. D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
  27. cd /usr/local/zookeeper-3.6.4
  28. mkdir data logs
  29. //在每个节点的dataDir指定的目录下创建一个 myid 的文件
  30. echo 1 > /usr/local/zookeeper-3.6.4/data/myid
  31. echo 2 > /usr/local/zookeeper-3.6.4/data/myid
  32. echo 3 > /usr/local/zookeeper-3.6.4/data/myid
3.配置zookeeper启动脚本
  1. vim /etc/init.d/zookeeper
  2. #!/bin/bash
  3. #chkconfig: 2345 20 90
  4. #description:Zookeeper Service Control Script
  5. ZK_HOME='/usr/local/zookeeper-3.6.4'
  6. case $1 in
  7. start)
  8. echo "---------- zookeeper 启动 ------------"
  9. $ZK_HOME/bin/zkServer.sh start
  10. ;;
  11. stop)
  12. echo "---------- zookeeper 停止 ------------"
  13. $ZK_HOME/bin/zkServer.sh stop
  14. ;;
  15. restart)
  16. echo "---------- zookeeper 重启 ------------"
  17. $ZK_HOME/bin/zkServer.sh restart
  18. ;;
  19. status)
  20. echo "---------- zookeeper 状态 ------------"
  21. $ZK_HOME/bin/zkServer.sh status
  22. ;;
  23. *)
  24. echo "Usage: $0 {start|stop|restart|status}"
  25. esac
  26. // 设置开机自启
  27. chmod +x /etc/init.d/zookeeper
  28. chkconfig --add zookeeper
  29. //分别启动 Zookeeper
  30. service zookeeper start
  31. //查看当前状态
  32. service zookeeper status

部署kafka

1.安装kafka
  1. 安装 Kafka
  2. cd /opt/
  3. tar zxvf kafka_2.13-2.7.1.tgz
  4. mv kafka_2.13-2.7.1 /usr/local/kafka
​2.修改配置文件
  1. cd /usr/local/kafka/config/
  2. cp server.properties{,.bak}
  3. vim server.properties
  4. broker.id=0
  5. #21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2
  6. listeners=PLAINTEXT://192.168.116.22:9092
  7. #31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
  8. num.network.threads=3
  9. #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
  10. num.io.threads=8
  11. #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
  12. socket.send.buffer.bytes=102400
  13. #48行,发送套接字的缓冲区大小
  14. socket.receive.buffer.bytes=102400
  15. #51行,接收套接字的缓冲区大小
  16. socket.request.max.bytes=104857600
  17. #54行,请求套接字的缓冲区大小
  18. log.dirs=/usr/local/kafka/logs
  19. #60行,kafka运行日志存放的路径,也是数据存放的路径
  20. num.partitions=1
  21. #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
  22. num.recovery.threads.per.data.dir=1
  23. #69行,用来恢复和清理data下数据的线程数量
  24. log.retention.hours=168
  25. #103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
  26. log.segment.bytes=1073741824
  27. #110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
  28. zookeeper.connect=192.168.116.22:2181,192.168.116.23:2181,192.168.116.24:2181
  29. #123行,配置连接Zookeeper集群地址
3.修改环境变量
  1. vim /etc/profile
  2. export KAFKA_HOME=/usr/local/kafka
  3. export PATH=$PATH:$KAFKA_HOME/bin
4.配置 Zookeeper 启动脚本
  1. vim /etc/init.d/kafka
  2. #!/bin/bash
  3. #chkconfig:2345 22 88
  4. #description:Kafka Service Control Script
  5. KAFKA_HOME='/usr/local/kafka'
  6. case $1 in
  7. start)
  8. echo "---------- Kafka 启动 ------------"
  9. ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
  10. ;;
  11. stop)
  12. echo "---------- Kafka 停止 ------------"
  13. ${KAFKA_HOME}/bin/kafka-server-stop.sh
  14. ;;
  15. restart)
  16. $0 stop
  17. $0 start
  18. ;;
  19. status)
  20. echo "---------- Kafka 状态 ------------"
  21. count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
  22. if [ "$count" -eq 0 ];then
  23. echo "kafka is not running"
  24. else
  25. echo "kafka is running"
  26. fi
  27. ;;
  28. *)
  29. echo "Usage: $0 {start|stop|restart|status}"
  30. esac
5.设置开机自启
  1. chmod +x /etc/init.d/kafka
  2. chkconfig --add kafka
  3. 分别启动 Kafka
  4. service kafka start

kafka架构组件

broker

kafka服务器节点

producer

生产者,发布消息到topic

consumer

消费者

consumer group

消息者组,是消息的实际订阅者,一个消费者组包含一个或多个消费者,且组内成员不能重复消费同一个partition数据

  • producer- topic消息队列- 一个或多个partition分区- 一个或多个replica副本(leader负责数据读写,follower只负责同步复制leader的数据)
  • consumer- offset偏移量(用来记录消费者上一次消费到的位置)

zookeeper

存储和管理fafka集群的元数据,生产者和消费者的动作都需要zk的管理和支持。

比如生产者推送数据到kafka需要先通过zk寻找kafka集群节点的位置,消费者可以从zk获取offset记录的上一次消费的位置再继续往后消费。

kafka管理操作

  1. 创建topic
  2. kafka-topics.sh --zookeeper IP1:2181,IP2:2181,IP3:2181 --create --topic 队列名 --partitions 分区数 --replication-factor 副本数
  3. 查看topic列表
  4. kafka-topics.sh --zookeeper IP1:2181,IP2:2181,IP3:2181 --list
  5. 查看topic详细信息
  6. kafka-topics.sh --zookeeper IP1:2181,IP2:2181,IP3:2181 --discribe --topic 队列名
  7. 修改topic分区配置
  8. kafka-topics.sh --zookeeper IP1:2181,IP2:2181,IP3:2181 --alter --topic 队列名 --partitions 分区数(只能增不能减)
  9. 删除topic
  10. kafka-topics.sh --zookeeper IP1:2181,IP2:2181,IP3:2181 --delete --topic 队列名
  11. topic推送消息
  12. kafka-console-producer.sh --broker-list IP1:9092,IP2:9092,IP3:9092 --topic 队列名
  13. topic拉取消息
  14. kafka-console-consumer.sh --bootstrap-server IP1:9092,IP2:9092,IP3:9092 --topic 队列名 [--from-beginning]

kafka通过 ack 应答机制来保证数据的可靠性

在 producer.properties 配置文件中设置 request.required.ack 参数 acks 参数设置 0,代表生产者不等kafka确认就会发送下一条消息。此机制性能最高,可靠性最低,当leader的broker故障可能会丢失数据 1,代表生产者会等leader接收到数据并确认后才会发送下一条数据、此机制性能其次,可靠性一般,当follower同步完成前leader故障可能会导致数据丢失 -1|all,代表生产者需要等所有的follower都同步完成并确认后才会发送下一条数据。此机制性能最低,可靠性最高

面试题

问你们公司用的kafka是如何部署的?

先说明kafka版本

如果是2.X版本,要先部署3|5节点的zk集群,然后再在每个zk节点上部署kafka应用。

如果是3.X版本,kafka不再依赖zk,所以可以直接部署3|5节点的kafka集群。


本文转载自: https://blog.csdn.net/HanYuHao11/article/details/135762621
版权归原作者 沈阳最速传说と疾走の猛虎!贵物刀一郎です 所有, 如有侵权,请联系我们删除。

“zookeeper”的评论:

还没有评论