Kafka为了增加系统的**伸缩性(Scalability)**,引入了分区(Partitioning)的概念。
Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。**主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。**
通过这个设计,就可以以分区这个粒度进行数据读写操作,每个Broker的各个分区独立处理请求,进而实现负载均衡,提升了整体系统的吞吐量。
** 分区策略**是**决定生产者将消息发送到哪个分区的算法**。
1、默认的分区器
kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。这个类中就是定义数据分发的策略。
kafka默认的分区器:** org.apache.kafka.clients.producer.internals.DefaultPartitioner**
使用默认分区器,生产者创建消息时,根据 参数决定发送到哪个分区:
1.1、黏性分区策略(2.4.0之前是轮询)- 未指定分区、key
既没有**partition**值又没有**key**值的情况下,**Kafka**采用**Sticky Partition(黏性分区器)**,会随机选择一个分区,并尽可能一直 使用该分区,待该分区的**batch**已满或者已完成,**Kafka**再随机选一个分区进行使用(和上一次的分区不同)。
** Sticky Partitioning Strategy会随机地选择一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区。**
原因:
kafka 在发送消息的时候 , 采用批处理方案 , 当达到一批后进行分送 , 但是如果一批数据中有不同分区的数据 , 就无法放置到一个批处理中, 而老版本中轮询方案 , 就会导致一批数据被分到多个小的批次中 , 从而影响效率 , 故在新版本中 , 采用这种粘性的划分策略。
例如:
第一次随机选择**0**号分区,等**0**号分区当前批次满了(默认**16k**)或者**linger.ms**设置的时间到, **Kafka**再随机一个分区进 行使用(如果还是**0**会继续随机)。
1.2、hash分区策略
没有指明**partition**值,但有**key**的情况下,将**key**的**hash**值与**topic**的 **partition**数进行取余得到**partition**值。 **Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions**
注意: 如果 key 一直不变,同一个 key 算出来的 hash 值是个固定值。如果是固定值,这种 hash 取模就没有意义。
例如:
** key1的hash值=5, key2的hash值**=6 ,topic的partition数**=2,那 么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。
1.3、指定partition策略
** 以上两种构造都会通过DefaultPartitioner进行数据分发操作。但指定分区后,不会调用DefaultPartitioner.partition() 方法。**
** **
指明**partition**的情况下,直接将指明的值作为**partition**值; 例如partition=0,所有数据写入分区0。
2、自定义分区策略
自定义分区策略 跟DefaultPartitioner实现方式一样。
1、创建一个类,实现Partitioner接口。
2、重写 partitioner中的方法,
partitioner()方法的参数说明: 参数1:topic 参数2:key值 参数3:key值字节数组 参数4:value数据 参数5:value数据的字节数组 参数6:集群对象
3、在 partitioner() 方法中编写自定义分区逻辑,返回分区编号。
4、在生产者配置信息中进行配置自定义分区:
spring.kafka.producer.properties.partitioner.class=配置类全路径
代码示例:
@Component
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
String msgValues = value.toString();
int partition;
if (msgValues.contains("test")){
partition = 0;
}else {
partition = 1;
}
return partition;
}
@Override
public void close() {
//Nothing to close
}
@Override
public void configure(Map<String, ?> configs) {
}
}
版权归原作者 炎升 所有, 如有侵权,请联系我们删除。