0


Kafka调研

文章目录

1. Kafka概述

  Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域,如海量日志的处理。

1.1 Kafka消费模式

  Kafka的消费模式主要有两种:一种是一对一的消费,也即点对点的通信,即一个生产者一个消费者。第二种为一对多的消费,即一个消息发送到某个Topic的消息队列,所有订阅了的消费者从该队列中拉取消息消费。

1.1.1 点对点模式

  消息生产者发布消息到Queue队列中,通知消费者从队列中拉取消息进行消费。消息被消费之后则删除,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。
700

1.1.2 发布/订阅模式

  这种模式也称为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic中,同时有多个消费者订阅此Topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据之后,数据不会被清除,Kafka会默认保留一段时间,然后再删除。
650

1.2 Kafka的基础架构

  Kafka主要存在生产者Producer、Kafka集群Broker、消费者Consumer、注册消息Zookeeper架构图如下:
900

  • Producer:消息生产者,向kafka broker发送消息的客户端.
  • Broker:Kafka集群中的服务器,一台Kafka服务器就是一个Broker,即Kafka集群的一个节点,一个集群由多个Broker组成,一个Broker可以容纳多个Topic。
  • Consumer:消息消费者,即从Kafka中拉取消息消费的客户端。
  • Consumer Group:消费者组,消费者组(唯一组ID)中存在多个消费者(≥1个),一个Topic的一个分区中的消息只能够被一个消费者组中的一个消费者消费,Topic内的分区被均匀地分配给1个消费者组中的不同消费者,因此消费者可能消费多个分区。向消费组添加消费者(可以是线程or进程)是横向扩展消费能力的主要方式。消费者组之间互不影响,消费者组是逻辑上的一个订阅者。 参考链接
  • Topic:主题,可以理解为一个队列,生产者和消费者都是面向一个Topic。
  • Partition:分区,为了实现扩展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序,不能保证全局有序)。
  • Replica:副本,为保证在集群中某个节点发生故障时,节点上的Partition数据不丢失,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower。
  • Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。如果多个消费者,则分别消费Leader和各个Follower中的消息
  • Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。
  • Zookeeper:用来保存kafka元数据信息(leader follower在那个broker等信息)。
  • offset:偏移量,消息在Partition中的编号,0.8版本之前保存在ZooKeeper,新版保存在Kafka Topic中:___consumer_offsets,默认50个分区(为什么偏移量从ZooKeeper转移到Kafka Topic,因为ZooKeeper不适合高并发下的读写,当Topic很多,每个Topic有多个分区,随着消费者越来越多,ZooKeeper不能满足偏移量的实时更新)。

1.3 Kafka的安装和使用

docker的安装可以看这篇文章:Docker&Docker命令学习

一、安装与启动Kafka和ZooKeeper

# docker直接拉取kafka和zookeeper的镜像
docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper 
# 首先需要启动zookeeper,如果不先启动,启动kafka没有地方注册消息
docker run -it --name zookeeper -p 12181:2181 -d wurstmeister/zookeeper:latest
# 启动kafka容器,注意需要启动三台,注意端口的映射,都是映射到9092
# 第一台
docker run -it --name kafka01 -p 19092:9092 -d -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.233.129:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.233.129:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# 第二台
docker run -it --name kafka02 -p 19093:9092 -d -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.233.129:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.233.129:19093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# 第三台
docker run -it --name kafka03 -p 19094:9092 -d -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=192.168.233.129:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.233.129:19094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

  上面端口的映射注意都是映射到Kafka的9092端口上!否则将不能够连接!

二、具体命令学习

# 创建topic名称为first,3个分区,1个副本
./kafka-topics.sh --zookeeper 192.168.233.129:12181 --create --topic first --replication-factor 1 --partitions 3
# 查看first此topic信息
./kafka-topics.sh --zookeeper 192.168.233.129:12181 --describe --topic first
Topic: first    PartitionCount: 3    ReplicationFactor: 1    Configs: 
    Topic: first    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
    Topic: first    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: first    Partition: 2    Leader: 1    Replicas: 1    Isr: 1
# 调用生产者生产消息
./kafka-console-producer.sh --broker-list 192.168.233.129:19092,192.168.233.129:19093,192.168.233.129:19094 --topic first
# 调用消费者消费消息,from-beginning表示读取全部的消息
./kafka-console-consumer.sh --bootstrap-server 192.168.233.129:19092,192.168.233.129:19093,192.168.233.129:19094 --topic first --from-beginning
# 删除topic


删除Topic

./kafka-topic.sh --zookeeper 192.168.233.129:12181 --delete --topic second

从上图可以看到删除的时候只是被标记为删除

marked for deletion

并没有真正的删除,如果需要真正的删除,需要再

config/server.properties

中设置

delete.topic.enable=true

修改分区数

./kafka-topics.sh --zookeeper 192.168.233.129:12181 --alter --topic test2 --partitions 3

2. Kafka原理

2.1 Kafka工作流程

  Kafka中消息是以topic进行分类的,Producer生产消息,Consumer消费消息,都是面向topic的。
750
  Topic是逻辑上的概念,Partition是物理上的概念,每个Partition对应着一个log文件,该log文件中存储的就是producer生产的数据,

Topic=N*partition;partition=log

  Producer生产的数据会被不断的追加到该log文件的末端,且每条数据都有自己的offset,consumer组中的每个consumer,都会实时记录自己消费到了哪个offset,以便出错恢复的时候,可以从上次的位置继续消费。流程:Producer => Topic(Log with offset)=> Consumer。

2.2 消息存储与查询的机制

2.2.1 消息存储的机制

  Kafka文件存储也是通过本地落盘的方式存储的,主要是通过相应的log与index等文件保存具体的消息文件。
600
  生产者不断的向log文件追加消息文件,为了防止log文件过大导致定位效率低下,Kafka的log文件以1G为一个分界点,当

.log

文件大小超过1G的时候,此时会创建一个新的

.log

文件,同时为了快速定位大文件中消息位置,Kafka采取了分片和索引的机制来加速定位。在kafka的存储log的地方,即文件的地方,会存在消费的偏移量以及具体的分区信息,分区信息的话主要包括

.index

.log

文件组成。

分区目的是为了备份,所以同一个分区存储在不同的Broker上,即当

third-2

存在当前机器

kafka01

上,实际上在

kafka03

中也有这个分区的文件(副本),分区中包含副本,即一个分区可以设置多个副本,副本中有一个是Leader,其余为Follower。如果

.log

文件超出大小,则会产生新的

.log

文件。如下所示:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

2.2.2 消息查询的机制

  为了减小index文件大小,Kafka采用稀疏索引,即每隔一定字节的数据建立一条索引。index文件存储的是<key, value>对的数据,key是对应本log文件中消息的编号,value为消息位于log文件中的物理偏移量,log文件名+index文件的key可以唯一确定消息在partition中的偏移量。

上面分区中Segment-1中的第4条消息的offset(消息数的offset)就是

6 + 4

,index文件名

6

代表的消息数的偏移量,然后在6.log文件中找到物理偏移量为759的就是目标消息的起始位置。

2.3 生产者的消息分区策略

一、分区的原因

  • 方便在集群中扩展:每个Partition通过调整以适应它所在的机器,而一个Topic又可以有多个Partition组成,因此整个集群可以适应适合的数据。
  • 可以提高并发:以Partition为单位进行读写。类似于多路。

二、消息进入哪个分区?

  • 指明partition(这里的指明是指第几个分区)的情况下,直接将指明的值作为partition的值。
  • 没有指明partition的情况下,但是存在值key,此时将key的hash值与topic的partition总数进行取余得到partition值。
  • 值与partition均无的情况下,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与topic可用的partition总数取余得到partition值,即round-robin算法。

2.4 生产者ISR

  Kafka为了保证数据的一致性使用了ISR机制(in-sync replica set, 副本同步集),ISR是一个副本的列表,里面存储的都是能跟Leader数据最近的副本。Kafka的数据是多副本的,每个Topic下的每个分区下都有一个Leader 和多个Follower,每个Follower都是主动从Leader拉取数据来和Leader保持数据同步的。Flower只提供数据的可恢复性,生产者和消费者都是从Leader读写数据。

一、Kafka引入ISR机制的原因?

    Leader和Flower同步去同步数据效率低下,异步去同步数据可能造成数据的丢失,所以原因如下:

  •   1. 尽可能保证数据同步的效率,同步效率不高的副本会被踢出ISR列表。
  •   2. 避免数据的丢失,因为ISR列表里的副本和Leader的数据是最接近的。

二、ISR里面存储的都是能跟Leader数据保持一致的副本,如何判断一个副本是否要保留到ISR中?

  • 根据副本和Leader 的交互时间差,如果大于某个时间差 就认定这个副本不行了,就把此副本从IRS中剔除,此时间差根据配置参数rerplica.lag.time.max.ms=10000 决定(单位ms)。
  • 根据Leader和副本的消息条数差值决定是否从ISR中剔除此副本,此消息条数差值根据配置参数rerplica.lag.max.messages=4000 决定(单位条)。

2.5生产者ACK机制

min.insync.replicas=n

配置参数表示当满足了n个副本的消息确认(n默认为1,最好大于1,因为Leader 也在ISR列表中),才认为这条消息是发送成功的。

min.insync.replicas

参数只有配合

request.required.acks =-1

时才能达到最大的可靠性。request.required.acks的参数说明如下:

  • 0:生产者只管发送,不管消息是否已落盘,直接返回。(当broker故障时有可能丢失数据
  • 1:Leader把消息落盘后就返回。(如果在follower同步成功之前leader故障,那么将丢失数据)
  • -1(all):只有ISR中的n-1个副本(Leader 除外所以n-1)都同步了消息 此消息才确认发送成功。(发送ack之前,如果leader发生故障,会造成数据重复

  为保证producer发送的数据能够可靠的发送到指定的topic中,Topic的每个Partition收到producer发送的数据后,都需要向producer发送ack(

acknowledgement

),如果producer收到ack就会进行下一轮的发送,否则重新发送数据

发送ack的时机

  Kafka会在所有Follower与Leader同步完成,Leader再向生产者发送ack,主要是确保Follower中数据不丢失。在Leader挂掉之后,会从Follower中选出新的Leader。

注意:生产者发送的消息只有在Leader发送ack后,才能被消费者消费。

2.6 副本间的数据一致性

  • **LEO(Log End Offset)**:每个副本最后的一个offset。
  • **HW(High Watermark)**:高水位,指代消费者能见到的最大的offset,ISR队列中最小的LEO。

Follower故障和Leader故障发生故障时的处理

Follower故障:Follower发生故障后会被临时提出ISR,等待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将Log文件高于HW的部分截取掉,从HW开始向Leader进行同步,等待该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

leader故障:Leader发生故障之后,会从ISR中选出一个新的Leader,为了保证多个副本之间的数据的一致性,其余的Follower会先将各自的Log文件高于HW的部分截掉,然后从新的Leader中同步数据。

注意:这两种处理只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

2.7 Kafka Exactly-once

2.7.1 语义介绍

  • At-least-once(最少一次):如果生产者从Kafka broker接收到一个确认(ack)并且ack = -1,这意味着消息已经被写入Kafka Topic一次。然而,如果生产者收到ack超时或收到一个错误,它可能会重试发送消息,但在消息被成功写入Kafka Topic后,此重试将导致消息被写入两次,因此将不止一次地传递给最终使用者。这种方法可能会导致消息重复的结果。
  • At-most-once(最多一次):如果在ack超时或返回错误时生产者没有重试,那么消息可能最终不会被写入Kafka Topic,因此不会传递给消费者。这种方法避免了消息重复的可能性,但可能会导致消息丢失。
  • Exactly-once(精确一次):即使生产者重复发送消息,消息也只被传递一次给最终消费者。该语义是最理想的,但也难以实现,因为它需要消息系统本身与生产和消费消息的应用程序进行协作。

2.7.2 Kafka 中 Exactly-once 面临的问题

理想状况,网络良好,代码没有错误,则 Kafka 可以保证 Exactly-once,但生产环境错综复杂,故障几乎无法避免,主要有:

  • Broker副本全部失效:Kafka 作为一个高可用、持久化系统,保证每条消息被持久化并且冗余多份(假设是 n 份),所以 Kafka 可以容忍 n-1 个 Broker 故障,意味着一个分区只要至少有一个 Broker 可用,分区就可用。Kafka 的副本协议保证了只要消息被成功写入了主副本,它就会被复制到其他所有的可用副本(ISR)。
  • Producer 到 Broker 的 RPC 失败,可能重复发送消息:Kafka 的持久性依赖于生产者接收 Broker 的 ack 。Broker 可能在写入消息后,发送 ack 给生产者的时候挂了,甚至 Broker 也可能在写入消息前就挂了。由于生产者没有办法知道错误是什么造成的,所以它就只能认为消息没写入成功,并且会重试发送。在一些情况下,这会造成同样的消息在 Kafka 分区日志中重复,进而造成消费端多次收到这条消息。
  • 客户端消费消息失败:一旦新的客户端实例已经启动,它必须能够从失败实例的任何状态中恢复,并从安全点( safe checkpoint )开始处理,这意味着消费的偏移量必须始终与生成的输出保持同步。

2.7.3 如何保证 Exactly-once

2.7.3.1 从业务侧来保证 Exactly-once

一、生产者方要做的控制:

  • 每个分区只有一个生产者写入消息,当出现异常或超时,生产者查询此分区最后一个消息,用于决定后续操作时重传还是继续发送。
  • 为每个消息增加唯一主键,生产者不做处理,由消费者根据主键去重。

二、消费者方要做的控制:

  • 关闭自动提交 offset 的功能,不使用 Offsets Topic 这个内部 Topic 记录其 offset,而是由消费者自动保存 offset。将 offset 和消息处理放在一个事务里面,事务执行成功认为消息被消费,否则事务回滚需要重新处理。当出现消费者重启或者 Rebalance 操作,可以从数据库找到对应的 offset,然后调用 KafkaConsumer.seek() 设置消费者位置,从此 offset 开始消费。

2.7.3.2 从Kafka侧来保证 Exactly-once

2.7.3.2.1 幂等性的方式:每个分区中精确一次且有序

  Kafka 0.11.0.0版本引入了幂等语义。 一个幂等性的操作就是一种被执行多次造成的影响和只执行一次造成的影响一样的操作。对于单个分区而言,幂等生产者不会因为生产者或 Broker 故障而产生多条重复消息。想要开启这个特性,获得每个分区内的精确一次语义,也就是说没有重复,没有丢失,并且有序的语义,只需要 producer 配置

enable.idempotence=true

。这种方式只能保证单个 Producer 对于同一个(Topic, Partition)的 Exactly Once 语义。PID在重启之后会发生变化,同时不同的Partition也具有不同的主键,所以幂等性无法保证跨分区跨会话的Exactly Once。

Kafka实现 Exactly-once 的方式:

  每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,该PID对用户完全透明而不会暴露给用户。在底层,它和 TCP 的工作原理有点像,每一批发送到 Kafka 的消息都将包含 PID 和一个从 0 开始单调递增序列号。将使用这个序列号来删除重复的发送。和只能在瞬态内存中的连接中保证不重复的 TCP 不同,这个序列号被持久化到副本日志,所以,即使分区的 Leader 挂了,其他的 Broker 接管了Leader,新 Leader 仍可以判断重新发送的是否重复了。这种机制的开销非常低:每批消息只有几个额外的字段。这种特性比非幂等的生产者只增加了可忽略的性能开销。

  • 如果消息序号比 Broker 维护的序号大 1 以上,说明中间有数据尚未写入,也即乱序,此时 Broker 拒绝该消息。

  • 如果消息序号小于等于 Broker 维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息。

2.7.3.2.2 事务的方式:跨分区原子写入

  Kafka 现在通过新的 事务 API 支持跨分区原子写入。这将允许一个生产者发送一批到不同分区的消息,这些消息要么全部对任何一个消费者可见,要么对任何一个消费者都不可见。这个特性也允许在一个事务中处理消费数据和提交消费偏移量,从而实现端到端的精确一次语义。

  为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)唯一的 ID,也即Transaction ID 。 Transactin ID 与 PID 可能一一对应。区别在于 Transaction ID 由用户提供,将生产者的 transactional.id 配置项设置为某个唯一ID。而 PID 是内部的实现对用户透明。

  为了保证新的 Producer 启动后,旧的具有相同 Transaction ID 的 Producer 失效,每次 Producer 通过 Transaction ID 拿到 PID 的同时,还会获取一个单调递增的 epoch。由于旧的 Producer 的 epoch 比新 Producer 的 epoch 小,Kafka 可以很容易识别出该 Producer 是老的 Producer 并拒绝其请求。

下面是的代码片段演示了事务 API 的使用:

Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 初始化事务,包括结束该Transaction ID对应的未完成的事务(如果有)
// 保证新的事务在一个正确的状态下启动
producer.initTransactions();
// 开始事务
producer.beginTransaction();
// 消费数据
ConsumerRecords<String, String> records = consumer.poll(100);
try{
    // 发送数据
    producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
    // 发送消费数据的Offset,将上述数据消费与数据发送纳入同一个Transaction内
    producer.sendOffsetsToTransaction(offsets, "group1");
    // 数据发送及Offset发送均成功的情况下,提交事务
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 数据发送或者Offset发送出现异常时,终止事务
    producer.abortTransaction();
} finally {
    // 关闭Producer和Consumer
    producer.close();
    consumer.close();
}

需要注意的是,上述的事务保证是从 Producer 的角度去考虑的。从 Consumer 的角度来看,该保证会相对弱一些。尤其是不能保证所有被某事务 Commit 过的所有消息都被一起消费,因为:

  • 对于压缩的 Topic 而言,同一事务的某些消息可能被其它版本覆盖。
  • 事务包含的消息可能分布在多个 Segment 中(即使在同一个 Partition内),当老的 Segment 被删除时,该事务的部分数据可能会丢失。
  • Consumer 在一个事务内可能通过 seek 方法访问任意 Offset 的消息,从而可能丢失部分消息。
  • Consumer 可能并不需要消费某一事务内的所有 Partition,因此它将永远不会读取组成该事务的所有消息。

2.8 消费者分区分配策略

标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/weixin_43060848/article/details/124936624
版权归原作者 北辰的夏天 所有, 如有侵权,请联系我们删除。

“Kafka调研”的评论:

还没有评论