- kafka-中的组成员
- kafka四大核心- 生产者API- 允许应用程序发布记录流至一个或者多个kafka的主题(topics)。- 消费者API- 允许应用程序订阅一个或者多个主题,并处理这些主题接收到的记录流- StreamsAPI- 允许应用程序充当流处理器(stream processor),从一个或者多个主题获取输入流,并生产一个输出流到一个或 者多个主题,能够有效的变化输入流为输出流。- ConnectorAPI- 允许构建和运行可重用的生产者或者消费者,能够把kafka主题连接到现有的应用程序或数据系统。例如:一个连 接到关系数据库的连接器可能会获取每个表的变化
1、kafka消息发送流程
在消息发送过程中设计到了两个线程一个main线程和一个Send线程,在main线程中创建了一个双端队列RecordAccumulator,main线程将消息发送给双端队列,send线程不断从双端线程中拉去消息发送到kafka Broker:
外部数据(生产者),发送消息经过main线程其中数据在main中先经过main的拦截器、序列化器、分区器后被推送到双端队列中,双端队列(RecordAccumulator 缓冲区总大小,默认是32m,但是这个值可以调),数据在双端队列中被分为多个双端队列的容器(当双端队列的容器batch.size的数据累计到16k的时候会自动发送到sengder端,还有一种情况就是数据不满足16k的时候,可以调senser等待linger.ms设置时间,时间到了后就会发送数据,单位是ms、默认值是0ms,延迟建设设置在5~100ms之间),当数满足以上条件的时候,sender从双端线程RecordAccumulator去拉取数据,当数到达sender线程的时候,数据会在client端分成多个分区的request(默认是5个request)然后kafka集群会去sender去接受sender推送归来的、每拉取一个数据会给sender一个返回值(ack,ack返回的次数,默认是5次),说明已经接收到数据(特殊情况,当消息队列中的数据推送过去后,kafka没有给sender返回值,这时候为了重发送消息(重试发送的时间间隔默认是100ms),如果设置了重试发送,还需要设置重试次数(默认是 int 最大值2147483647),然后信息就会进去kafka集群)
2、Kafka 的设计架构你知道吗?
kafka的框架主要有以下的几个方面构成:
1、Producer:消息生产者,向kafka客户端 发送消息的
2、Consumer:消息消费者,从卡夫卡客户端拉取消息的
3、Consumer Group(CG):消费者组,由多个消费者组成,消费者组内的每个消费者负责消费不同分区的数据,一个分区只能有一个组内的消费者消费,消费者租之间互不影响,所有消费者组,即消费者组是逻辑上的一个订阅者。
4、Broker:一台kafka服务器就是一个broker,一个集群有多个broker
5、Topic:队列,生产者消费者都面向一个topic
6、Partition:一个topic上可以分为多个分区,每个分区都是一个有序的队列
7、Replica:副本,每一个topic都有很多个副本,一个leader和多个follower
8、Leader:每个分区多个副本的’领导者‘数据发送的对象,和消费者拉取的对象都属leader
9、follower:保持和leader数据同步,备份数据,防止leader崩溃数据丢失
框架:
数据从生产者发送到kafka集群的每个broker上的topic(发送的是每个leader,然后follow会会作为leader的备份数据存在broker上边),然后每个topic都有分区,分为不同的分区,然后经过处理,发送到消费者组里边,分别发送,每个消费者组之间不糊影响
3、Kafka 分区的目的?
1、分区便于合理使用存储资源,每个分区在broker上存储,可以把海量数据按照分区切割成一块一块数据存储在多台broker上,合理控制分区的任务,可以实现负载均衡的效果
2、提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区进行消费消费数据
4、你知道 Kafka 是如何做到消息的有序性?
保证消息的有序性需要依赖以下几个机制和策略:
1、单一分区内的消息有序,每一个topic内只有一个partation分区,因为分区内有序,可以将所有消息发送到一个分区内,在创建一个topic的时候可以将分区数调为1
2、使用幂等性生产者(默认自动开启),幂等性主要是防止生产者重复发送消息,并且发送到一个分区,保证消息的有序性
5、ISR、OSR、AR 是什么?
ISR:可用的、存活的,leader+follower 是存活的broker
OSR:已经停止的broker
AR:是内部选举选举的顺序,例如AR[2,0,1]
三者的关系:
AR=ISR+OSR
6、Kafka 在什么情况下会出现消息丢失
1、生产者发送过来的数据,leadr收到应答后,此时生产者以为数据发送成了但是此时,leader挂掉了,但是follower的数据还没同步完,但是follower变成了leader,这个时候数据就会丢失,因为此时的leader是数据是follower没有同步完的数据,导致消息丢失。(图1)
2、生产者发送过来的数据,leader和isr队列里所有节点收齐数据后应答:
leadr收到数据后所有的follower开始同步数据,但是这时候有一个follower副本挂掉了,然后迟迟不能同步数据,但是leadr也挂断了然后选举了那个坏的follower作为leader导致数据丢失。(图2)
解决方案:
Leader维护了一个动态的in-sync replica set(ISR),意为和 Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)
如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s
图1
图2
7、怎么尽可能保证 Kafka 的可靠性
1、尽可能的防止数据丢失,采用6的方法
2、将ack的级别调成-1“all”,分区副本数要大于等于2
3、ISR里应答的最小副本数大于等于2
解释:
ack:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;
acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
数据完全可靠性的条件:ack设置为-1+分区副本大于等于2+isr应答副本大于等于2
8、Kafka中如何做到数据唯一,即数据去重?
重复原因:
当ask为-1的时候,可能出现数据重复问题,数据发送给了leader,follower也同步成功了,此时准备应答ask为-1的时候leader挂了,然后follower为leader,然后发送没有收到-1的信号,然后又从新发送新的信息给leader导致数据重复
去重:
Exactly Once:数据唯一
开启幂等性(幂等性,就是producer无论向broker发送多少重复的数据,只能是持久化的一条数据,保证的不重复,幂等性是默认开启的,类似于distinct)
数据唯一:幂等性+ack-1+分区副本数大于等于2+isr最下副本数大于等于2(不停机情况下)
但是幂等性只能保证在服务器不停机的情况下不会出现重复数据,当服务器运行的时候突然停机,就会出现leader重新阅读的情况导致数据丢失,这时候需要开启事务
事务:开启事务的前提要开启幂等性,生产者请求kafka服务器一个唯一id、这个id不能重复(幂等性需要),然后服务器返回这个id、,然后消费者发送信息请求给kafka,然后kafka给生产者一个返回值这时候生产者发送数据给topic,kafka事务协调器进行持久化请求给transacti分区(这个分区默认有50个分区,每个分区负责一部分事务)
当开始幂等性+事务+ack-1+分区副本大于等于2+isr最小分区副本大于等于2 就可以保证数据的唯一性(可能存在停机的情况下,用)
事务:
9、生产者如何提高吞吐量?
在消息发送过程中双端队列中调节batch.size:批次大小,默认16k(可以调大一点),linger.ms:信息等待时间调到(5~100ms),compression.type:压缩snappy (压缩类型有none、gzip、snappy、lz4 和 zstd),RecordAccumulator:缓冲区大小,修改为64m(默认32m)
10、zk在kafka集群中有何作用
zk储存kafka的信息:
主要储存的信息有:
1、broker【012】 服务器信息
2、一个json 记录了谁是leader,哪些服务器可用
3、辅助选举leader
zk中有一个节点 consumers 这个里面,老版本0.9版本之前,存放的是消费者的偏移量(offset,这次消费者消费到哪个地方了,下次从这个地方继续消费),新版本的根本没放在zk中,直接放在集群中了
11、简述kafka集群中的Leader选举机制
当选举leader的时候,broker会被选举为Contorller leader,负责集群管理breaker的上线和下线,和所有topic分区副本的leader的选举
contorllet依赖于zooker,启动集群的时候会在zk中注册记录,然后将节点信息上传到zkisr【】中,谁先注册谁就是leader如果集群的某一个leader挂掉的话,contorllet会监听到变化,然后选举新的leader(在isr中存活为前提,按照ar(kafka分区中的所有副本统称)中的排序优先选)例如ar【1,0,2】 isr【1,0,2】那么选举顺序就是1,0,2轮询查询
12、kafka是如何处理数据乱序问题的。
出现乱序的原因:
1)生产者在发送3请求的时候,发生异常,发生异常需要重新发送,所以排在了后面,在进行落盘的时候,先落盘1,2 ,落盘3的时候发现是4,需要等,等到3出现为止,然后将 3,4 ,5排序,排序后再进行落盘。
顺序错乱了,会自动排序(开启幂等性)。
kafka在1.0之前版本保证分区有序,是采用:max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)
1.0及以后的版本采用,开启幂等性,然后设置max.in.flight.requests.per.connection需要设置小于等于5, 未开启幂等性:max.in.flight.requests.per.connection需要设置为1,
启用幂等性,kafka服务端会缓存producer的5个request的元数据,这都可保证最近5个request的数据是有序的
13、kafka中节点如何服役和退役
服役:1、需要创建以下json,创建一个均衡的主题,然后生成一个负载均衡的计划(kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate)2、执行之前写的json,然后将生成的未来分区策略复制,3、创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中),4、然后执行副本存储计划,5、验证副本存储计划
退役:先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡1、创建json创建一个负载均衡主题2、创建执行计划3、创建副本存储计划4、执行副本5、验证副本存储计划
14、Kafka中Leader挂了,Follower挂了,然后再启动,数据如何同步?
leader:leader发生故障后,会从isr中从新选出一个新的leader,为了保证多个副本之间的一致性,其余的follower会将各自的log文件高于hw的部分截掉,然后从新的leader同步数据(木桶理论),只能保证副本时间的一致性不能保证数据不会重复
LEO:每个副本的最后一个offset(偏移量),leo其实就是最新的offset+1
HW:所有副本中最小的leo
follower故障:follower故障后会被临时的提出isr,这个期间leader和follower会一直的接收数据,但是等到那个被临时踢出的follower恢复后,follower会读取本地磁盘的记录上次的hw,并将文件高于hw的部分你截取掉,然后从新开始从leader读取数据,等到该follower的leo大于等于这个hw的时候,就可以从新进入isr队列
15、kafka中初始化的时候Leader选举有一定的规律,如何打破这个规律呢?
写一个json然后把,指定的leader follower写进去,然后根据这个json创建topic就能,创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中),执行副本存储计划,打破这个leader的选举规律
版权归原作者 KasarJ 所有, 如有侵权,请联系我们删除。