Kafka
Kafka最早是由LinkedIn公司开发的,作为其自身业务消息处理的基础,后LinkedIn公司将Kafka捐赠给Apache,现在已经成为Apache的一个顶级项目了,Kafka作为一个高吞吐的分布式的消息系统。Kafka是一种消息队列,主要用来处理大量数据状态下的消息队列,一般用来做日志的处理。
为什么使用Kafka?
- 系统解耦:耦合的状态表示当你实现某个功能的时候,是直接接入当前接口,而利用消息队列,可以将相应的消息发送到消息队列,这样的话,如果接口出了问题,将不会影响到当前的功能。
- 异步调用:异步处理替代了之前的同步处理,异步处理不需要让流程走完就返回结果,可以将消息发送到消息队列中,然后返回结果,剩下让其他业务处理接口从消息队列中拉取消费处理即可。
- 流量削峰:高流量的时候,使用消息队列作为中间件可以将流量的高峰保存在消息队列中,从而防止了系统的高请求,减轻服务器的请求处理压力。
优势:大数据领域、高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性、高并发
应用领域:大数据领域、日志收集、消息系统、活动跟踪、数据处理、行为日志、等等方面
- 高吞吐:kafka高效读写
- 低延迟:分区分配策略(粘性分区策略)
- 可扩展性:采用了topic分区,分配到适应的broker
- 持久性:写磁盘持久化(.index文件和.log文件)
- 可靠性:producer的ack机制(幂等性<PID,partition,消息ID>)
- 容错性:分区副本集(ISR),数据一致性(HW),Leader容灾(controller)
- 高并发:broker集群,controller监控管理
Kafka的消费模式
Kafka的消费模式主要有两种:一种是一对一的消费,也即点对点的通信,即一个发送到消息队列一个接收。第二种为一对多的消费,即生产者发布消息到消息队列,消费者根据消息队列的订阅拉取消息消费。
一对一模式
消息生产者发布消息到消息队列中,通知消费者从队列中拉取消息进行消费。消息被消费之后则删除,Queue支持多个消费者,消费者之间存在竞争关系,对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。
发布/订阅模式
利用Topic存储消息,消息生产者将消息发布到Topic中,同时有多个消费者订阅此topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据之后,数据不会被清除,Kafka会默认保留一段时间,然后再删除。
Kafka集群架构
- Producer:消息生产者,向Kafka中发布消息的角色。
- Consumer:消息消费者,即从Kafka中拉取消息消费的客户端。
- Consumer Group:消费者组,消费者组则是一组中存在多个消费者,消费者消费Broker中当前Topic的不同分区中的消息,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费。
- Broker:经纪人,一台Kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic。存在一个controller:负责管理集群broker的上下线、所有topic的分区副本分配和leader的选举等工作。Controller的工作管理是依赖于zookeeper的。
- Topic:主题,可以理解为一个队列,生产者和消费者都是面向一个Topic
- Partition:分区,为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序,不能保证全局有序)(log文件,分片索引(.log文件,index文件))
- Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常的工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower
- Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
- Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。
- Zookeeper:保存着集群 broker、 topic、 partition等meta 数据;另外,还负责broker故障发现,partition leader选举,负载均衡等功能
Kafka文件存储(顺序读写、分段命令、二分查找)
Kafka文件存储也是通过本地落盘的方式存储的(持久化机制,批量发送),主要是通过相应的log与index等文件保存具体的消息文件。
- log:相当于partition分区,log文件有序,生产者向log文件追加消息文件,里面包含多个分片segment。
- Segment:分片中存储着.log文件和.index文件,当.log文件大于1G时生成新的segment,目的是为了防止log文件过大导致定位效率低下,Kafka采取了分片和索引的机制来加速定位。
- .log文件:数据文件(消息日志文件),文件名为当前第一个消息的序号,里面存储索引和真实的数据(data),1G大小。每条数据对应一个索引(偏移量+数据大小),可以通过索引来定位查找。
- .index文件:索引文件,里面存储着offset(偏移量)+messageSize(数据大小)。文件名和.log文件名一样,扩展名不同。存储稀疏索引,避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。
- offset:Message在这个partition中的偏移量,offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message,可以认为offset是partition中Message的 id。consumer组中的每个consumer,都会实时记录自己消费到了哪个offset,以便出错恢复的时候,可以从上次的位置继续消费(0.9版本之前存在zk中,之后存在kafka一个内置的topic中,该topic为__consumer_offsets)
如果快速定位数据
通过数据具体的偏移量用二分查找法查找所在的segment(对比.index文件名获得),然后根据offset+数据大小来锁定.log文件中的真实数据。
分区原因
- 提高集群扩展性:消息topic由多个partition组成,且partition会根据分区规则(通过Controller,依赖zk)均衡分布到不同broker上。整个集群可以适应适合的数据,能有效利用broker集群的性能。
- 负载均衡:生产者或消费者根据特定的分区策略将消息发布到多个分区或者对多个分区进行消费以实现负载均衡。
- 提高吞吐量:消息是以追加到分区中的,多个分区顺序写磁盘的总效率要比随机写内存还要高(省去了大量磁头寻址的时间)。读写操作分配到多个分区和broker上,提高吞吐量。
生产者发布消息流程(Producer Publish Push)
Producer中的消息缓存模型(消息累加器RecordAccumulator)
Producer发送消息采用的是异步发送的方式,在消息发送的过程中,设计到了两个线程main线程和Sender线程,以及一个线程共享变量RecordAccumulator,main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker中。
要实现同步的话,用Future做返回值,调用Future.get()方法可以暂时阻塞,以达到同步发送的目的。
Kafka根据分区策略(Partitioner),选择对应的分区将消息分配(main线程)到消息累加器(RecordAccumulator)的ProducerBatch中暂时缓存起来,等消息到达batch.size()或者linger.ms时间到了,再进行批量发送(sender线程)。
优点:提高Producer客户端的发送吞吐量和提高性能,减少写磁盘I/O请求次数,消息缓存
缺点:消息的发送它必须要你的一个Batch满了或者linger.ms时间到了,才会发送。如果生产的消息比较少的话,迟迟难以让Batch塞满,那么就意味着更高的延迟。
消息压缩(GZIP或Snappy)
Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。 Producer端进行压缩之后,在Consumer端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是CPU(压缩和解压会耗掉部分CPU资源)。
生产者分区分配策略(Partitioner)
4种分区分配策略:DefaultPartitioner 默认分区策略、UniformStickyPartitioner 纯粹的粘性分区策略、RoundRobinPartitioner 分区策略、CustomPartitioner自定义分区策略
DefaultPartitioner 默认分区策略
- 指明partition(这里的指明是指第几个分区)的情况下,直接将指明的值作为partition的值
- 没有指明partition的情况下,但是存在值key,此时根据序列化key的hash值与topic的可用partition数取模
- 如果不存在partition或key,则会使用粘性分区策略
粘性分区策略(Sticky Partitioner)
为什么使用粘性分区?(降低延迟)
如果将消息轮询到各个分区的ProducerBatch中, 本来消息就少,还给所有分区遍历的分配,那么每个ProducerBatch都很难满足发送条件,除非等linger.ms时间到,延迟很大。使用粘性分区之后,至少是先把一个Batch填满(随机选择)了发送然后再去填充另一个Batch,降低了延迟。
划重点:
选择第一个ProducerBatch是随机的
当一个Batch发送之后,需要选择一个新的粘性分区的时候
①. 可用分区<1 ;那么选择分区的逻辑是在所有分区中随机选择。
②. 可用分区=1; 那么直接选择这个分区。
③. 可用分区>1 ; 那么在所有可用分区中随机选择。
当选择下一个粘性分区的时候,不是按照分区平均的原则来分配,而是随机原则(当然不能跟上一次的分区相同),例如刚刚发送到的Batch是 1号分区,等Batch满了,发送之后,新的消息可能会发到2或者3,如果选择的是2,等2的Batch满了之后,下一次选择的Batch仍旧可能是1,而不是说为了平均,选择3分区。
UniformStickyPartitioner 纯粹的粘性分区策略
与DefaultPartitioner 分区策略的唯一区别:
DefaultPartitioner:如果有key的话,那么它是按照key来决定分区的,这个时候并不会使用粘性分区
UniformStickyPartitioner:不管你有没有key, 统一都用粘性分区来分配。
RoundRobinPartitioner 分区策略
- 如果消息中指定了分区,则使用它
- 没有指明partition的情况下,将消息平均的分配到每个分区中。
- 与key无关
当可用分区是0的话,那么就是遍历的是所有分区中的。当有可用分区的话,那么遍历的是所有可用分区的。
CustomPartitioner自定义分区策略
实现Partitioner接口,重写其中的方法即可
生产者ACK机制(0,1,-1)(保证消息可靠性)
为保证producer发送的数据能够可靠的发送到指定的topic中,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement),如果producer收到ack就会进行下一轮的发送,否则重新发送数据。会影响kafka集群的吞吐量和消息可靠性,二者不可兼得。
发送ack的时机:确保有follower与leader同步完成,leader在发送ack,这样可以保证在leader挂掉之后,follower中可以选出新的leader(主要是确保follower中数据不丢失)
副本数据同步策略
- 半数以上的follower同步完成,即可发送ack
优点:延迟低
缺点:容错率低,选举新的leader的时候,容忍n台节点的故障,需要2n+1个副本(因为需要半数同意,所以故障的时候,能够选举的前提是剩下的副本超过半数),容错率为1/2
- 全部的follower同步完成,才可以发送ack
优点:容错率高,选举新的leader的时候,容忍n台节点的故障只需要n+1个副本即可,因为只需要剩下的一个人同意即可发送ack了
缺点:延迟高
kafka选择第二种,更看重容错率,网络延迟对于Kafka的影响较小。改进为ISR(同步副本集)。
为什么改进使用ISR?
采用了第二种方案进行同步ack之后,如果leader收到数据,所有的follower开始同步数据,但有一个follower因为某种故障,迟迟不能够与leader进行同步,防止leader一直等待其发送ack,可以降低延迟。
ISR(同步副本集)
leader中维护了一个动态的ISR(in-sync replica set),即与leader保持同步的follower集合(包含leader副本),当ISR中的follower完成数据的同步之后,给leader发送ack,如果follower长时间没有向leader同步数据,则该follower将从ISR中被踢出,该时间阈值由(replica.lag.time.max.ms)参数设定。当leader发生故障之后,会从ISR中选举出新的leader。ISR 列表是持久化在 Zookeeper 中的。
ACK参数配置
0:producer不等待broker的ack,这一操作提供了最低的延迟,broker接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据,至多一次,不能保证数据不丢失,但能保证数据不重复。
1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将丢失数据。(只是leader落盘),至少一次,这里并不会保证数据不丢失,数据也可能重复。
-1(all):producer等待broker的ack,partition的leader和ISR的follower全部落盘成功才返回ack,但是如果在follower同步完成后,broker发送ack之前,如果leader发生故障,会造成数据重复。(这里的数据重复是因为producer没有收到broker的ack确认,所以继续重发导致的数据重复),至少一次,可以保证数据不丢失。
ExactlyOnce(精确的一次):建立在-1的基础上,目的是保证数据不重复。
- kafka 0.11之前,对下游数据进行去重;- kafka 0.11,引入幂等性,幂等性指代Producer不论向Server发送了多少次重复数据,Server端都只会持久化一条数据,去重操作放在了数据上游,在Producer的参数中设置(enable.idempotence=true),开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个Partition的消息会附带Sequence Number(序列号),而Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息的时候,Broker只会持久化一条。但PID在重启之后会发生变化,同时不同的Partition也具有不同的主键,所以幂等性无法保证跨分区跨会话的Exactly Once。kafka幂等只保证单个生产者会话(session)中单分区的幂等。
异步发送带回调函数
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别为RecordMetaData和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。消息发送失败会启动重试机制,但需要在回调函数中手动重试。
同步后数据一致性问题(只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复)
- LEO(Log End Offset):每个副本最后的一个offset
- HW(High Watermark):高水位,指代消费者能见到的最大的offset,ISR队列中最小的LEO。
follower故障和leader故障
- follower故障:follower发生故障后会被临时提出ISR,等待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步,等待该follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了。
- leader故障:leader发生故障之后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据的一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader中同步数据。
生产者拦截器(Interceptor)
Producer拦截器interceptor是在Kafka0.10版本引入的,主要用于Clients端的定制化控制逻辑。对于Producer而言,interceptor使得用户在消息发送之前以及Producer回调逻辑之前有机会对消息做一些定制化需求,比如修改消息的展示样式等,同时Producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链interceptor chain,并紧紧是捕获每个interceptor可能抛出的异常记录到错误日志中而非向上传递。Interceptor实现的接口为ProducerInterceptor,主要有四个方法:
configure(Map<String, ?> configs)
:获取配置信息和初始化数据时调用onSend(ProducerRecord record)
:该方法封装在KafkaProducer.send()
方法中,运行在用户主线程中,Producer确保在消息被序列化之前及计算分区前调用该方法,并且通常都是在Producer回调逻辑出发之前。onAcknowledgement(RecordMetadata metadata, Exception exception)
:onAcknowledgement运行在Producer的IO线程中,因此不要再该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。close()
:关闭inteceptor,主要用于执行资源清理工作。一定要调用Producer.close()方法,否则拦截器的close()方法不会被调用。
消费者订阅消息流程(Consumer Subscribe Pull)
消费方式以及为什么采取这种消费方式,有什么优缺点?
消费方式:consumer采用pull拉的方式来从broker中读取数据。
原因:push推的模式很难适应消费速率不同的消费者,因为消息发送率是由broker决定的,它的目标是尽可能以最快的速度传递消息,但是这样容 易造成造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
优点:pull方式则可以让consumer根据自己的消费处理能力以适当的速度消费消息。
缺点:如果Kafka中没有数据,消费者可能会陷入循环之中轮询 (因为消费者类似监听状态获取数据消费的),一直返回空数据,针对这一点,Kafka的 消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,时长为timeout。
消费者组
消费者组是Kafka实现单播和广播两种消息模型的手段。同一个topic,每个消费者组都可以拿到相同的全部数据(不论topic有几个分区)。Kafka默认的消费逻辑是:位于某个主题中的一个分区只能被同一个消费者组中的一个消费者消费。消费者所消费的分区是根据分区分配策略来的。
单个消费者组(单播)
- 同一个分区内的消息只能被同一个组中的一个消费者消费,当消费者数量多于分区数量时,多于的消费者空闲(不能消费数据)2. 当分区数多于消费者数的时候,有的消费者对应多个分区3. 当分区数等于消费者数的时候,每个消费者对应一个分区
多个消费者组(广播)
启动多个组,相同的数据会被不同组的消费者消费多次,同一个topic,每个消费者组都可以拿到相同的全部数据(不论topic有几个分区)。
消费者分区分配策略
Kafka的三种分配策略:RoundRobinAssignor轮询分区分配策略、RangeAssignor范围分区分配策略、StickyAssignor粘性分区分配策略
分区再平衡机制
当主题分区发生变化时、或有新消费者加入群组时、或群组中有消费者挂掉时,Kafka会触发分区再均衡操作(根据不同的分配策略继续进行分区分配)。 默认采用RangeAssignor范围分区分配策略。
那分区再平衡有哪些优缺点呢?
优点:能为消费者群组带来高可用性与伸缩性;
缺点:在发生再均衡这一期间内,消费者是无法读取信息的,所以这将会造成消费者群组会出现一小段时间不可用的情形。
所以在应用Kafka的过程中,需要避免无用的分区再均衡发生。
RoundRobinAssignor轮询分区分配策略
主要采用轮询的方式分配所有的分区,按照分区的字典对分区和消费者进行排序,然后对分区进行循环遍历,遇到订阅自己的则消费,否则向下轮询下一个消费者。即按照分区轮询消费者,继而消息被消费。会有消费者过载的情况
C0(t0,t1,t2),C1(t0,t1,t2),C2(t0,t1,t2)
- 先排序,t0-0判断C0是否订阅自己,然后C0(t0-0),C0和t0-0共同向下到底,t1-0判断C1是否订阅自己,然后C1(t1-0),C1和t1-0共同向下到底,t1-1判断C2是否订阅自己,然后C2(t1-1),以此类推,最终C0(t0-0,t2-0),C1(t1-0,t2-1),C2(t1-1,t2-2),平均分配。
C0(t0),C1(t0,t1),C2(t0,t1,t2)
- 同上,最终结果C0(t0-0),C1(t1-0),C2(t1-1,t2-0,t2-1,t2-2)
分区在循环遍历消费者,自己被当前消费者订阅,则分区与消费者共同向下(消息被消费),否则继续向下遍历消费者(消息没有被消费)。轮询的方式会导致每个Consumer所承载的分区数量不一致,从而导致各个Consumer压力不均。上面的C2因为订阅的比较多,导致承受的压力也相对较大。
RangeAssignor范围分区分配策略(默认)
基于Topic,首先按照字典对分区和消费者进行排序,每个分区计算订阅自己的各个Consumer将会承载的分区数量(计算方法:该Topic分区数/订阅该Topic的消费者数,如果没有除尽,多出来的分区则按照字典序挨个分配给消费者),然后将指定数量的分区分配给各个Consumer。会有消费者过载的情况。
- C1(t0,t1),C2(t0,t1) ,T0,T1分别有4个分区
t0判断有两个消费者订阅自己,然后用4个分区数除以2个消费者数得到每个消费者分配的分区数为2,所以C1(t0-0,t0-1),C2(t0-2,t0-3),T2分配类似。
- C1(t0,t1),C2(t0,t1) ,T0,T1分别有3个分区
T0判断有两个消费者订阅自己,然后用3个分区数除以2个消费者数得到每个消费者分配的分区数为1余1,将余下来的分区按字典序号依次分配给消费者,所以C1(T0-0,T0-1),C2(T0-2),T1分配类似
StickyAssignor粘性分区分配策略(0.11版本开始)
- 主题分区仍然尽可能均匀地分布(减少消费压力不均衡的情况)
- 当发生分区再平衡时主题分区尽可能与其先前分配的使用者在一起
步骤
- 首先进行分区和消费者进行排序
分区:根据被订阅的消费者数量进行从小到大排序,订阅数相同则按分区字典进行排序
消费者:根据当前消费者所分配到的分区数量从小到大排序,分区数量相同按照消费者字典进行排序
- 依次遍历分区分配给各个consumer,每次分配的时候都要从头开始遍历所有的consumer,并且每次分配完分区的时候,都会按照consumer所分配的分区数量的从小到大进行重排序(进行分区分配的时候consumer的顺序是动态变化的)
- 当发生分区再平衡时,将宕机的consumer所拥有的分区和活着的consumer按照上面的规则进行排序然后分配。如上面C0宕机,就会导致t0-0落到C1
消费者重置offset
consumer组中的每个consumer,都会实时记录自己消费到了哪个offset,以便出错恢复的时候,可以从上次的位置继续消费(0.9版本之前存在zk中,之后存在kafka一个内置的topic中,该topic为__consumer_offsets),设置(
auto.offset.reset
)。利用Rebalance分区监听器监听rebalance事件,一旦发生rebalance,先将offset提交,分区之后则找到最新的offset位置继续消费即可。
auto.offset.reset取值
- earliest:重置offset到最早的位置2. latest:重置offset到最新的位置,默认值3. none:如果在消费者组中找不到前一个offset则抛出异常4. anything else:抛出异常给消费者
消费者提交offset
自动提交(默认):
enable.auto.commit = true
,本质是基于时间提交。手动提交:由于开发人员难以把握offset提交时机
- commitSync(同步提交):提交本次pull拉取的一批数据的最高的偏移量提交,阻塞当前线程,持续到提交成功,失败会自动重试,可以设置超时时间,阻塞超过时间则释放。2. commitAsync(异步提交):提交本次pull拉取的一批数据的最高的偏移量提交,不阻塞当前线程,没有失败重试机制,有可能提交失败,多出一个offset提交的回调函数,可通过回调函数中的Exception来判断是否失败。
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.out.println("Commit failed, offset = " + offsets);
}
}
});
数据漏消费和重复消费分析:无论是同步提交还是异步提交offset,都可能会造成数据的漏消费或者重复消费。
- 先提交offset后消费(漏消费):先提交offset,consumer消费没到offset挂了,接手的consumer从offset开始消费,造成漏消费。
- 先消费后提交offset(重复消费):consumer先消费,在提交offset的时候挂了,导致接手的consumer从消费过的offset开始消费,重复消费。
Kafka高效读写
- 写高效:
- 消息是以追加到分区中的,多个分区顺序写磁盘的总效率要比随机写内存还要高(省去了大量磁头寻址的时间);2. 采用消息缓存机制,批量写盘,减少写磁盘I/O请求次数。
- 读高效:
零复制技术
零拷贝技术(I/O零拷贝技术)
数据直接从磁盘DMA拷贝到内核缓冲区,再从内核缓冲区DMA拷贝到NIC(网卡),把描述符和数据长度发送给socket缓冲区。减少了两次CPU拷贝和应用程序内核间的上下文切换。
Kafka中zookeeper的作用
brokers中的controller
Controller 作为 Kafka Server端一个重要的组件,它的角色类似于其他分布式系统Master的角色,跟其他系统不一样的是,Kafka集群的任何一台Broker都可以作为Controller,但是在一个集群中同时只会有一个 Controller是alive状态。在于分布式系统中,总会有一个地方需要对全局 meta 做一个统一的维护,Kafka 的 Controller 就是充当这个角色的。Controller 是运行在 Broker 上的,任何一台 Broker 都可以作为 Controller,但是一个集群同时只能存在一个 Controller,也就意味着 Controller 与数据节点是在一起的。
- Broker 的上线、下线处理;
- 新创建的 topic 或已有 topic 的分区扩容,处理分区副本的分配、leader 选举;
- 管理所有副本的状态机和分区的状态机,处理状态机的变化事件;
- topic 删除、副本迁移、leader 切换等处理。
Kafka集群controller的选举
Broker在启动时,会尝试去ZK创建/controller节点,第一个成功创建/controller节点的Broker会被指定为控制器。每个Broker都会在Controller Path (/controller)上注册一个Watch。 当前Controller失败时,对应的Controller Path会自动消失(因为它是ephemeral Node),此时该Watch被fire,所有“活” 着的Broker都会去竞选成为新的Controller (创建新的Controller Path),但是只会有一个竞选成功(这点由Zookeeper保证)。竞选成功者即为新的Leader,竞选失败者则重新在新的Controller Path上注册Watch。因为Zookeeper的Watch是一次性的, 被fire一次之后即失效,所以需要重新注册。
controller对partition的分配
- 将所有Broker(假设共n个Broker)和待分配的Partition排序
- 将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)
- 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
partition的leader选举过程(Leader容灾)
- Controller 会在 ZK 的 /brokers/ids 节点上注册 Watch,一旦有 Broker 宕机,它就能知道。当 Broker 宕机后,Controller 就会给受到影响的 Partition 选出新 Leader。
- Controller 从 ZK 的 /brokers/topics/[topic]/partitions/[partition]/state 中,读取对应 Partition 的 ISR(in-sync replica 已同步的副本)列表,选一个出来做 Leader。选出 Leader后,更新ZK,然后发送 LeaderAndISRRequest 通知需为此作出响应的Broker(controller直接发送RPC请求)。
- 从 ISA 中选出 Leader 后,Follower 会把自己日志中上一个高水位后面的记录去掉,然后去和 Leader 拿新的数据。
为什么不能通过ZK的方式来选举partition的leader?
如果使用zk选举leader,会给zk带来巨大的压力。所以kafka中leader的选举不能使用zk来完成。最关键的是zk的选举没有那么快,有延迟。
zookeeper的作用
Broker注册:Broker在zookeeper中保存为一个临时节点,节点的路径是/brokers/ids/[brokerid],每个节点会保存对应broker的IP以及端口等信息;
Topic注册:在kafka中,一个topic会被分成多个区并被分到多个broker上,分区的信息以及broker的分布情况都保存在zookeeper中,根节点路径为/brokers/topics,每个topic都会在topics下建立独立的子节点,每个topic节点下都会包含分区以及broker的对应信息;
partition状态信息:/brokers/topics/[topic]/partitions/[0…N] 其中[0…N]表示partition索引号;
Controller epoch:此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1;
Controller注册信息:存储center controller中央控制器所在kafka broker的信息;
生产者负载均衡:当Broker启动时,会注册该Broker的信息,以及可订阅的topic信息。生产者通过注册在Broker以及Topic上的watcher动态的感知Broker以及Topic的分区情况,从而将Topic的分区动态的分配到broker上;
消费者:kafka有消费者分组的概念,每个分组中可以包含多个消费者,每条消息只会发给分组中的一个消费者,且每个分组之间是相互独立互不影响的。Consumer注册信息:每个consumer都有一个唯一的ID(consumerId可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息;./consumers/[groupId]/ids/[consumer_id]是一个临时的znode,此节点的值为该消费者订阅的Topic信息,即表示此consumer目前所消费的topic + partitions列表;
消费者与分区的对应关系:对于每个消费者分组,kafka都会为其分配一个全局唯一的Group ID,分组内的所有消费者会共享该ID,kafka还会为每个消费者分配一个consumer ID,通常采用hostname:uuid的形式。在kafka的设计中规定,对于topic的每个分区,最多只能被一个消费者进行消费,也就是消费者与分区的关系是一对多的关系。消费者与分区的关系也被存储在zookeeper中节点的路劲为 /consumers/[group_id]/owners/[topic]/[broker_id-partition_id],该节点的内容就是消费者的Consumer ID;
消费者负载均衡:消费者服务启动时,会创建一个属于消费者节点的临时节点,节点的路径为 /consumers/[group_id]/ids/[consumer_id],该节点的内容是该消费者订阅的Topic信息。每个消费者会对/consumers/[group_id]/ids节点注册Watcher监听器,一旦消费者的数量增加或减少就会触发消费者的负载均衡。消费者还会对/brokers/ids/[brokerid]节点进行监听,如果发现服务器的Broker服务器列表发生变化,也会进行消费者的负载均衡;
kafka事务(0.11版本)
kafka从0.11版本开始引入了事务支持,事务可以保证Kafka在Exactly Once语义的基础上,生产和消费可以跨分区的会话,要么全部成功,要么全部失败。kafka中的事务可以使应用程序将消费消息,生产消息,提交消费偏移量(offset)当作原子操作来处理,同时成功或者失败,即使该生产或消费跨越多个分区。
Producer事务
为什么引入事务?
kafka幂等只保证单个生产者会话(session)中单分区的幂等。幂等性并不能跨多个分区运行,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在不一致的情况。
为了按跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID(用户显示设置),并将Producer获得的PID(可以理解为Producer ID,kafka内部分配)和Transaction ID进行绑定,这样当Producer重启之后就可以通过正在进行的Transaction ID获得原来的PID。为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator,Producer就是通过有和Transaction Coordinator交互获得Transaction ID对应的任务状态,Transaction Coordinator还负责将事务信息写入内部的一个Topic中,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以恢复,从而继续进行。当新生产者实例被创建且工作的时候,旧的且拥有相同transactional ID的生产者实例将不再工作。新的生产者可以保证任何未完成的旧事务要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作。通过事务,Kafka 可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。
Consumer事务
事务能保证的语义相对偏弱,Kafka 并不能保证已提交的事务中的所有消息都能够被消费;
原因:
- 对采用日志压缩策略的主题而言,事务中的某些消息有可能被清理(相同key的消息,后写入的消息会覆盖前面写入的消息)。
- 事务中消息可能分布在同一个分区的多个日志分段(LogSegment)中,当老的日志分段被删除时,对应的消息可能会丢失。
- 消费者可以通过seek()方法访问任意offset的消息,从而可能遗漏事务中的部分消息。
- 消费者在消费时可能没有分配到事务内的所有分区,如此它也就不能读取事务中的所有消息。
版权归原作者 夜酱ovo 所有, 如有侵权,请联系我们删除。