0


【项目实战】Kafka 生产者写入分区的策略

👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人

Java知识图谱点击链接:体系化学习Java(Java面试专题)

💕💕 感兴趣的同学可以收藏关注下不然下次找不到哟💕💕

✊✊ 感觉对你有帮助的朋友,可以给博主一个三连,非常感谢 🙏🙏🙏

在这里插入图片描述

文章目录

1、生产者写入分区的策略有哪些?

生产者写入分区的策略主要有以下几种:

  1. 轮询分区策略:生产者可以使用轮询策略将消息依次写入每个分区,实现负载均衡。在每次发送消息时,生产者会按照轮询的方式选择下一个可用的分区,并将消息写入该分区。这样可以确保消息均匀地分布在各个分区中。
  2. 随机分区策略:Kafka生产者随机的将消息写入分区,有可能会造成消息的分布不均,所以这个策略基本上也很少用。
  3. 按 key 分区策略:Kafka生产者基于消息的键(key)进行哈希计算,然后将消息写入对应的分区。这种策略可以保证具有相同键的消息被写入到相同的分区,从而保证消息的顺序性。
  4. 自定义分区策略:Kafka生产者可以使用自定义分区策略来决定将消息写入哪个分区。

2、轮询分区策略

轮询分区的代码如下:

importorg.apache.kafka.clients.producer.Partitioner;importorg.apache.kafka.common.Cluster;importorg.apache.kafka.common.PartitionInfo;importjava.util.List;importjava.util.Map;publicclassRoundRobinPartitionerimplementsPartitioner{privateint currentPartition;@Overridepublicvoidconfigure(Map<String,?> configs){// 初始化当前分区索引
        currentPartition =0;}@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 轮询选择下一个分区int selectedPartition = currentPartition;
        currentPartition =(currentPartition +1)% numPartitions;return selectedPartition;}@Overridepublicvoidclose(){// 可选:清理资源}}

partition 方法会使用一个变量 currentPartition 来记录当前选择的分区索引。每次调用 partition 方法时,会将 currentPartition 增加 1,并通过取模运算来确保选择的分区索引始终在分区数范围内。

要使用轮询分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props =newProperties();
props.put("bootstrap.servers","localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class","com.example.RoundRobinPartitioner");

3、随机分区策略

随机分区的代码如下:

importorg.apache.kafka.clients.producer.Partitioner;importorg.apache.kafka.common.Cluster;importorg.apache.kafka.common.PartitionInfo;importjava.util.List;importjava.util.Map;importjava.util.Random;publicclassRandomPartitionerimplementsPartitioner{privatefinalRandom random =newRandom();@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();return random.nextInt(numPartitions);}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> configs){}}

partition 方法会随机选择一个分区返回。 random.nextInt(numPartitions) 方法会生成一个小于分区数的随机数,作为分区的索引。

要使用随机分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props =newProperties();
props.put("bootstrap.servers","localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class","com.example.RandomPartitioner");

4、按 key 分区策略

按 key 分区的代码如下:

importorg.apache.kafka.clients.producer.Partitioner;importorg.apache.kafka.common.Cluster;importorg.apache.kafka.common.PartitionInfo;importjava.util.List;importjava.util.Map;publicclassKeyPartitionerimplementsPartitioner{@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if(keyBytes ==null){// 如果 key 为 null,则使用轮询分区策略returnMath.abs(key.hashCode())% numPartitions;}else{// 使用 key 的哈希码来确定分区returnMath.abs(Utils.murmur2(keyBytes))% numPartitions;}}@Overridepublicvoidclose(){// 可选:清理资源}@Overridepublicvoidconfigure(Map<String,?> configs){// 可选:配置方法}}

partition 方法会检查 key 是否为 null。如果 key 为 null,就会使用轮询分区策略,通过计算 key 的哈希码并对分区数取模来确定分区。如果 key 不为 null,则使用 key 的字节数组的哈希码来确定分区。

要使用基于 key 的分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props =newProperties();
props.put("bootstrap.servers","localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class","com.example.KeyPartitioner");

5、自定义分区策略

自定义分区的代码如下:

importorg.apache.kafka.clients.producer.Partitioner;importorg.apache.kafka.common.Cluster;importorg.apache.kafka.common.PartitionInfo;importjava.util.List;importjava.util.Map;publicclassCustomPartitionerimplementsPartitioner{@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 自定义分区逻辑// 根据消息的 key 或 value 来选择分区// 这里以 key 的哈希值作为分区选择依据int partition =Math.abs(key.hashCode())% numPartitions;return partition;}@Overridepublicvoidclose(){// 可选:清理资源}@Overridepublicvoidconfigure(Map<String,?> configs){// 可选:配置分区器}}

partition 方法根据消息的 key 或 value 来选择分区。这里使用 key 的哈希值进行取模运算,以确保选择的分区索引在分区数范围内。

要使用自定义分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props =newProperties();
props.put("bootstrap.servers","localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class","com.example.CustomPartitioner");

写在最后

通过y以上这些实现,生产者将根据自定义的分区策略来选择分区来发送消息。您可以根据自己的需求,实现不同的分区逻辑。

💕💕 本文由激流原创,原创不易,希望大家关注、点赞、收藏,给博主一点鼓励,感谢!!!
🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃
在这里插入图片描述

标签: kafka linq 分布式

本文转载自: https://blog.csdn.net/qq_37967783/article/details/131751346
版权归原作者 激流丶 所有, 如有侵权,请联系我们删除。

“【项目实战】Kafka 生产者写入分区的策略”的评论:

还没有评论