0


Kafka由浅入深(3)一文读懂弃用默认分区器DefaultPartitioner KIP-794

一、弃用默认分区器的原因

默认分区全路径类名:org.apache.kafka.clients.producer.internals.DefaultPartitioner

  • 如果消息中指定了分区,则使用它
  • 如果未指定分区但存在key,则根据序列化key使用murmur2哈希算法对分区数取模。
  • 如果不存在分区或key,则会使用粘性分区策略(2.4.0版本开始),关于粘性分区请参阅 KIP-480。

KIP-480:Sticky Partitioner引入了 UniformStickyPartitioner 并使其成为默认分区器。事实证明,尽管被称为统一粘性分区器( UniformStickyPartitioner),但粘性分区器存在问题而实际上并没有统一。粘性分区器在改进之前存在的问题,是Kafka弃用默认分区器DefaultPartitioner 和UniformStickyPartitioner 的重要原因。

什么是粘性分区Sticky Partitioner:

首先,我们指定,Producer在发送消息的时候,会将消息放到一个ProducerBatch中, 这个Batch可能包含多条消息,然后再将Batch打包发送

1.1 存在的问题——分配倾斜:

它实际上将更多消息分配给速度较慢的broker,并且可能导致“失控”的问题。当临时慢的broker出现倾斜分配,会使这个broker获取更多的记录,broker服务器压力增大,并因此变得更慢。这反过来又使消息分配更加倾斜,并且问题持续存在并更加恶化,消息分配及其不均匀,严重的情况一下可能导致这个服务器宕机。

1.2 分配倾斜出现的原因:

a、消息分配不均匀,导致broker压力过大

因为“粘性”时间是由新的批量创建消息驱动的,这与broker的延迟成反比——较慢的broker消耗批量消息的速度较慢,因此它们会比速度更快的分区获得更多的“粘性”时间,从而是消息分配倾斜。

假设一个生产者写入的消息到3个分区(生产者配置为linger.ms=0),并且一个partition由于某种原因(leader broker选举更换或网络抖动问题等情况)导致稍微变慢了一点。生产者必须一直保留发送到这个partition的批次消息,直到partition变得可用。在保留这些批次消息的同时,因为生产者还没有准备好发送到这个分区,其他批次的消息又大批量发送的并开始堆积,从而可能导致每个批次都可能会被填满。

linger.ms :

这个参数用来指定生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的时间,默认值为0。生产者客户端会在ProducerBatch被填满或等待时间超过linger.ms值时发送出去。增大这个参数会增加消息的延迟,但同时能提升一定的吞吐量。

从粘性分区器的角度考虑这一点。每次选择慢速分区时,生产者都会完全填满批次。另一方面,由于 linger.ms=0的 设置,其他较快分区可能无法获取到它们的批次的消息。只要有一条记录可用,它就可能会被发送。所以更多的数据最终会被写入已经开始积压的partition。甚至在最初缓慢的原因(例如leader broker选举更换)得到解决之后,这种不平衡可能还需要一些时间才能得到恢复。如果这个慢partition无法承载消息堆积起来的压力,这甚至会产生失控效应。

1.3 案例分析:

如下是基于可能会发生分配倾斜问题的案例分析。下面总结了 1 小时内对 3 个partition的写入。这里的partition 0 是“慢”分区。所有分区的批次数量大致相同,但慢速分区的批次大小要大得多。
partition

TotalBatches
TotalBytes
TotalRecords
BytesPerBatch
RecordsPerBatch

0

1683
25953200
25228
15420.80 
14.99

1

1713
7836878
4622
4574.94
2.70

2

1711
7546212
4381

4410.412.56
虽然重新启动应用程序,生产者可以再次恢复正常,但是它只是无法在工作量不平衡的情况下进行恢复。

b、无法高效的创建批次消息

这不是唯一的问题。即使有大量速度都一样快broker,生产者设置了linger.ms=0参数,粘性分区器也不会高效地创建批次。考虑这种情况,假设我们有 30 个partition,每个partition都有一个leader在自己的broker。

  1. 产生一条记录,partitioner 分配给 partition1,批次 准备好并立即发送出去
  2. 生成一条记录,partitioner 看到创建了一个新批次,触发重新分配,分配给 partition2,批次准备就绪并立即发送出去
  3. 生成一条记录,partitioner 看到创建了一个新批次,触发重新分配,分配给 partition3,批次准备就绪并立即发送出去
  4. .......

以此类推,如此重复整个循环下去。在第一个broker上的第一个消息可能会完成,在这种情况下,单个记录批次可能会再次准备好。当broker的数量很少时,这可能没什么大不了的,但是了解很好的了解动态过程。

所以从某种意义上说,UniformStickyPartitioner 既不均匀也不足够粘性。

二、改进方案

生产者客户端3.3.0版本之后的改动方案: 核心思想是对粘性分区策略问题的优化方案, 这些改动方案不会影响有key消息的分区逻辑,只会影响非key消息的分区逻辑

2.1、公共接口(Public Interface)

2.1.1 更改了默认配置

partitioner.class将具有默认值null。当 partitioner.class 没有显式设置为自定义分区器类时,生产者使用的分区逻辑实在

KafkaProducer实现的

. 不推荐使用

DefaultPartitioner(

默认分区

UniformStickyPartitioner(统一粘性分区)。不应

设置 partitioner.class=UniformStickyPartitioner, 并且partitioner.ignore.keys配置应设置为“true”。

改进的主要变化

改进主要针对消息中没有指定key的默认分区逻辑,它代替了默认分区器中实现的相应逻辑。新版的默认分区逻辑将在

KafkaProducer中自己实现

KafkaProducer

将检查partitioner.class是否设置为null并在这种情况下实现默认分区逻辑。如果partitioner.ignore.代了keys设置为“true”,那么即使具有键的消息也会在partition之间均匀分布。

   /**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
   // 2.8.0版本 KafkaProducer的partition()方法逻辑,分区的默认处理逻辑在DefaultPartioner中实现
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

上面的代码2.8.0版本 KafkaProducer的partition()方法逻辑,分区的默认处理逻辑在DefaultPartioner中实现


    
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        // 如果消息中指定发送分区,则直接返回分区号
        if (record.partition() != null)
            return record.partition();

        // 如果设置了自定义分区,则通过自定义分区获取分区号
        if (partitioner != null) {
            int customPartition = partitioner.partition(
                record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
            if (customPartition < 0) {
                throw new IllegalArgumentException(String.format(
                    "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
            }
            return customPartition;
        }
        
        if (serializedKey != null && !partitionerIgnoreKeys) {
            // 消息key不为空,并且配置partitioner.ignore.keys=false,则通过hash进行分区          
            return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
        } else {
            // 如果消息key为空,或者配置partitioner.ignore.keys=true,则返回分区号为-1
            return RecordMetadata.UNKNOWN_PARTITION;
        }
    }

上面的代码3.3.0版本 KafkaProducer的partition() 方法逻辑,分区的默认分区在partition()方法中实现

RecordMetadata.UNKNOWN_PARTITION(值为-1)表示可以使用任何分配(分区号的计算是基于内置的分区逻辑)


新增配置说明:

**partitioner.adaptive.partitioning.enable **

默认值为“true”,如果为true,则生产者将尝试适应代理性能并为托管在更快代理上的分区生成更多消息。如果为“false”,则生产者将尝试随机分配分区。

partitioner.availability.timeout.ms

默认值为 0。如果该值大于 0 并且启用了自适应分区,并且代理在partitioner.availability.timeout.ms

毫秒内无法接受对分区的生产请求,则该分区被标记为不可用。如果值为 0,则禁用此逻辑。如果通过将partitioner.adaptive.partitioning.enable设置为“false”禁用了自适应分区,则忽略此配置。

partitioner.ignore.keys

默认值为“false”,如果为“false”,则生产者使用消息键(如果指定)来选择分区,如果为“true”,则生产者不使用消息键来选择分区,即使它是指定的。

请注意,新配置适用于未指定partitioner.class时使用的分区算法。如果使用自定义分区器,它们没有任何效果。

2.1.2 统一的粘性批量大小

不是在每次创建批处理时切换分区,而是在每次生成batch.size字节时切换分区。假设我们正在生产分区 1。在分区 1 产生 16KB 后,我们切换到分区 42。在分区 42 产生 16KB 后,我们切换到分区 3。依此类推,无论批处理等发生什么,我们都会这样做。只需计算生成到分区的字节数。这样,即使 linger.ms=0,分布也将是均匀的(可能存在小的暂时不平衡)和粘性,因为更多的连续记录被定向到一个分区,从而允许它创建更好的批次。

让我们考虑一下,使用严格统一的粘性分区器和 linger.ms=0 和 30 个分区,每个分区都在自己的代理上,批处理将如何不同。

  1. 产生一条记录,partitioner 分配给 partition1,batch 准备好并立即发送出去
  2. 产生了一条记录,partitioner 仍然卡在 partition1 上,batch 准备好并立即发送出去
  3. 一样
  4. --
  5. --
  6. 产生了一条记录,partitioner 仍然粘在 partition1 上,现在我们有 5 个 in-flight,所以开始批处理

批处理将继续进行,直到进行中的批处理完成,或者我们达到了batch.size字节并移动到下一个分区。这样,开始批处理只需要 5 条记录。发生这种情况是因为一旦我们有 5 个在飞行中,新批次将不会立即发出,直到至少在飞行中的批次并不断积累记录。使用当前的解决方案,需要 5 倍数量的分区才能有足够的批次在运行,因此不会立即发送新批次。随着生产速度的加快,当 5 个批次已经在进行中时,可以积累更多的记录,因此将使用更大的批次来实现更高的生产率以维持更高的吞吐量。

如果其中一个代理具有更高的延迟,则托管在该代理上的分区的记录将形成更大的批次,但仍将是相同数量的记录在更大的批次中发送频率较低,逻辑会自动适应这一点。

综上所述,统一粘性分区器具有以下优点:

  1. 它是统一的,易于实现且易于理解。直观地说,这是用户所期望的。
  2. 它创建了更好的批次,不会在低生产率时增加延迟,而是在高生产率时切换到更好的批次。
  3. 它适应更高延迟的代理,使用更大的批次来推送数据,保持吞吐量和数据分布均匀。
  4. 它很有效(选择分区的逻辑不需要复杂的计算)。

从实现的角度来看,分区器没有足够的信息来计算记录中的字节数,即:

  • 标头
  • 压缩信息
  • 批头信息(一个分区可以驻留在更快的代理上并获得更多更小的批次,我们需要考虑到这一点以实现一致性)
  • 记录开销(由于做 var int 优化)。不确定它是否重要,但似乎比证明它不重要更容易实现。

可以在

RecordAccumulator

对象中轻松收集此信息,因此

KafkaProducer

该类将计算分区信息,

RecordAccumulator

除非显式设置自定义分区器类。

2.1.3 自适应分区切换

严格的统一粘性分区器切换的一个潜在缺点是,如果其中一个broker落后(无法维持其吞吐量份额),记录将继续堆积在累加器中,最终会耗尽缓冲池内存并降低生产速度以匹配最慢代理的容量。为了避免这个问题,使partition切换决策可以自适应broker负载。

a、切换策略:分配分区的概率与队列长度成反比

等待发送的批次的队列大小是broker负载的直接指示(负载越多的broker将拥有更长的队列)。选择下一个分区时会考虑队列的长度进行分区切换。改进的策略是,选择分区的概率与队列长度成反比,即队列较长的分区不太可能被选择。

b、设置分区可用超时时间

除了基于队列大小的逻辑之外,partitioner.availability.timeout.ms可以设置为非 0 值,在这种情况下,准备发送批次的分区超过partitioner.availability.timeout.ms毫秒,将是标记为不可用于分区,并且在broker能够接受来自分区的下一个就绪批次之前不会被选择。

如果不想使用自适应分区切换逻辑,可以通过设置partitioner.adaptive.partitioning.enable = false来关闭。

从实现的角度来看,分区器对队列长度或broker运行情况一无所知(但该信息可以在

RecordAccumulator

对象中收集),因此

KafkaProducer

该类将执行分区逻辑,

RecordAccumulator

除非显式设置了自定义分区器。

兼容性、弃用和迁移计划

  • DefaultPartitionerUniformStickyPartitioner将被弃用,因为它们的处理逻辑跟优化的版本一致。
  • 未指定自定义分区器将默认使用到这些优化策略。
  • 明确指定DefaultPartitionerUniformStickyPartitioner将收到弃用警告但并不会影响到老版本实现的功能。如果老版本升级到3.3.0以后,需要相应地更新配置以获得新的优化策略(如果替换UniformStickyPartitioner,则需要删除partitioner.class设置,和将partitioner.ignore.keys设置为“true” )。
  • Partitioner.onNewBatch将被弃用。

三、试验结果

测试设置

本地机器上3个kafka broker,一个topic有3个partition,RF=1,每个broker上一个partition:

    主题:foo 分区:0 领导者:1 副本:1 Isr:1 离线: 
    主题:foo 分区:1 领导者:0 副本:0 Isr:0 离线: 
    主题:foo 分区:2 领导者:2 副本:2 Isr:2 离线: 

Kafka-0 为每个生产响应注入了 20 毫秒的睡眠。

Kafka producer 的设置是默认的,而不是明确提到的。

3.1 测试系列 1

bin/kafka-producer-perf-test.sh --topic foo --num-records 122880 --print-metrics --record-size 512 --throughput 2048 --producer.config producer.properties 

该测试产生约 120K 记录,每个记录 512 字节(总计约 60MB),吞吐量限制为 2048 记录/秒(1MB/秒)

概括

分区配置

吞吐量

平均延迟

P99 延迟

P99.9 延迟
旧的 DefaultPartitioner0.84 MB/秒4072.74 毫秒10992 毫秒11214 毫秒partitioner.adaptive.partitioning.enable=false1 MB/秒49.92 毫秒 214 毫秒422 毫秒具有所有默认设置的新逻辑1 MB/秒40.06 毫秒154 毫秒220 毫秒partitioner.availability.timeout.ms=51 MB/秒36.29 毫秒150 毫秒184 毫秒

旧的 DefaultPartitioner(当前实现)

已发送 122880 条记录,每秒 1724.873666 条记录(0.84 MB/秒),平均延迟 4072.74 毫秒,最大延迟 11246.00 毫秒,第 50 次 3870 毫秒,第 95 次 10221 毫秒,第 99 次 10992 毫秒,第 99.9 次 11214 毫秒。

当前的实现有利于最慢的代理并设法处理 ~0.85 MB/s,因此延迟会随着时间的推移而增长。

生产者节点指标:输出字节总数:{client-id=perf-producer-client,node-id=node-0}:46826262.000
生产者节点指标:输出字节总数:{client-id= perf-producer-client, node-id=node-1} : 9207276.000 
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 9004193.000

Kafka-0 代理占用的字节数比其他 2 个代理多约 5 倍,成为集群的瓶颈并可能扭曲下游数据分布。

新的统一分区器 (partitions.adaptive.partitioning.enable=false)

发送 122880 条记录,每秒 2043.198484 条记录(1.00 MB/秒),平均延迟 49.92 毫秒,最大延迟 795.00 毫秒,第 50 次 8 毫秒,第 95 次 150 毫秒,第 99 次 214 毫秒,第 99.9 次 422 毫秒。
生产者节点指标:输出字节总数:{client-id=perf-producer-client,node-id=node-0}:22237614.000
生产者节点指标:输出字节总数:{client-id= perf-producer-client, node-id=node-1} : 21606034.000 
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 22152273.000

所有经纪人都承担大致相同的负载。最慢的代理不再超载,因此集群以 1MB/秒的速度拉动,延迟得到更多控制。

新的默认分区(所有设置都是默认的)

已发送 122880 条记录,每秒 2045.477245 条记录(1.00 MB/秒),平均延迟 40.06 毫秒,最大延迟 817.00 毫秒,第 50 次 7 毫秒,第 95 次 141 毫秒,第 99 次 154 毫秒,第 99.9 次 220 毫秒。

生产者节点指标:输出字节总数:{client-id=perf-producer-client,node-id=node-0}:19244362.000
生产者节点指标:输出字节总数:{client-id= perf-producer-client, node-id=node-1} : 23589010.000 
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 23321818.000

在这里,自适应逻辑向较慢的代理发送较少的数据,因此较快的代理可以承受更多的负载并且延迟更好。数据分布并没有太大的倾斜,足以将负载调整到代理容量。

具有 partitioner.availability.timeout.ms=5 的新默认分区器

发送 122880 条记录,每秒 2044.218196 条记录(1.00 MB/秒),平均延迟 36.29 毫秒,最大延迟 809.00 毫秒,第 50 次 6 毫秒,第 95 次 138 毫秒,第 99 次 150 毫秒,第 99.9 次 184 毫秒。

producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-0} : 17431944.000 
producer-node-metrics:outgoing-byte-total:{client-id= perf-producer-client, node-id=node-1} : 25781879.000 
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 22977244.000

在这里,自适应逻辑对延迟的响应更快,向较慢的代理发送的数据更少,这增加了由较快的代理处理的数据组合。请注意,我花了一些实验来找到一个产生影响的好值,因此这证实了设计决定,即默认情况下该逻辑应关闭,仅在调整到特定配置和工作负载后才打开。

3.2 测试系列 2

在这个测试系列中,吞吐量限制增加到 2MB/秒。

bin/kafka-producer-perf-test.sh --topic foo --num-records 122880 --print-metrics --record-size 512 --throughput 4096 --producer.config producer.properties

该测试产生约 120K 记录,每个记录 512 字节(总计约 60MB),吞吐量限制为 4096 记录/秒(2MB/秒)

新的统一分区器 (partitions.adaptive.partitioning.enable=false)

已发送 122880 条记录,每秒 3789.317873 条记录(1.85 MB/秒),平均延迟 426.24 毫秒,最大延迟 2506.00 毫秒,第 50 次 8 毫秒,第 95 次 2065 毫秒,第 99 次 2408 毫秒,第 99.9 次 2468 毫秒。

生产者节点指标:传出字节总数:{client-id=perf-producer-client,node-id=node-0}:22396882.000
生产者节点指标:传出字节总数:{client-id= perf-producer-client, node-id=node-1} : 21652393.000 
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 21355134.000

此处的代理承担相同的负载,看起来最慢的代理已被最大化,因此集群只能占用 1.85MB/s 并且延迟会增长。

新的默认分区(所有设置都是默认的)

发送了 122880 条记录,每秒 4078.327249 条记录(1.99 MB/秒),平均延迟 34.66 毫秒,最大延迟 785.00 毫秒,第 50 次 4 毫秒,第 95 次 143 毫秒,第 99 次 167 毫秒,第 99.9 次 297 毫秒。

生产者节点指标:输出字节总数:{client-id=perf-producer-client,node-id=node-0}:14866064.000
生产者节点指标:输出字节总数:{client-id= perf-producer-client, node-id=node-1} : 25176418.000 
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-2} : 25581116.000

自适应逻辑设法将负载重新分配给更快的代理,以维持 2MB/秒的吞吐量和稳定的延迟。请注意,这里为了调整负载以适应代理容量,在分布中做出了更大的偏差(相对于 1MB/秒的节流),这表明自适应逻辑的偏差刚好足以保持延迟稳定。

4、默认分区DefaultPartitioner的替代品

KafkaProducer 显式检查 DefaultPartitioner

KafkaProducer 可以显式检查,是否在

RecordAccumulator中

使用

DefaultPartitioner

或者

UniformStickyPartitioner

分区器进行分区逻辑处理。

Partitioner.partition 返回 -1

Partitioner.partition 方法可以返回 -1 以指示默认的分区决策应该由生产者自己做出。现在,Partitioner.partition 需要返回一个有效的分区号。这背离了分区逻辑(包括默认分区逻辑)完全封装在分区器对象中的范式,这种封装不再有效,因为它需要只有生产者(Sender,RecordAccumulator)才能知道的信息(例如队列大小、记录大小、代理响应能力等)。

如果生产者使用自定义分区,也是可以通过自定义分区逻辑获取分区号;但是如果分区器只想使用默认分区逻辑,它可以返回 -1 并让生产者找出分区来利用。

这似乎也比尝试在默认分区器中保留(部分)分区逻辑封装更具未来性,因为如果将来我们支持其他扩展逻辑,我们可以只更改生产者中的逻辑而无需扩展分区器接口传递附加信息。

public class BuiltInPartitioner {
    private final Logger log;
    private final String topic;
    private final int stickyBatchSize;

    private volatile PartitionLoadStats partitionLoadStats = null;
    private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();
    ...
    
    // 获取到当前粘性分区信息
    StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
        
        StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
        if (partitionInfo != null)
            return partitionInfo;

        // 第一次创建partitionInfo

        partitionInfo = new StickyPartitionInfo(nextPartition(cluster));
        // 基于CAS 更新partitionInfo
        if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
            return partitionInfo;

        // 没有争抢到锁的时候,通过竞争获取
        return stickyPartitionInfo.get();
    }

   /**
     *  基于分区负载,计算下一个主题的分区
     */
    private int nextPartition(Cluster cluster) {
        // 随机一个数,用于后续随机分区计算
        int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt());

        // 分区负载的状态
        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
        
        // 分区号
        int partition;

        if (partitionLoadStats == null) {
           // 如果分区负载状态为空,则获取到所有可用状态的分区
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            
            if (availablePartitions.size() > 0) {
                // 如果存在可用状态的分区,则随机一个可用状态分区并返回
                partition = availablePartitions.get(random % availablePartitions.size()).partition();
            } else {  
                // 如果不存在可用状态的分区,则随机一个主题下得分区并返回
                List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
                partition = random % partitions.size();
            }
        } else {        
            // 如果当前处于分区负载状态
            assert partitionLoadStats.length > 0;
            
            // 分区负载的负载量
            int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;
            // 随机负载的权重
            int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];
      
            // 通过二分查找找到预期的分区并返回
            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);
            int partitionIndex = Math.abs(searchResult + 1);
            assert partitionIndex < partitionLoadStats.length;
            partition = partitionLoadStats.partitionIds[partitionIndex];
        }

        log.trace("Switching to partition {} in topic {}", partition, topic);
        return partition;
    }
}

弃用DefaultPartitioner的原因

在这样做之后,我们意识到默认分区器中对默认分区逻辑的期望封装无论如何都被破坏了,所以我们不妨将默认分区逻辑提升到生产者中,让默认分区器只通知生产者需要默认分区逻辑。将逻辑提升到生产者中的效率也稍高一些,因为拆分逻辑在生产者和分区器之间转换时需要多次查找各种映射,现在(返回 -1)查找一次,逻辑一次运行。

说明:

本文主要基于Kafka官方KIP-794说明翻译和解释,基于Kafka客户端版本3.3.0代码

Kafka官方KIP-794链接:

K​​​​​IP-794: Strictly Uniform Sticky Partitioner - Apache Kafka - Apache Software Foundation

标签: 大数据 java 算法

本文转载自: https://blog.csdn.net/dreamcatcher1314/article/details/127349140
版权归原作者 架构源启 所有, 如有侵权,请联系我们删除。

“Kafka由浅入深(3)一文读懂弃用默认分区器DefaultPartitioner KIP-794”的评论:

还没有评论