0


Kafka学习--------Kafka Producer生产者发送消息流程详解

1. Kafka Producer生产者结构

在这里插入图片描述

2. 生产者发送消息流程

2.1 生产者生成某个消息后,首先会经过一个或多个组成的拦截器链。
2.2 当消息通过所有的拦截器之后,会进行序列化,会根据key和value的序列化配置进行序列化消息内容,生产者和消费者必须使用相同的key-value序列化方式。

// 消息key序列化
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 消息value序列化
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

2.3 经过序列化后,会根据自定义的分区器或者Kafka默认的分区器进行获取消息的所属的分区。
自定义分区器可以参考下面。
Kafka默认的分区器规则:
1.当消息的key存在时,首先获取当前topic下的所有分区数,然后对key进行求hash值,根据hash值和分区总数进行取余,获取所属的的分区。
2.如果key不存在时,会根据topic获取一个递增的数值,然后通过和分区数进行取余,获取所属的分区。

Kafka默认分区器源码:

publicclassDefaultPartitionerimplementsPartitioner{privatefinalConcurrentMap<String,AtomicInteger> topicCounterMap =newConcurrentHashMap<>();publicvoidconfigure(Map<String,?> configs){}publicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if(keyBytes ==null){int nextValue =nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if(availablePartitions.size()>0){int part =Utils.toPositive(nextValue)% availablePartitions.size();return availablePartitions.get(part).partition();}else{// no partitions are available, give a non-available partitionreturnUtils.toPositive(nextValue)% numPartitions;}}else{// hash the keyBytes to choose a partitionreturnUtils.toPositive(Utils.murmur2(keyBytes))% numPartitions;}}privateintnextValue(String topic){AtomicInteger counter = topicCounterMap.get(topic);if(null== counter){
            counter =newAtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);if(currentCounter !=null){
                counter = currentCounter;}}return counter.getAndIncrement();}publicvoidclose(){}}

自定义分区器:

publicclassCustomerPartitionsimplementsPartitioner{@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){int partition =0;if(key ==null){}else{String keyStr = key.toString();if(keyStr.contains("Test")){
                partition =1;}else{
                partition =2;}}return partition;}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> configs){}}

**2.4 获取到消息所属的分区后,消息会被存放到消息缓冲区中(RecordAccumulator)中,根据topic和分区可以确定一个双端队列(Deque)中,这个队列中每个节点为多个消息的合集(ProducerBatch),新的消息会被放到队列的最后一个节点上,存放会存在多种情况。 **

前置知识:

  1. RecordAccumulator:每一个是生产上都会维护一个固定大小的内存空间,主要用于合并单条消息,进行批量发送,提高吞吐量,减少带宽消耗。
  2. RecordAccumulator的大小是可配置的,可以配置buffer.memory来修改缓冲区大小,默认值为:33554432(32M)
  3. RecordAccumulator内存结构分为两部分 第一部分为已经使用的内存,这一部分主要存放了很多的队列。每一个主题的每一个分区都会创建一个队列,来存放当前分区下待发送的消息集合。 第二部分为未使用的内存,这一部分分为已经池化后的内存和未池化的整个剩余内存(nonPooledAvailableMemory)。池化的内存的会根据batch.size(默认值为16K)的配置进行池化多个ByteBuffer,放入一个队列中。所有的剩余空间会形成一个未池化的剩余空间。

2.4.1 场景一:消息大小不足16K。
首先会根据topic和分区获取所属队列的最后一个ProducerBatch,
2.4.1.1 如果最后一个ProducerBatch+当前消息 <= 16K时,会把当前消息存入这个ProducerBatch中,等待发送。
2.4.1.2 如果最后一个ProducerBatch+当前消息 > 16K时,此时消息不会放入这个ProducerBatch中,而是会向池化的队列中获取一个ByteBuffer,把这个ByteBuffer放到队列的尾部,然后把消息放入这个新增的ProducerBatch中。
2.4.1.3 如果最后一个ProducerBatch+当前消息 > 16K时,并且池化的队列中没有可用的ByteBuffer时,池化队列会向剩余的未使用的内存空间(nonPooledAvailableMemory)申请一个大小为16K的内存空间,添加到池化队列尾部。然后把这个新增的ByteBuffer添加到分区下的队列尾部,存储新的消息。

2.4.2 场景二:消息大小超过16K
** 2.4.2.1 当消息超过16K时,任何一个ProducerBatch都无法存储这个消息。此时会直接向剩余的空间(nonPooledAvailableMemory)的进行分配和当前的消息大小一样的内存空间,加到队列的尾部,然后存储消息,等待发送。**
2.4.2.2 当剩余的空间(nonPooledAvailableMemory) < 消息大小时,nonPooledAvailableMemory会向池化队列获取空间,每次获取一个ByteBuffer(16K),直到nonPooledAvailableMemory的空间大于或等于消息大小时。获取的ByteBuffer会经过jvm的GC垃圾回收。过程比较慢。当nonPooledAvailableMemory空间大于获取等于消息大小时,会把分配消息大小的空间放入分区队列的尾部,把消息存入这个ProducerBatch内。

2.5 生产者会有一个send线程,用于不断的获取消息和发送消息。sender线程会不断的扫描RecordAccumulator中所有的ProducerBatch,如果ProducerBatch达到batch.size(默认16K)大小或者最早的一个消息已经等待超过linger.ms(默认为0)时,这个ProducerBatch会被sender线程收集到。由于不同的topic和分区会被分到不同的Broker节点上,sender线程会把发送到相同Broker姐节点的ProducerBatch合并在一个Request请求中,一个Request请求不会超过max.request.size(默认1048576B = 1M)

2.6 每个请求都会缓存在一个inFlightRequest缓冲区内,里面为每一个Broker分配了一个队列。新的请求会放在队列尾部,每个队列最多能够容纳max.in.flight.requests.per.connection(默认值为5)个Request,队列满了不会产生新的Request。

2.7 selector获取到Request会发往相对应的Broker节点。Broker节点收到Request后会进行ACK确认这个Request

acks 有三个配置值:[-1 , 0 , 1]
acks = -1 表示不需要收到leader节点的ACK回复就会发送下一个Request。高吞吐,低一致性
acks = 0 表示只需要接收到leader节点的ACK后就可以发送下一个Request。
acks = 1 表示 需要接收到leaer节点和ISR节点的ACK后才会发送下一个Request。一致性较高

2.8 当收到Broker对某个Request的ACK后,会删除inFlightRequest队列中这个Request。然后调用clear方法清除对应的ProducerBatch。
RecordAccumulator Clear清理场景:
针对2.4.1.1,2.4.1.2,2.4.1.3,ProducerBatch都会标记为删除,然后放入池化队列中,不会进行GC。2.4.1.3中从nonPooledAvailableMemory获取的内存也不会归还給nonPooledAvailableMemory,任然存放在池化队列中。

针对2.4.2.1,2.4.2.2,超过16K的消息内存空间会被GC进行回收,然后作为nonPooledAvailableMemory的一部分

2.9 如果发送过程中产生了异常,消息发送会存在重试机制。条件为重试次数小于指定值&&异常为RetriableException

privatebooleancanRetry(ProducerBatch batch,ProduceResponse.PartitionResponse response){return batch.attempts()<this.retries &&((response.error.exception()instanceofRetriableException)||(transactionManager !=null&& transactionManager.canRetry(response, batch)));}

在这里插入图片描述在这里插入图片描述

标签: kafka 学习 java

本文转载自: https://blog.csdn.net/qq_43719634/article/details/125964229
版权归原作者 卷王中王 所有, 如有侵权,请联系我们删除。

“Kafka学习--------Kafka Producer生产者发送消息流程详解”的评论:

还没有评论