0


kafka学习(四):生产者发送消息的分区策略

    Kafka为了增加系统的**伸缩性(Scalability)**,引入了分区(Partitioning)的概念。

    Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。**主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。**

    通过这个设计,就可以以分区这个粒度进行数据读写操作,每个Broker的各个分区独立处理请求,进而实现负载均衡,提升了整体系统的吞吐量。

   ** 分区策略**是**决定生产者将消息发送到哪个分区的算法**。

1、默认的分区器

    kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。这个类中就是定义数据分发的策略。

kafka默认的分区器:** org.apache.kafka.clients.producer.internals.DefaultPartitioner**

使用默认分区器,生产者创建消息时,根据 参数决定发送到哪个分区:

1.1、黏性分区策略(2.4.0之前是轮询)- 未指定分区、key

    既没有**partition**值又没有**key**值的情况下,**Kafka**采用**Sticky Partition(黏性分区器)**,会随机选择一个分区,并尽可能一直 使用该分区,待该分区的**batch**已满或者已完成,**Kafka**再随机选一个分区进行使用(和上一次的分区不同)。 

** Sticky Partitioning Strategy会随机地选择一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区。**

原因:

    kafka 在发送消息的时候 , 采用批处理方案 , 当达到一批后进行分送 , 但是如果一批数据中有不同分区的数据 , 就无法放置到一个批处理中, 而老版本中轮询方案 , 就会导致一批数据被分到多个小的批次中 , 从而影响效率 , 故在新版本中 , 采用这种粘性的划分策略。

例如:

    第一次随机选择**0**号分区,等**0**号分区当前批次满了(默认**16k**)或者**linger.ms**设置的时间到, **Kafka**再随机一个分区进 行使用(如果还是**0**会继续随机)。

1.2、hash分区策略

      没有指明**partition**值,但有**key**的情况下,将**key**的**hash**值与**topic**的 **partition**数进行取余得到**partition**值。 **Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions**

    注意: 如果 key 一直不变,同一个 key 算出来的 hash 值是个固定值。如果是固定值,这种 hash 取模就没有意义。
     例如:

** key1hash=5key2hash值**=6topicpartition数**=2,那 么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

1.3、指定partition策略

** 以上两种构造都会通过DefaultPartitioner进行数据分发操作。但指定分区后,不会调用DefaultPartitioner.partition() 方法。**

** **

    指明**partition**的情况下,直接将指明的值作为**partition**值; 例如partition=0,所有数据写入分区0。

2、自定义分区策略

     自定义分区策略 跟DefaultPartitioner实现方式一样。

1、创建一个类,实现Partitioner接口。

2、重写 partitioner中的方法,

    partitioner()方法的参数说明:

            参数1:topic       

            参数2:key值

            参数3:key值字节数组

            参数4:value数据

            参数5:value数据的字节数组

            参数6:集群对象

3、在 partitioner() 方法中编写自定义分区逻辑,返回分区编号。

4、在生产者配置信息中进行配置自定义分区:

spring.kafka.producer.properties.partitioner.class=配置类全路径

代码示例:

@Component
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String msgValues = value.toString();
        int partition;
        if (msgValues.contains("test")){
            partition = 0;
        }else {
            partition = 1;
        }
        return partition;
    }
    @Override
    public void close() {
        //Nothing to close
    }
    @Override
    public void configure(Map<String, ?> configs) {

    }
}
标签: kafka

本文转载自: https://blog.csdn.net/weixin_40482816/article/details/127509646
版权归原作者 炎升 所有, 如有侵权,请联系我们删除。

“kafka学习(四):生产者发送消息的分区策略”的评论:

还没有评论