How the Kafka Producer Works: the Send Flow and the Steps for Delivering Messages to the Cluster

Kafka video from 尚硅谷 (Atguigu):

10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili

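 1. Producer initialization: load the default configuration plus any user-supplied parameters, and start the network (Sender) thread.

 2. The interceptors process the record.

 3. The serializers serialize the message key and value.

 4. The partitioner chooses a partition.

 5. Cluster metadata is fetched from the Kafka brokers. (In the doSend() source below, this wait happens first, before serialization and partitioning.)

 6. The message is buffered in the RecordAccumulator and appended to that partition's Deque of batches (RecordBatch; called ProducerBatch in the source below).

 7. When a batch reaches batch.size, or linger.ms has elapsed, the Sender thread is woken up and sends through its NetworkClient:

     RecordBatch ==> ClientRequest, which carries the message body.

 8. A network connection is made to the broker that hosts the partition (its leader), and the request is sent to that broker.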
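Steps 6 and 7 can be modeled in a few lines of plain Java. This is only a sketch under simplifying assumptions, not Kafka's implementation: `AccumulatorSketch`, `Batch`, and their fields are hypothetical names, sizes are counted in raw value bytes, and time is passed in explicitly. The "full" and "wake the sender" conditions mirror the `batchIsFull || newBatchCreated` check that appears in the doSend() source later in this article.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of the accumulator: one deque of batches per partition. Not Kafka's code. */
final class AccumulatorSketch {
    /** A batch with a creation time and a running byte count. */
    static final class Batch {
        final long createdMs;
        int bytes;
        int records;
        Batch(long createdMs) { this.createdMs = createdMs; }
    }

    private final Map<Integer, Deque<Batch>> batches = new HashMap<>();
    private final int batchSize;   // plays the role of batch.size
    private final long lingerMs;   // plays the role of linger.ms

    AccumulatorSketch(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    /** Appends a record; returns true when the sender should be woken up. */
    boolean append(int partition, byte[] value, long nowMs) {
        Deque<Batch> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = deque.peekLast();
        boolean newBatchCreated = false;
        if (last == null || last.bytes + value.length > batchSize) {
            // No room in the newest batch: start a new one at the tail.
            last = new Batch(nowMs);
            deque.addLast(last);
            newBatchCreated = true;
        }
        last.bytes += value.length;
        last.records++;
        boolean batchIsFull = deque.size() > 1 || last.bytes >= batchSize;
        return batchIsFull || newBatchCreated;
    }

    /** A partition is ready when its oldest batch is full or has waited lingerMs. */
    boolean ready(int partition, long nowMs) {
        Deque<Batch> deque = batches.get(partition);
        if (deque == null || deque.isEmpty()) {
            return false;
        }
        Batch first = deque.peekFirst();
        boolean full = deque.size() > 1 || first.bytes >= batchSize;
        boolean lingered = nowMs - first.createdMs >= lingerMs;
        return full || lingered;
    }
}
```

With `batchSize = 10` and `lingerMs = 100`, two 4-byte records share one batch that becomes ready only after 100 ms, while a third 4-byte record forces a second batch and makes the partition ready immediately.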
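  1. The ProducerRecord argument of send():

About partitioning:

  a. If a partition is specified, the record is sent to that partition.

  b. If no partition is specified and no key is passed, the sticky partitioner is used.

     It picks a partition at random and keeps sending to it until the current batch is full or has been sent, then switches to another partition. (Clients before Kafka 2.4 used round-robin here instead: a random integer was generated on the first call and incremented on each later call, and that value modulo the topic's number of available partitions gave the partition.)

  c. If no partition is specified but a key is passed, the key is hashed and the hash value is taken modulo the topic's partition count to get the partition.

  For example: key hash = 5, the topic currently has 2 partitions, so the partition is 1

      key hash = 6, the topic currently has 2 partitions, so the partition is 0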
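The three rules can be sketched as follows. This is a simplified model rather than Kafka's partitioner: `PartitionSketch` and its methods are hypothetical names, and `Arrays.hashCode` is only a stand-in for the murmur2 hash that the real client applies to the serialized key bytes.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

/** Simplified model of the three partition-selection rules; not Kafka's actual code. */
final class PartitionSketch {
    // Hypothetical sticky state: the partition currently used for keyless records.
    private int stickyPartition = -1;

    /** Rule c helper: map an already-computed key hash onto a partition. */
    static int partitionForHash(int keyHash, int numPartitions) {
        // Clear the sign bit so the result is never negative.
        return (keyHash & 0x7fffffff) % numPartitions;
    }

    int choosePartition(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;                       // rule a
        }
        if (keyBytes != null) {
            // rule c; Arrays.hashCode is a stand-in hash (assumption)
            return partitionForHash(Arrays.hashCode(keyBytes), numPartitions);
        }
        if (stickyPartition < 0) {                          // rule b
            stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
        }
        return stickyPartition;
    }

    /** Called when the current batch is done: move keyless records to another partition. */
    void onNewBatch(int numPartitions) {
        int previous = stickyPartition;
        do {
            stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
        } while (numPartitions > 1 && stickyPartition == previous);
    }
}
```

`PartitionSketch.partitionForHash(5, 2)` gives 1 and `partitionForHash(6, 2)` gives 0, matching the worked example above.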
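  2. KafkaProducer asynchronous and synchronous send APIs:

    Asynchronous send:

                 producer.send(producerRecord);
    

    Synchronous send: append .get() to the send() call, i.e. producer.send(producerRecord).get(), which blocks until the result is available.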
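The difference between the two modes comes down to whether the caller blocks on the returned Future. The sketch below shows this with JDK classes only, so it runs without a broker. `FakeProducer` and `SendModes` are hypothetical stand-ins, the "offset" is just a counter, and nothing here is the real KafkaProducer API beyond the general shape of send() returning a Future and accepting a callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a producer: send() returns a Future immediately. */
final class FakeProducer {
    private final AtomicLong nextOffset = new AtomicLong();

    /** The "acknowledgement" (a fake offset) is produced on another thread. */
    Future<Long> send(String value, BiConsumer<Long, Throwable> callback) {
        CompletableFuture<Long> ack = CompletableFuture.supplyAsync(nextOffset::getAndIncrement);
        if (callback != null) {
            ack.whenComplete(callback);   // runs once the send has completed
        }
        return ack;
    }
}

final class SendModes {
    /** Asynchronous: return right away and handle the outcome in the callback. */
    static Future<Long> sendAsync(FakeProducer producer, String value) {
        return producer.send(value, (offset, error) -> {
            if (error != null) {
                System.err.println("send failed: " + error);
            }
        });
    }

    /** Synchronous: block on get() until the acknowledgement is available. */
    static long sendSync(FakeProducer producer, String value) {
        try {
            return producer.send(value, null).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("send failed", e);
        }
    }
}
```

The synchronous form trades throughput for simplicity: each call waits for its own acknowledgement before the next record is handed over.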



Core logic of Kafka's send() method:

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, (Callback)null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // The interceptor chain: each configured interceptor is applied in turn
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }
    
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;

        // Fetch the cluster metadata
        try {
            this.throwIfProducerClosed();
            long nowMs = this.time.milliseconds();

            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
            } catch (KafkaException var22) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", var22);
                }

                throw var22;
            }

            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;

            // Serialize the key
            byte[] serializedKey;
            try {
                serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException var21) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
            }
      
            // Serialize the value
            byte[] serializedValue;
            try {
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException var20) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
            }

            // Choose the partition
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            this.setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
            this.ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
            if (this.log.isTraceEnabled()) {
                this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
            }

            Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
            // RecordAccumulator.append() adds the record to a ProducerBatch
            RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                this.partitioner.onNewBatch(record.topic(), cluster, partition);
                partition = this.partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
                }

                interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
                result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
            }

            if (this.transactionManager != null) {
                this.transactionManager.maybeAddPartition(tp);
            }

            // If the batch is full or a new batch was created, wake the sender (Sender implements Runnable)
            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }

            return result.future;
        } catch (ApiException var23) {
            this.log.debug("Exception occurred during message send:", var23);
            if (tp == null) {
                tp = ProducerInterceptors.extractTopicPartition(record);
            }

            Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
            interceptCallback.onCompletion((RecordMetadata)null, var23);
            this.errors.record();
            this.interceptors.onSendError(record, tp, var23);
            return new FutureFailure(var23);
        } catch (InterruptedException var24) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var24);
            throw new InterruptException(var24);
        } catch (KafkaException var25) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var25);
            throw var25;
        } catch (Exception var26) {
            this.interceptors.onSendError(record, tp, var26);
            throw var26;
        }
    }

Sender class run() method:

    public void run() {
        this.log.debug("Starting Kafka producer I/O thread.");

        while(this.running) {
            try {
                this.runOnce();
            } catch (Exception var5) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var5);
            }
        }

        this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        while(!this.forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0 || this.hasPendingTransactionalRequests())) {
            try {
                this.runOnce();
            } catch (Exception var4) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var4);
            }
        }

        while(!this.forceClose && this.transactionManager != null && this.transactionManager.hasOngoingTransaction()) {
            if (!this.transactionManager.isCompleting()) {
                this.log.info("Aborting incomplete transaction due to shutdown");
                this.transactionManager.beginAbort();
            }

            try {
                this.runOnce();
            } catch (Exception var3) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var3);
            }
        }

        if (this.forceClose) {
            if (this.transactionManager != null) {
                this.log.debug("Aborting incomplete transactional requests due to forced shutdown");
                this.transactionManager.close();
            }

            this.log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }

        try {
            this.client.close();
        } catch (Exception var2) {
            this.log.error("Failed to close network client", var2);
        }

        this.log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    void runOnce() {
        if (this.transactionManager != null) {
            try {
                this.transactionManager.maybeResolveSequences();
                if (this.transactionManager.hasFatalError()) {
                    RuntimeException lastError = this.transactionManager.lastError();
                    if (lastError != null) {
                        this.maybeAbortBatches(lastError);
                    }

                    this.client.poll(this.retryBackoffMs, this.time.milliseconds());
                    return;
                }

                this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
                if (this.maybeSendAndPollTransactionalRequest()) {
                    return;
                }
            } catch (AuthenticationException var5) {
                this.log.trace("Authentication exception while processing transactional request", var5);
                this.transactionManager.authenticationFailed(var5);
            }
        }

        long currentTimeMs = this.time.milliseconds();
        // Send the data
        long pollTimeout = this.sendProducerData(currentTimeMs);
        this.client.poll(pollTimeout, currentTimeMs);
    }
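The shape of run() above (loop while running, then keep going until everything still buffered has been sent) can be sketched with a plain queue and a thread. This is a toy model under stated assumptions: `SenderSketch`, `enqueue`, `initiateClose`, and `runDemo` are hypothetical names, "sending" just moves a string into a list, and the 50 ms poll timeout is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy I/O thread: a main loop plus a flush phase on shutdown. Not Kafka's Sender. */
final class SenderSketch implements Runnable {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final List<String> sent = new ArrayList<>(); // written only by the I/O thread
    private volatile boolean running = true;

    void enqueue(String batch) { pending.add(batch); }

    void initiateClose() { running = false; }

    @Override
    public void run() {
        // Phase 1: normal operation.
        while (running) {
            runOnce();
        }
        // Phase 2: graceful shutdown, send whatever is still buffered.
        String batch;
        while ((batch = pending.poll()) != null) {
            sent.add(batch);
        }
    }

    private void runOnce() {
        try {
            // Wait briefly for work, much like a poll with a timeout.
            String batch = pending.poll(50, TimeUnit.MILLISECONDS);
            if (batch != null) {
                sent.add(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false;
        }
    }

    /** Starts the thread, enqueues the batches, closes, and returns what was sent. */
    static List<String> runDemo(String... batches) {
        SenderSketch sender = new SenderSketch();
        Thread ioThread = new Thread(sender, "sender-sketch");
        ioThread.start();
        for (String b : batches) {
            sender.enqueue(b);
        }
        sender.initiateClose();
        try {
            ioThread.join(); // join makes the thread's writes to 'sent' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sender.sent;
    }
}
```

Even though `initiateClose()` is called right after the batches are enqueued, the flush phase ensures none of them is dropped, which is the same guarantee the second loop in run() provides when the producer is closed without forcing.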

sendProducerData():

  The batches are ultimately converted into a ClientRequest object:
         ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
         this.client.send(clientRequest, now);

    private long sendProducerData(long now) {
        Cluster cluster = this.metadata.fetch();
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        Iterator iter;
        if (!result.unknownLeaderTopics.isEmpty()) {
            iter = result.unknownLeaderTopics.iterator();

            while(iter.hasNext()) {
                String topic = (String)iter.next();
                this.metadata.add(topic, now);
            }

            this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

        iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;

        while(iter.hasNext()) {
            Node node = (Node)iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        this.addToInflightBatches(batches);
        List expiredBatches;
        Iterator var11;
        ProducerBatch expiredBatch;
        if (this.guaranteeMessageOrder) {
            Iterator var9 = batches.values().iterator();

            while(var9.hasNext()) {
                expiredBatches = (List)var9.next();
                var11 = expiredBatches.iterator();

                while(var11.hasNext()) {
                    expiredBatch = (ProducerBatch)var11.next();
                    this.accumulator.mutePartition(expiredBatch.topicPartition);
                }
            }
        }

        this.accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = this.getExpiredInflightBatches(now);
        expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);
        if (!expiredBatches.isEmpty()) {
            this.log.trace("Expired {} batches in accumulator", expiredBatches.size());
        }

        var11 = expiredBatches.iterator();

        while(var11.hasNext()) {
            expiredBatch = (ProducerBatch)var11.next();
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            this.failBatch(expiredBatch, (RuntimeException)(new TimeoutException(errorMessage)), false);
            if (this.transactionManager != null && expiredBatch.inRetry()) {
                this.transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }

        this.sensors.updateProduceRequestMetrics(batches);
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0L);
        if (!result.readyNodes.isEmpty()) {
            this.log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0L;
        }

        this.sendProduceRequests(batches, now);
        return pollTimeout;
    }

    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
        Iterator var4 = collated.entrySet().iterator();

        while(var4.hasNext()) {
            Map.Entry<Integer, List<ProducerBatch>> entry = (Map.Entry)var4.next();
            this.sendProduceRequest(now, (Integer)entry.getKey(), this.acks, this.requestTimeoutMs, (List)entry.getValue());
        }

    }

    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (!batches.isEmpty()) {
            Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap(batches.size());
            byte minUsedMagic = this.apiVersions.maxUsableProduceMagic();
            Iterator var9 = batches.iterator();

            while(var9.hasNext()) {
                ProducerBatch batch = (ProducerBatch)var9.next();
                if (batch.magic() < minUsedMagic) {
                    minUsedMagic = batch.magic();
                }
            }

            ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
            Iterator var16 = batches.iterator();

            while(var16.hasNext()) {
                ProducerBatch batch = (ProducerBatch)var16.next();
                TopicPartition tp = batch.topicPartition;
                MemoryRecords records = batch.records();
                if (!records.hasMatchingMagic(minUsedMagic)) {
                    records = (MemoryRecords)batch.records().downConvert(minUsedMagic, 0L, this.time).records();
                }

                ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
                if (tpData == null) {
                    tpData = (new ProduceRequestData.TopicProduceData()).setName(tp.topic());
                    tpd.add(tpData);
                }

                tpData.partitionData().add((new ProduceRequestData.PartitionProduceData()).setIndex(tp.partition()).setRecords(records));
                recordsByPartition.put(tp, batch);
            }

            String transactionalId = null;
            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                transactionalId = this.transactionManager.transactionalId();
            }

            ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, (new ProduceRequestData()).setAcks(acks).setTimeoutMs(timeout).setTransactionalId(transactionalId).setTopicData(tpd));
            RequestCompletionHandler callback = (response) -> {
                this.handleProduceResponse(response, recordsByPartition, this.time.milliseconds());
            };
            String nodeId = Integer.toString(destination);
            ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
            // this.client is a KafkaClient (interface); the implementing class is NetworkClient
            this.client.send(clientRequest, now);
            this.log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
        }
    }

NetworkClient send() method:

    public void send(ClientRequest request, long now) {
        this.doSend(request, false, now);
    }

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
        this.ensureActive();
        String nodeId = clientRequest.destination();
        if (!isInternalRequest && !this.canSendRequest(nodeId, now)) {
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        } else {
            AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();

            try {
                NodeApiVersions versionInfo = this.apiVersions.get(nodeId);
                short version;
                if (versionInfo == null) {
                    version = builder.latestAllowedVersion();
                    if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {
                        this.log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", new Object[]{clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version});
                    }
                } else {
                    version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
                }

                this.doSend(clientRequest, isInternalRequest, now, builder.build(version));
            } catch (UnsupportedVersionException var9) {
                this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", new Object[]{builder, clientRequest.correlationId(), clientRequest.destination(), var9});
                ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, var9, (AuthenticationException)null, (AbstractResponse)null);
                if (!isInternalRequest) {
                    this.abortedSends.add(clientResponse);
                } else if (clientRequest.apiKey() == ApiKeys.METADATA) {
                    this.metadataUpdater.handleFailedRequest(now, Optional.of(var9));
                }
            }

        }
    }

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sending {} request with header {} and timeout {} to node {}: {}", new Object[]{clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request});
        }

        Send send = request.toSend(header);
        // Wrap the ClientRequest in an InFlightRequest
        InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
        this.inFlightRequests.add(inFlightRequest);
        // Hand the request to the NIO selector, which writes it to the channel
        // this.selector is a Selectable; its implementation, Selector, queues the send on the node's KafkaChannel
        this.selector.send(new NetworkSend(clientRequest.destination(), send));
    }

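Summary: reading the source in kafka-clients.jar directly is a quick way to understand the Kafka producer's send logic. The core flow is the one shown in the first figure of this article.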


Reposted from: https://blog.csdn.net/qq_33919114/article/details/132487537
Copyright belongs to the original author, 不努力就种地~. If this infringes your rights, please contact us for removal.
