一、kafka 使用场景
1、异步
2、解耦
3、消峰
二、基本概念
kafka基础架构
1、Broker
消息中间件处理节点,一个 Kafka 节点就是一个 Broker,一个或者多个 Broker 可以组成一个 Kafka 集群。
2、Topic
Kafka 的消息通过 Topic 主题来分类,Topic类似于关系型数据库中的表,每个 Topic 包含一个或多(Partition)分区。
3、Partition
多个分区会分布在Kafka集群的不同服务节点上,消息以追加的方式写入一个或多个分区中。
4、LogSegment
每个分区又被划分为多个日志分段 LogSegment 组成,日志段是 Kafka 日志对象分片的最小单位。LogSegment 算是一个逻辑概念,对应一个具体的日志文件(”.log” 的数据文件)和两个索引文件(”.index” 和 “.timeindex”,分别表示偏移量索引文件和消息时间戳索引文件)组成。
5、Offset:
每个分区中都由一系列有序的、不可变的消息组成,这些消息被顺序地追加到 Partition 中,每个消息都有一个连续的序列号称之为 Offset 偏移量,用于在 Partition 内唯一标识消息。
6、Message:
消息是 Kafka 中存储的最小最基本的单位,即为一个 commit log,由一个固定长度的消息头和一个可变长度的消息体组成。
7、Producer:
消息的生产者,负责发布消息到 Kafka Broker,生产者在默认情况下把消息均衡地分布到主题的所有分区上,用户也可以自定义分区器来实现消息的分区路由。
8、Consumer:
消息的消费者,从 Kafka Broker 读取消息的客户端,消费者把每个分区最后读取的消息的 Offset 偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。
9、Consumer Group:
每个 Consumer 属于一个特定的 Consumer Group(若不指定 Group Name则属于默认的 group),一个或多个 Consumer 组成的群组可以共同消费一个 Topic 中的消息,但每个分区只能被群组中的一个消费者操作。
10、__consumer_offsets
__consumer_offsets 这个内部 Topic,专门用来存储 Consumer Group 消费的情况,默认情况下有 50 个 partition,每个 partition 默认三个副本
三、Kafka工作流程及文件存储机制
1、工作流程
Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
Topic是逻辑上的概念,而partition(分区)是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到哪个offset,以便出错恢复时,从上次的位置继续消费。
2、文件存储机制
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引的机制,将每个partition分为多个segment。(由log.segment.bytes决定,控制每个segment的大小,也可通过log.segment.ms控制,指定多长时间后日志片段会被关闭)每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如:bing这个topic有3个分区,则其对应的文件夹为:bing-0、bing-1和bing-2。
索引文件和日志文件命名规则:每个 LogSegment 都有一个基准偏移量,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64位的长整形数,固定是20位数字,长度未达到,用 0 进行填补。如下图所示:
index和log文件以当前segment的第一条消息的offset命名。index文件记录的是数据文件的offset和对应的物理位置,正是有了这个index文件,才能对任一数据写入和查看拥有O(1)的复杂度,index文件的粒度可以通过参数log.index.interval.bytes来控制,默认是是每过4096字节记录一条index。下图为index文件和log文件的结构示意图:
查找message的流程(比如要查找offset为170417的message):
- 首先用二分查找确定它是在哪个Segment文件中,其中0000000000000000000.index为最开始的文件,第二个文件为0000000000000170410.index(起始偏移为170410+1 = 170411),而第三个文件为0000000000000239430.index(起始偏移为239430+1 = 239431)。所以这个offset = 170417就落在第二个文件中。其他后续文件可以依此类推,以起始偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。
- 用该offset减去索引文件的编号,即170417 - 170410 = 7,也用二分查找法找到索引文件中等于或者小于7的最大的那个编号。可以看出我们能够找到[4,476]这组数据,476即offset=170410 + 4 = 170414的消息在log文件中的偏移量。
- 打开数据文件(0000000000000170410.log),从位置为476的那个地方开始顺序扫描直到找到offset为170417的那条Message。
3、数据过期机制
当日志片段大小达到log.segment.bytes指定的上限(默认是1GB)或者日志片段打开时长达到log.segment.ms时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。当前正在写入的片段叫做活跃片段,活跃片段永远不会被删除,所以如果你要保留数据1天,但是片段包含5天的数据,那么这些数据就会被保留5天,因为片段被关闭之前,这些数据无法被删除。
4、Kafka生产者
4.1 分区策略
- 多Partition分布式存储,利于集群数据的均衡。
- 并发读写,加快读写速度。
- 加快数据恢复的速率:当某台机器挂了,每个Topic仅需恢复一部分的数据,多机器并发。
4.2 分区原则
- 指明partition的情况下,使用指定的partition;
- 没有指明partition,但是有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
- 既没有指定partition,也没有key的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition数取余得到partition值,也就是常说的round-robin算法。
- 自定义分区策略:实现 org.apache.kafka.clients.consumer.internals.PartitionAssignor接口。
4.3 数据可靠性保证
4.3.1 kafka提供了哪些数据可靠性保证呢?
- kafka可以保证分区消息的顺序。如果使用同一个生产者往同一个分区写入消息,而且消息B在消息A之后写入,那么kafka可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取到消息A再读取消息B。
- 只有当消息被写入分区的所有副本时,它才被认为是“已提交”的。生产者可以选择接收不同类型的确认,比如在消息被完全提交时的确认、在消息被写入分区首领时的确认,或者在消息被发送到网络时的确认。
- 只要还有一个副本是活跃的,那么已经提交的信息就不会丢失。
- 消费者只能读取到已经提交的消息。
4.3.2 复制
Kafka的复制机制和分区的多副本架构是kafka可靠性保证的核心。把消息写入多个副本可以使kafka在发生奔溃时仍能保证消息的持久性。
kafka的topic被分成多个分区,分区是基本的数据块。每个分区可以有多个副本,其中一个是首领。所有事件都是发给首领副本,或者直接从首领副本读取事件。其他副本只需要与首领副本保持同步,并及时复制最新的事件。
Leader维护了一个动态的in-sync replica set(ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据同步后,leader就会发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader不可用时,将会从ISR中选举新的leader。满足以下条件才能被认为是同步的:
- 与zookeeper之间有一个活跃的会话,也就是说,它在过去的6s(可配置)内向zookeeper发送过心跳。
- 在过去的10s(可配置)内从首领那里获取过最新的数据。
4.3.3 影响Kafka消息存储可靠性的配置
4.3.4 ack应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没有必要等ISR中的follower全部接收成功。所以Kafka提供了三种可靠性级别,用户可以根据对可靠性和延迟的要求进行权衡。acks:
- 0: producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没写入磁盘就已经返回,当broker故障时可能丢失数据;
- 1: producer等待leader的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
- -1(all):producer等待broker的ack,partition的leader和ISR里的follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成重复数据。(极端情况下也有可能丢数据:ISR中只有一个Leader时,相当于1的情况)。
4.3.5 消费一致性保证
- follower故障
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。
等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
- leader故障
leader发生故障后,会从ISR中选出一个新的leader,之后为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
4.4 消息发送流程
Kafka 的producer 发送消息采用的是异步发送的方式。在消息发送过程中,涉及到了两个线程——main线程和sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
为了提高效率,消息被分批次写入kafka。批次就是一组消息,这些消息属于同一个主题和分区。(如果每一个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传输可以减少网络开销。不过要在时间延迟和吞吐量之间做出权衡:批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长)。批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。
相关参数
- batch.size:只有数据积累到batch.size后,sender才会发送数据。(单位:字节,注意:不是消息个数)。
- linger.ms:如果数据迟迟未达到batch.size,sender等待 linger.ms之后也会发送数据。(单位:毫秒)。
- client.id:该参数可以是任意字符串,服务器会用它来识别消息的来源,还可用用在日志和配额指标里。
- max.in**.flight.requests.per.connection:该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设置为1可以保证消息时按发送的顺序写入服务器的,即使发生了重试。**
offset的维护
由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到哪个位置,以便故障恢复后继续消费。Kafka0.9版本之前,Consumer默认将offset保存在zookeeper中,从0.9版本开始,Consumer默认将offset保存在Kafka一个内置的名字叫_consumeroffsets的topic中。默认是无法读取的,可以通过设置consumer.properties中的exclude.internal.topics=false来读取。
四、kafka的写入过程
注意:第一个步骤,之前版本:从zk的 /brokers/topics/saas-device-isapi-topic/partitions/1/state下取;
目前版本:根据实际遇到的问题,是从control去拿到patition的leader节点信息;(broker的节点映射错误后,会取不到leader,比如(host文件故意映射交换control的ip与普通的broker的ip))
zookeeper
第一步流程解释下:leader节点是存在zk的 get /brokers/topics/saas-device-isapi-topic/partitions/1/state 目录下 思考? 这个kafka会不会缓存在自己本地,减少与zk的交互?
这里可以看到 leader :1, 还有个isr
isr是什么?
既然大家已经知道了Partiton的多副本同步数据的机制了,那么就可以来看看ISR是什么了。
ISR全称是“In-Sync Replicas”,也就是保持同步的副本,他的含义就是,跟Leader始终保持同步的Follower有哪些。
大家可以想一下 ,如果说某个Follower所在的Broker因为JVM FullGC之类的问题,导致自己卡顿了,无法及时从Leader拉取同步数据,那么是不是会导致Follower的数据比Leader要落后很多?
所以这个时候,就意味着Follower已经跟Leader不再处于同步的关系了。但是只要Follower一直及时从Leader同步数据,就可以保证他们是处于同步的关系的。
所以每个Partition都有一个ISR,这个ISR里一定会有Leader自己,因为Leader肯定数据是最新的,然后就是那些跟Leader保持同步的Follower,也会在ISR里。
保证消息不丢失
1、生产者不丢失
- send方法使用同步发送,或者使用带回调的方法
- 设置重试次数**retries **合适的值,一般是3
2、消费者丢失
- 使用自动提交offset时,当消费者挂了,也会丢失。可以设置为手动提交offset,会带来消息被重新消费的问题,需要根据业务做幂等处理。
3、kafka服务端弄丢消息
- 设置 acks = all,所有副本都要接收到该消息之后该消息才算真正成功被发送。
- 设置 replication.factor >= 3,为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
- 设置 min.insync.replicas > 1,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。但是,为了保证整个 Kafka 服务的高可用性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下加入两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1
- 设置 unclean.leader.election.enable = false,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
1、配置同步到所有副本(即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。),且重试次数>1 ,具体根据业务来定
// 重试次数,0为不启用重试机制,幂等性的时候必须大于0
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ACKS_CONFIG, "all");
2、acks=all 就可以代表数据一定不会丢失了吗?
当然不是,如果你的Partition只有一个副本,也就是一个Leader,任何Follower都没有,你认为acks=all有用吗?
当然没用了,因为ISR里就一个Leader,他接收完消息后宕机,也会导致数据丢失。
所以说,这个acks=all,必须跟ISR列表里至少有2个以上的副本配合使用,起码是有一个Leader和一个Follower才可以。
这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失。
如何保证消息的有序性
指定对应的key
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
return doSend(producerRecord);
}
kafka中计算发送对应分组的源代码
kafka吞吐量高的原因
- partition 并行处理
- 顺序写磁盘,充分利用磁盘特性
- 利用了现代操作系统分页存储 Page Cache 来利用内存提高 I/O 效率
- 采用了零拷贝技术
- Producer 生产的数据持久化到 broker,采用 mmap 文件映射,实现顺序的快速写入
- Customer 从 broker 读取数据,采用 sendfile,将磁盘文件读到 OS 内核缓冲区后,转到 NIO buffer进行网络发送,减少 CPU 消耗
这里使用了xmind 整理
4、存储方式
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件)
5、Kafka消费过程分析
消费方式
consumer采用pull(拉)模式从broker中读取数据。
为什么不用推的方式? push
1、push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
2、 pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)
MQ有可能发生重复消费,如何避免,如何做到幂等
1、kafka中的幂等
//幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
2、业务上的幂等 (msg信息中需要有唯一标识)
1、使用数据库的唯一索引
2、使用redis
6、Kafka controller 控制器
1、是什么
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller
2、之前版本不是采用Controller存在的问题
- 脑裂:虽然ZooKeeper能保证注册到节点上的所有监听器都会按顺序被触发,但并不能保证同一个时刻所有副本看到的状态是一样的,可能造成不同副本的响应不一致
- 羊群效应:如果宕机的那个Broker的Partition数量很多,会造成多个Watch被触发,引起集群内大量的调整
- 每个副本都要在ZK的Partition上注册Watcher,当集群内Partition数量很多时,会造成ZooKeeper负载过重 目前版本:不过每个broker还是会对/controller节点添加监听器的,以此来监听此节点的数据变化(参考ZkClient中的IZkDataListener)。
3、Controller与之前版本的不同
- Leader的变化从监听器改为由Controller管理
- 控制器负责检测Broker的失败,并为每个受影响的Partition选举新的Leader
- 控制器会将每个Leader的变化事件发送给受影响的每个Broker
- 控制器和Broker之间的通信采用直接的RPC,而不是通过ZK队列
4、 Controller的failover 故障转移
- 因为Leader管理被更加集中地管理,比较容易调试问题
- Leader变化针对ZK的读写可以批量操作,减少在failover过程中端到端的延迟
- 更少的ZooKeeper监听器
- 使用直接RPC协议相比队列实现的ZK,能够更加高效地在节点之间通信
5、从整个集群的所有Brokers中选举出一个Controller,它主要负责:
- Partition的Leader变化事件
- 新创建或删除一个topic
- 重新分配Partition
- 管理分区的状态机和副本的状态机
五、真实生产环境中的问题?
Kafka中Topic过多异常分析
1、kafka中偏移量的维护
1.1 、kafka 中存在一个__consumer_offsets topic 是专门维护所有topic的偏移量,这个topic下面有很多个分区(默认情况下__consumer_offsets有50个分区),每个topic下的分区节点维护在zk中
这个topic下面有50个分区
每个分区的leader不同,并不是只有一个leader维护这个topic,每个partion都有各自的leader
2、topic过多导致性能问题
topic过多,导致分区过多,kafka中只要还是会受分区数量的影响;
如下链接是说明kafka与分区数量的关系影响
参考链接:Apache Kafka Supports 200K Partitions Per Cluster | Confluent
翻译链接:https://www.cnblogs.com/huxi2b/p/9984523.html
3、发生rebalance
3.1 弊端
- coordinator协调者组件完成订阅主题分区的分配过程中,该消费者所有的实例不能消费任何消息
- 消费者很多的话,rebalance很慢,对业务产生影响
- rebalance效率不高,所有的消费者都要参与进来,0.11版本提供了sticky assignor,尽量保留之前的分配方案,实现分区变动最小
3.2 什么时候会rebalance机制
- 新的消费者加入消费者组
- 消费者从消费者组退出
- 消费者宕机下线。比如长时间GC、网络延迟导致长时间未向groupcoordinate发送心跳
- 消费者消费超时,没有在指定的时间内提交offset
- 消费者组对应得GroupCoordinate节点发生变化
- 消费者组订阅得主题或者主题得分区数发生变化
3.3 如何解决不必要的Rebalance
Coordinator 会在什么情况下认为某个 Consumer 实例已挂从而要退组呢?那就要来看看消费者端配置的几个参数:
- session.timeout.ms 设置了超时时间,consumer和broker的心跳超时时间,默认10s
- heartbeat.interval.ms 心跳时间间隔,consumer和broker的心跳检测时间,默认3s
- max.poll.interval.ms 每次消费的处理时间,两次poll的最大时间间隔,默认5分钟,超时则触发重平衡
- max.poll.records 每次消费的消息数,默认500条
- 消费者端频繁地full gc
六、深入Kafka服务端
6.1 时间轮
6.2 控制器
6.2.1 如何优雅的关闭Kafka?
kill -9快速关闭,不够优雅。bin目录下Kafka- server- stop.sh脚本工具,首先通过ps ax的方式找出正在运行Kafka的进程号pids,然后使用kill-s TERM $PIDS 或者kill -15
为什么这样关闭的方式会是优雅的?Kafka 服务入口程序中有一个名为“kafka-shutdown-hock”的关闭钩子,待 Kafka 进程捕获终止信号的时候会执行这个关闭钩子中的内容,其中除了正常关闭一些必要的资源,还会执行一个控制关闭(ControlledShutdown)的动作。使用ControlledShutdown的方式关闭Kafka有两个优点:一是可以让消息完全同步到磁盘上,在服务下次重新上线时不需要进行日志的恢复操作;二是 ControllerShutdown 在关闭服务之前,会对其上的leader副本进行迁移,这样就可以减少分区的不可用时间
七、kafka为什么这么快
- 第一个是Kafka对顺序I/O的依赖。
- 赋予 Kafka 性能优势的第二个设计选择是它对效率的关注:零复制原则。
该图说明了数据如何在生产者和消费者之间传输,以及零拷贝的含义。
- 步骤1.1 - 1.3:生产者将数据写入磁盘
- 第2步:Consumer无需零拷贝读取数据
2.1 数据从磁盘加载到OS缓存
2.2 数据从OS缓存复制到Kafka应用程序
2.3 Kafka应用程序将数据复制到socket缓冲区
2.4 将数据从socket buffer复制到网卡
2.5 网卡将数据发送给消费者
- 步骤3:消费者以零拷贝方式读取数据
3.1:数据从磁盘加载到OS缓存 3.2 OS缓存通过sendfile()命令直接将数据复制到网卡 3.3 网卡将数据发送给消费者
零拷贝是在应用程序上下文和内核上下文之间保存多个数据副本的快捷方式。
版权归原作者 大猩猩爱分享 所有, 如有侵权,请联系我们删除。