基础概念
Broker: 与RobbitMq类似,一个Kafka消息中间件节点就是一个Broker,一个或者多个Broker可以组成一个Kafka集群。
Producer: 消息的生产者,向Broker发送消息的客户端。
Topic: 主题,是每个消息的分类。每个发送到Kafka的消息都必须指明一个Topic,消费者消费时也必须指明要消费哪个Topic。
Partition: 分区。一个Topic可以有多个分区,分区内部的消息是有序的。
Consumer: 消息的消费者,向Broker拉取消息的客户端。
ConsumerGroup: 消费组。每个消费者进行消费时必须以消费组的形式进行消费,即使消费组里只有一个消费者。一条消息可以被多个ConsumerGroup消费,但是一个ConsumerGroup里只能有一个Consumer消费该消息。
Topic和消息日志log
对于每一个Topic,可以同时分为多个partition日志文件:
partition存储着message,内部是有序的。message到达partition时会按顺序添加到commit log中,每条message都有一个唯一的offset。
每个partition都有一个commit log,一个partition中的message的offset是唯一的,但是在不同partition的message的offset有可能是相同的。
kafka一般不会删除消息,不管消息有没有被消费。kafka一般是通过配置在配置文件里的日志保留时间(log.retention.hours),决定日志的最长保留时间,一般默认保留一周。
kafka的性能跟日志文件大小没有关系,所以保留大量的日志消息不会对kafka的性能产生什么影响。
consumer进行消费时是基于维护在commit log的offset实现的。在kafka中,consumer会自己维护offset,消息完一条消息offser就+1,或者通过指定对应的offser或者时间offset进行跳过某几条消息或者重复消费某几条消息。
这意味着consumer的消费情况都由consumer本身进行维护,对于kafka的性能消耗是非常小的,增加或者减少consumer都不会对kafka集群或者consumer产生影响,因为offser是consumer各自维护的。
partition
一般情况下,partitions都分布在不同的broker下,每个broker可以请求备份其他broker下的partition下的数据。
同时,也可以通过配置,指定每个partition的副本数量。对于每个partition,都有一个broker起到leader作用,follower的数量可以是0个或者多个。并且只有leader才能接收针对这个partition消息的读写请求,其他follower只能被动的从leader同步数据,不提供读写。如果leader挂了,那么会选举其中一个follower成为leader。
ISR: In-Sync Replicas (同步副本集)。指的是与leader保持数据同步的副本的集合。当ledaer挂时,会从ISR中选出一个当folloer。如果ISR中的broker没有及时从ledaer中同步数据,就会从ISR中剔除。
消息传递模式
传统的消息中间件消息传递模式有2种,对于kafka的实现模式:
- Queue:多个consumer同时拉取消息,一条消息只能被一个consumer消费。 将所有的consumer配置在一个ConsumerGroup下。
- Publish-Subscribe模式:所有订阅该Topic的consumer都能收到消息 将所有的consumer都分配一个独立的ConsumerGroup。
kafka设计原理
核心总控制器 Controller
当kafka集群启动时,它会从集群中的多个broker选出一个broker作为controller,负责管理整个集群中所有分区和副本的状态:
- 当某个topic下的某个分区的leader故障,controller负责选举出新的leader;
- 当某个分区的ISR集合中的数据发生变化时,controller负责将相关的元信息同步到所有的broker中;
- 当某个topic的分区数量增加时,controller负责将该新分区被其他的broker感知到。
Controller选举和工作机制:
整个Controller的选举和工作机制都是基于Zookeeper实现的。
选举机制: 当kafka集群启动时,所有的broker都会尝试在zookeeper上创建一个 /controller 临时节点,zookeeper会保证只有一个broker节点创建成功,创建成功的broker节点就成为了核心控制器Controller。
其他没有成功创建 /controller 的broker会监听这个临时节点,当充当controller角色的borker节点宕机,那么 /controller 就会销毁,其他监听的broker会再次争抢创建临时节点,最后成功创建的成为了controller,这就是选举机制。
工作机制:
- 监听broker: 监听 zookeeper 中的 /brokers/ids/节点,处理broker数量的变化;
- 监听topic: 监听zookeeper中的 /brokers/topics,处理topics数量的变化;
- 监听partition: 监听zookeeper中的 /brokers/topics/[topic],处理partition数量的变化;
- 从zookeeper获取所有与brokers,topics和partition相关得到消息并进行管理。
Partition副本选举Leader机制
如果某个partition的leader挂了,那么controller会从ISR列表中选举出leader。kafka会优先挑选ISR列表中挑选第一个broker作为leader。因为一个leader最先放入ISR,可能同步的数据最多。如果配置了 unclean.leader.election.enable = true ,那么也可以从ISR以外的broker中挑选出leader。
follower进入ISR的条件:
- follower不能产生网络分区,必须与leader保持网络联通并且与zookeeper保持会话;
- follower能同步leader的所有写操作,至少不能落后太多。落后的多少是由 replica.lag.time.max.ms 决定的,如果超过这个时间未与leader进行同步,则会被剔除ISR。
consumer的offset记录机制
每个consumer会定期将各自的offset发送到 kafka 的内部 topic : _consumer_offsets,对应的 key 是 consumerGroupId + topic + 分区号,value 就是当前 offset 的值。consumer宕机后重新启动也是根据该key获取对应的offset。kafka 会定期清理这部分数据,只保留最新的。
从这里可以看出,_consumer_offsets可能会有很高的并发量,所以默认给其50个分区(可通过offsets.topic.num.partitions配置)。
消费者rebalance机制
如果某个topic内consumerGroup里的consumer数量或者partition数量发生了变化,这时会触发 kafka 的 rebalance机制。例如某个consumer宕机,此时会将它所负责的partition分给其他消费者,当它状态恢复正常后,又会将他负责的partition还给他。
rebalance机制只会在subscribe这种不指定分区的情况下触发,如果是assign这种指定分区的情况下不会触发。
以下几种情况会触发rebalance:
- consumerGroup 里的 consumer 数量增加或者减少;
- topic增加了分区;
- 消费组订阅了更多topic。 需要注意的是,处于rebalance的kafka集群,consumer无法从kafaka中消费消息,所以应当选择在流量较低的时间触发执行rebalance。Rebalance策略: range策略:例如有10个broker,3个consumer,第一个consumer消费 1-4,第二个消费 5-7,第三个消费 8-10; round-robin:轮询策略; sticky:与round-robin类似,但是rebalance时会尽量不去破坏原有的分配规则。
HW与LEO
LEO: log-end-offset 指的是副本中的日志文件下一条消息的offset,这个offset值目前是没有消息的。
HW:HighWatermark (高水位)。取得是partition中 ISR最小的LEO值。
consumer最多只能消费到 HW 的位置。另外,在每个副本集中,leader 和 follower 都需要维护自己的 HW 状态。当 producer 发送消息到leader 时,consumer不能立即消费到这条消息。leader 首先更新自己的LEO,然后 follower 从 leader 中同步后也更新自己的LEO值,等到所以的 follower 都更新完毕,那么副本集中的 HW 就会更新,此时消费者才能消费到这条消息。这么做的好处是:如果 leader 宕机,consumer 依然可以消费到这条消息。
日志分段存储
在 kafka 中,日志是以分段的形式存储的。一个 partition 中的所有的消息都放在一个以 topic + 分区号 命名的文件夹下存储。消息存储的格式如下:
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
00000000000005367851.index
00000000000005367851.log
00000000000005367851.timeindex
00000000000009936472.index
00000000000009936472.log
00000000000009936472.timeindex
以 “.index” 为结尾的文件,文件名就代表这段日志文件中起始offset的位置;
以 “.timeindex” 为结尾的文件,kafka每发送 4k 大小的消息到分区,就会记录一条当前消息的时间戳与对应的offset位置;
以 “.log” 为结尾的文件,存放者具体的消息,包括 offset 和消息体。
kafka 中默认每个日志段大小是 1G(可通过log.segment.bytes配置)。
kafka的重要参数与问题总结
一些比较重要的参数:
producer
- acks: (1) acks = 0。表示 producer不需要等待任何 broker 确认收到消息的回复,就可以发送下一条消息。这种性能最高,但是最容易丢消息; (2) acks = 1。表示 producer 需要等待 leader 确认将消息写入 log 的回复,但是不需要等待所有的 follower 都成功写入,就可以继续发送下一条消息。这种情况下,follower 还未及时同步消息,leader 又宕机了,那么就会丢消息。 (2) acks = -1/all。表示 producer 需要等待 leader 和 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数的配置的副本个数成功写入log的回复,才能发送下一条消息。这种策略会保证只要有一个 follower 存活就不会丢失数据。
- retries:消息发送失败时的重试次数。重试能保证消息发送的可靠性,但可能会导致消息重复发送。
- retry.backoff.ms:消息发送失败时的重试间隔,默认是100ms。
- buffer.memory:消息发送的本地缓冲区,如果设置了该值,那么消息会先发送到缓冲区中,在一定程度上可以提高性能。默认值是33554432,即32MB。
- batch.size: kafka本地线程会从缓冲去取数据,批量发送到broker。该值可以设置批量发送数据的大小,默认是16384,即16kb。
- linger.ms: 默认值是0,表示消息立即必须被发送。消息发送完后会放入一个本地的batch,如果设置了该值,在指定的时间内达到 batch.size 的大小,那么就会随着这个batch发送出去;如果没未达到,这条消息也会被发送出去。目的是不让消息发送的延时时间太长。
- key.serializer:指定key从字符串序列化到字节数组的序列化器。
- value.serializer:指定value从字符串序列化到字节数组的序列化器。
consumer
- group.id: 消费者所属的消费组id
- enable.auto.commit:是否由kafka自动提交offset。
- auto.offset.reset:当消费主题的是一个新的消费组;或者指定offset的消费方式,但是offset不存在,消费者如何消费。 (1)latest(默认): 只消费自己启动之后发送到主题的消息。 (2)earliest: 第一次从头开始消费,以后按照offset 的记录继续消费。
- heartbeat.interval.ms: 消费者发送到broker的心跳间隔时间。需要注意的是,如果发生了rebalance ,那么broker会通过心跳响应将rebalance方案。
- session.timeout.ms:broker多长没有收到consumer的心跳消息就将consumer从消费组踢出,默认是10秒。
- max.poll.records:一次性从consumer拉取消息的最大条数。
- max.poll.interval.ms:如果两次拉取消息的间隔时间超过这个时间,那么broker认为这个consumer处理能力太弱,将该consumer提出消费组。
- key.deserializer:指定key的反序列化器。
- value.serializer:指定value的反序列化器。
问题及优化方案
1. 消息丢失:
发送端:
(1) acks = 0。表示 producer不需要等待任何 broker 确认收到消息的回复,就可以发送下一条消息。这种性能最高,但是最容易丢消息;
(2) acks = 1。表示 producer 需要等待 leader 确认将消息写入 log 的回复,但是不需要等待所有的 follower 都成功写入,就可以继续发送下一条消息。这种情况下,follower 还未及时同步消息,leader 又宕机了,那么就会丢消息。
(2) acks = -1/all。表示 producer 需要等待 leader 和 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数的配置的副本个数成功写入log的回复,才能发送下一条消息。这种策略会保证只要有一个 follower 存活就不会丢失数据。
消费端:
将 auto.offset.reset 设置为 false,不自动提交offset。只有当业务代码成功执行完毕,再提交offser,避免消息丢失。
2. 消息重复消费
发送端: 如果配置了重试机制,可能因为网络原因会造成消息的重复提交。
消费端: 消费端如果是手动提交offset,可能出现业务代码执行完毕,但offset还没提交,下一次拉取就会消费重复的数据。
解决办法是,消费端的消费接口保证幂等性。
3.消息乱序
如果在消息发送端配置超时机制,那么就有可能出现,发送了1,2,3条消息,但是第一条出现网络问题导致落后于后面两条消息,从而导致消息到达broker后的顺序变成了2,3,1。
解决办法:
(1). 将发送消息的发送方式改成同步,保证一条消息发送成功后再发送下一条;或者直接关闭重试机制。这两种方案都会导致发送端的性能降低。
(2). 如果想要保证消息全链路的消费有序,需要保证有序的消息全部发送到topic的一个分区内,并且只能有一个consumer进行消费。或者消费者接收到消息后,将需要保证有序的消息放入到内存队列里,再开启一个线程去消费。
4.消息积压
(1) 由于发送端发送速率过快,或者消费端消费能力太低,导致消息挤压。
这个时候可以启动新的消费者,把旧的消费者暂时关掉。新的消费者程序不处理具体业务,而是将挤压的消息转发到其他新的topic下,这个topic可以设置很多分区,然后启动多台消费者去消费新的topic。
(2) 由于消息格式有误或者消费程序出现bug,导致消费者消费消息一直不成功。
可以将这些消费不成功的消息取出放入数据库或者磁盘中(类似死信队列),后面再慢慢分析消费失败的原因。
5.延时队列
实现方案:
发送端将消息按照不同的延迟时间发送到不同的topic下,如 topic_1s,topic_5s,topics_1min… ,然后开启定时器去轮询这些topic下的消息,如果时间到了就将这些消息转发到具体业务处理的topic下。
kafka高性能的原因
- kafka 写日志时每次都是在文件末尾追加,因为kafka的消息不能修改或者删除保证了磁盘的顺序读,不会出现文件的随机写的情况;
- 数据传输的零拷贝;
- 消息发送和消费的批量处理和压缩传输。
零拷贝:
- 减少了内核到用户空间,用户空间到内核的两次拷贝;
- 减少了内核与用户空间的上下文切换。
版权归原作者 肯定不吃番茄啊 所有, 如有侵权,请联系我们删除。