文章目录
一、生产者源码
1、初始化
生产者 main 线程初始化,点击 main()方法中的 KafkaProducer()
KafkaProducer(ProducerConfig config,Serializer<K> keySerializer,Serializer<V> valueSerializer,ProducerMetadata metadata,KafkaClient kafkaClient,ProducerInterceptors<K,V> interceptors,Time time){try{this.producerConfig = config;this.time = time;// 获取事务idString transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);// 获取客户端idthis.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);......// 监控kafka运行情况JmxReporter jmxReporter =newJmxReporter();
jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
reporters.add(jmxReporter);MetricsContext metricsContext =newKafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));this.metrics =newMetrics(metricConfig, reporters, time, metricsContext);// 获取分区器this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,Partitioner.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);// key和value的序列化if(keySerializer ==null){this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,Serializer.class);this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),true);}......// 拦截器处理(拦截器可以有多个)List<ProducerInterceptor<K,V>> interceptorList =(List) config.getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptor.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));......// 单条日志大小 默认1mthis.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);// 缓冲区大小 默认32mthis.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);// 压缩,默认是nonethis.compressionType =CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);int deliveryTimeoutMs =configureDeliveryTimeout(config, log);this.apiVersions =newApiVersions();this.transactionManager =configureTransactionState(config, logContext);// 缓冲区对象 默认是32m// 批次大小 默认16k// 压缩方式,默认是none// liner.ms 默认是0// 内存池this.accumulator =newRecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),this.compressionType,lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,newBufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));// 连接上kafka集群地址List<InetSocketAddress> addresses =ClientUtils.parseAndValidateAddresses(
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));// 获取元数据if(metadata !=null){this.metadata = metadata;}else{this.metadata =newProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,Time.SYSTEM);this.metadata.bootstrap(addresses);}this.errors =this.metrics.sensor("errors");this.sender =newSender(logContext, kafkaClient,this.metadata);String ioThreadName = NETWORK_THREAD_PREFIX +" | "+ clientId;// 把sender线程放到后台this.ioThread =newKafkaThread(ioThreadName,this.sender,true);// 启动sender线程this.ioThread.start();
config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");}......}
生产者 sender 线程初始化,KafkaProducer.java中点击 newSender()方法,查看发送线程初始化
SendernewSender(LogContext logContext,KafkaClient kafkaClient,ProducerMetadata metadata){// 缓存请求的个数 默认是5个int maxInflightRequests =configureInflightRequests(producerConfig);// 请求超时时间,默认30sint requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);ChannelBuilder channelBuilder =ClientUtils.createChannelBuilder(producerConfig, time, logContext);ProducerMetrics metricsRegistry =newProducerMetrics(this.metrics);Sensor throttleTimeSensor =Sender.throttleTimeSensor(metricsRegistry.senderMetrics);// 创建一个客户端对象// clientId 客户端id// maxInflightRequests 缓存请求的个数 默认是5个// RECONNECT_BACKOFF_MS_CONFIG 重试时间// RECONNECT_BACKOFF_MAX_MS_CONFIG 总的重试时间// 发送缓冲区大小send.buffer.bytes 默认128kb// 接收数据缓存 receive.buffer.bytes 默认是32kbKafkaClient client = kafkaClient !=null? kafkaClient :newNetworkClient(newSelector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),this.metrics, time,"producer", channelBuilder, logContext),
metadata,
clientId,
maxInflightRequests,
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
time,true,
apiVersions,
throttleTimeSensor,
logContext);// 0 :生产者发送过来,不需要应答; 1 :leader收到,应答; -1 :leader和isr队列里面所有的都收到了应答short acks =configureAcks(producerConfig, log);// 创建sender线程returnnewSender(logContext,
client,
metadata,this.accumulator,
maxInflightRequests ==1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),this.transactionManager,
apiVersions);}
Sender 对象被放到了一个线程中启动,所有需要点击 newSender()方法中的 Sender,并找到 sender 对象中的 run()方法
@Overridepublicvoidrun(){......while(!forceClose &&((this.accumulator.hasUndrained()||this.client.inFlightRequestCount()>0)||hasPendingTransactionalRequests())){try{// sender 线程从缓冲区准备拉取数据,刚启动拉不到数据runOnce();}catch(Exception e){
log.error("Uncaught error in kafka producer I/O thread: ", e);}}......}
2、发送数据到缓冲区
2.1 发送总体流程
从send()方法进入
publicProducerRecord<K,V>onSend(ProducerRecord<K,V>record){ProducerRecord<K,V> interceptRecord =record;for(ProducerInterceptor<K,V> interceptor :this.interceptors){try{// 拦截器对数据进行加工
interceptRecord = interceptor.onSend(interceptRecord);......return interceptRecord;}//从拦截器处理中返回,点击 doSend()方法privateFuture<RecordMetadata>doSend(ProducerRecord<K,V>record,Callback callback){TopicPartition tp =null;try{throwIfProducerClosed();// first make sure the metadata for the topic is availablelong nowMs = time.milliseconds();ClusterAndWaitTime clusterAndWaitTime;try{// 获取元数据
clusterAndWaitTime =waitOnMetadata(record.topic(),record.partition(), nowMs, maxBlockTimeMs);}catch(KafkaException e){if(metadata.isClosed())thrownewKafkaException("Producer closed while send in progress", e);throw e;}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;long remainingWaitMs =Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);Cluster cluster = clusterAndWaitTime.cluster;// 序列化相关操作byte[] serializedKey;try{
serializedKey = keySerializer.serialize(record.topic(),record.headers(),record.key());......// 分区操作int partition =partition(record, serializedKey, serializedValue, cluster);
tp =newTopicPartition(record.topic(), partition);setReadOnly(record.headers());Header[] headers =record.headers().toArray();int serializedSize =AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);// 保证数据大小能够传输(序列化后的 压缩后的)ensureValidRecordSize(serializedSize);......// accumulator缓存 追加数据 result是是否添加成功的结果RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs,true, nowMs);......// 批次大小已经满了 获取有一个新批次创建if(result.batchIsFull || result.newBatchCreated){
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch",record.topic(), partition);// 唤醒发送线程this.sender.wakeup();}return result.future;......}
2.2 分区选择
privateintpartition(ProducerRecord<K,V>record,byte[] serializedKey,byte[] serializedValue,Cluster cluster){Integer partition =record.partition();// 如果指定分区,按照指定分区配置return partition !=null?
partition :
partitioner.partition(record.topic(),record.key(), serializedKey,record.value(), serializedValue, cluster);}//点击 partition,跳转到 Partitioner 接口,选择默认的分区器 DefaultPartitionerpublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster,int numPartitions){// 没有指定keyif(keyBytes ==null){// 按照粘性分区处理return stickyPartitionCache.partition(topic, cluster);}// 如果指定key,按照key的hashcode值 对分区数求模// hash the keyBytes to choose a partitionreturnUtils.toPositive(Utils.murmur2(keyBytes))% numPartitions;}
2.3 发送消息大小校验
privatevoidensureValidRecordSize(int size){// 单条信息最大值 maxRequestSize 1mif(size > maxRequestSize)thrownewRecordTooLargeException("The message is "+ size +" bytes when serialized which is larger than "+ maxRequestSize +", which is the value of the "+ProducerConfig.MAX_REQUEST_SIZE_CONFIG +" configuration.");// totalMemorySize 缓存大小 默认32mif(size > totalMemorySize)thrownewRecordTooLargeException("The message is "+ size +" bytes when serialized which is larger than the total memory buffer you have configured with the "+ProducerConfig.BUFFER_MEMORY_CONFIG +" configuration.");}
2.4 内存池
publicRecordAppendResultappend(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock,boolean abortOnNewBatch,long nowMs)throwsInterruptedException{......try{// check if we have an in-progress batch// 获取或者创建一个队列(按照每个主题的分区)Deque<ProducerBatch> dq =getOrCreateDeque(tp);synchronized(dq){if(closed)thrownewKafkaException("Producer closed while send in progress");// 尝试向队列里面添加数据(正常添加不成功)RecordAppendResult appendResult =tryAppend(timestamp, key, value, headers, callback, dq, nowMs);if(appendResult !=null)return appendResult;}// we don't have an in-progress record batch try to allocate a new batchif(abortOnNewBatch){// Return a result that will cause another call to append.returnnewRecordAppendResult(null,false,false,true);}byte maxUsableMagic = apiVersions.maxUsableProduceMagic();// this.batchSize 默认16k 数据大小17kint size =Math.max(this.batchSize,AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);// 申请内存 内存池分配内存 双端队列
buffer = free.allocate(size, maxTimeToBlock);// Update the current time in case the buffer allocation blocked above.
nowMs = time.milliseconds();synchronized(dq){// Need to check if producer is closed again after grabbing the dequeue lock.if(closed)thrownewKafkaException("Producer closed while send in progress");RecordAppendResult appendResult =tryAppend(timestamp, key, value, headers, callback, dq, nowMs);if(appendResult !=null){// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...return appendResult;}// 封装内存bufferMemoryRecordsBuilder recordsBuilder =recordsBuilder(buffer, maxUsableMagic);// 再次封装(得到真正的批次大小)ProducerBatch batch =newProducerBatch(tp, recordsBuilder, nowMs);FutureRecordMetadata future =Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, nowMs));// 向队列的末尾添加批次
dq.addLast(batch);
incomplete.add(batch);// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer =null;returnnewRecordAppendResult(future, dq.size()>1|| batch.isFull(),true,false);}}finally{if(buffer !=null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();}}
3、sender 线程发送数据
voidrunOnce(){// 事务相关操作if(transactionManager !=null){try{
transactionManager.maybeResolveSequences();// do not continue sending if the transaction manager is in a failed stateif(transactionManager.hasFatalError()){RuntimeException lastError = transactionManager.lastError();if(lastError !=null)maybeAbortBatches(lastError);
client.poll(retryBackoffMs, time.milliseconds());return;}// Check whether we need a new producerId. If so, we will enqueue an InitProducerId// request which will be sent below
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();if(maybeSendAndPollTransactionalRequest()){return;}}catch(AuthenticationException e){// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request", e);
transactionManager.authenticationFailed(e);}}long currentTimeMs = time.milliseconds();// 发送数据long pollTimeout =sendProducerData(currentTimeMs);// 获取发送结果
client.poll(pollTimeout, currentTimeMs);}privatelongsendProducerData(long now){// 获取元数据Cluster cluster = metadata.fetch();// get the list of partitions with data ready to send// 1、判断32m缓存是否准备好RecordAccumulator.ReadyCheckResult result =this.accumulator.ready(cluster, now);// 如果 Leader 信息不知道,是不能发送数据的// if there are any partitions whose leaders are not known yet, force metadata updateif(!result.unknownLeaderTopics.isEmpty()){// The set of topics with unknown leader contains topics with leader election pending as well as// topics which may have expired. Add the topic again to metadata to ensure it is included// and request metadata update, since there are messages to send to the topic.for(String topic : result.unknownLeaderTopics)this.metadata.add(topic, now);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);this.metadata.requestUpdate();}......// create produce requests// 发送每个节点数据,进行封装,这样一个分区的就可以打包一起发送Map<Integer,List<ProducerBatch>> batches =this.accumulator.drain(cluster, result.readyNodes,this.maxRequestSize, now);addToInflightBatches(batches);......// 发送请求sendProduceRequests(batches, now);return pollTimeout;}// 是否准备发送publicReadyCheckResultready(Cluster cluster,long nowMs){Set<Node> readyNodes =newHashSet<>();long nextReadyCheckDelayMs =Long.MAX_VALUE;Set<String> unknownLeaderTopics =newHashSet<>();boolean exhausted =this.free.queued()>0;for(Map.Entry<TopicPartition,Deque<ProducerBatch>> entry :this.batches.entrySet()){Deque<ProducerBatch> deque = entry.getValue();synchronized(deque){// When producing to a large number of partitions, this path is hot and deques are often empty.// We check whether a batch exists first to avoid the more expensive checks whenever possible.ProducerBatch batch = deque.peekFirst();if(batch !=null){TopicPartition part = entry.getKey();Node leader = cluster.leaderFor(part);if(leader ==null){// This is a partition for which leader is not known, but messages are available to send.// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());}elseif(!readyNodes.contains(leader)&&!isMuted(part)){long waitedTimeMs = batch.waitedTimeMs(nowMs);// 如果不是第一次拉取, 且等待时间小于重试时间 默认100ms ,backingOff=trueboolean backingOff = batch.attempts()>0&& waitedTimeMs < retryBackoffMs;// 如果backingOff是true 取retryBackoffMs; 如果不是第一次拉取取lingerMs,默认0long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;// 批次大小满足发送条件boolean full = deque.size()>1|| batch.isFull();// 如果超时,也要发送boolean expired = waitedTimeMs >= timeToWaitMs;boolean transactionCompleting = transactionManager !=null&& transactionManager.isCompleting();// full linger.msboolean sendable = full
|| expired
|| exhausted
|| closed
||flushInProgress()|| transactionCompleting;if(sendable &&!backingOff){
readyNodes.add(leader);}else{long timeLeftMs =Math.max(timeToWaitMs - waitedTimeMs,0);// Note that this results in a conservative estimate since an un-sendable partition may have// a leader that will later be found to have sendable data. However, this is good enough// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs =Math.min(timeLeftMs, nextReadyCheckDelayMs);}}}}}returnnewReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);}
二、消费者源码
1、初始化
点击 main()方法中的 KafkaConsumer ()
KafkaConsumer(ConsumerConfig config,Deserializer<K> keyDeserializer,Deserializer<V> valueDeserializer){try{// 消费组平衡GroupRebalanceConfig groupRebalanceConfig =newGroupRebalanceConfig(config,GroupRebalanceConfig.ProtocolType.CONSUMER);// 获取消费者组idthis.groupId =Optional.ofNullable(groupRebalanceConfig.groupId);// 客户端idthis.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);......// 客户端请求服务端等待时间request.timeout.ms 默认是30sthis.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);this.time =Time.SYSTEM;this.metrics =buildMetrics(config, time, clientId);// 重试时间 100this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);// 拦截器List<ConsumerInterceptor<K,V>> interceptorList =(List) config.getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptor.class,Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));this.interceptors =newConsumerInterceptors<>(interceptorList);// key和value 的反序列化if(keyDeserializer ==null){this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,Deserializer.class);this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)),true);}else{
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);this.keyDeserializer = keyDeserializer;}if(valueDeserializer ==null){this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,Deserializer.class);this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)),false);}else{
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);this.valueDeserializer = valueDeserializer;}// offset从什么位置开始消费 默认,latestOffsetResetStrategy offsetResetStrategy =OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));this.subscriptions =newSubscriptionState(logContext, offsetResetStrategy);ClusterResourceListeners clusterResourceListeners =configureClusterResourceListeners(keyDeserializer,
valueDeserializer, metrics.reporters(), interceptorList);// 元数据// retryBackoffMs 重试时间// 是否允许访问系统主题 exclude.internal.topics 默认是true,表示不允许// 是否允许自动创建topic allow.auto.create.topics 默认是truethis.metadata =newConsumerMetadata(retryBackoffMs,
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),!config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
subscriptions, logContext, clusterResourceListeners);// 连接kafka集群List<InetSocketAddress> addresses =ClientUtils.parseAndValidateAddresses(
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));this.metadata.bootstrap(addresses);String metricGrpPrefix ="consumer";FetcherMetricsRegistry metricsRegistry =newFetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);ChannelBuilder channelBuilder =ClientUtils.createChannelBuilder(config, time, logContext);this.isolationLevel =IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));Sensor throttleTimeSensor =Fetcher.throttleTimeSensor(metrics, metricsRegistry);int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);ApiVersions apiVersions =newApiVersions();// 创建客户端对象// 连接重试时间 默认50ms// 最大连接重试时间 默认1s// 发送缓存 默认128kb// 接收缓存 默认64kb// 客户端请求服务端等待时间request.timeout.ms 默认是30sNetworkClient netClient =newNetworkClient(newSelector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),this.metadata,
clientId,100,// a fixed large enough value will suffice for max in-flight requests
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
config.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
config.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
time,true,
apiVersions,
throttleTimeSensor,
logContext);// 消费者客户端// 客户端请求服务端等待时间request.timeout.ms 默认是30sthis.client =newConsumerNetworkClient(
logContext,
netClient,
metadata,
time,
retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
heartbeatIntervalMs);//Will avoid blocking an extended period of time to prevent heartbeat thread starvation// 消费者分区分配策略this.assignors =ConsumerPartitionAssignor.getAssignorInstances(
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)));// no coordinator will be constructed for the default (null) group id// 为消费者组准备的// auto.commit.interval.ms 自动提交offset时间 默认5sthis.coordinator =!groupId.isPresent()?null:newConsumerCoordinator(groupRebalanceConfig,
logContext,this.client,
assignors,this.metadata,this.subscriptions,
metrics,
metricGrpPrefix,this.time,
enableAutoCommit,
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),this.interceptors,
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));// 配置抓数据的参数// fetch.min.bytes 默认最少一次抓取1个字节// fetch.max.bytes 默认最多一次抓取50m// fetch.max.wait.ms 抓取等待最大时间 500ms// max.partition.fetch.bytes 默认是1m// max.poll.records 默认一次处理500条this.fetcher =newFetcher<>(
logContext,this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),this.keyDeserializer,this.valueDeserializer,this.metadata,this.subscriptions,
metrics,
metricsRegistry,this.time,this.retryBackoffMs,this.requestTimeoutMs,
isolationLevel,
apiVersions);this.kafkaConsumerMetrics =newKafkaConsumerMetrics(metrics, metricGrpPrefix);
config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka consumer initialized");}catch(Throwable t){// call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121// we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.if(this.log !=null){close(0,true);}// now propagate the exceptionthrownewKafkaException("Failed to construct kafka consumer", t);}}
2、消费者订阅主题
点击自己编写的 CustomConsumer.java 中的 subscribe ()方法
publicvoidsubscribe(Collection<String> topics,ConsumerRebalanceListener listener){acquireAndEnsureOpen();try{maybeThrowInvalidGroupIdException();// 要订阅的主题如果为null ,直接抛异常if(topics ==null)thrownewIllegalArgumentException("Topic collection to subscribe to cannot be null");// 要订阅的主题如果为空if(topics.isEmpty()){// treat subscribing to empty topic list as the same as unsubscribingthis.unsubscribe();}else{// 正常的处理操作for(String topic : topics){// 如果为空 抛异常if(Utils.isBlank(topic))thrownewIllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");}throwIfNoAssignorsConfigured();
fetcher.clearBufferedDataForUnassignedTopics(topics);
log.info("Subscribed to topic(s): {}",Utils.join(topics,", "));// 订阅主题(判断你是否需要更新订阅的主题; 主题了一个监听器listener)if(this.subscriptions.subscribe(newHashSet<>(topics), listener))// 更新订阅信息
metadata.requestUpdateForNewTopics();}}finally{release();}}publicsynchronizedbooleansubscribe(Set<String> topics,ConsumerRebalanceListener listener){// 注册负载均衡监听器registerRebalanceListener(listener);// 按照主题自动订阅模式setSubscriptionType(SubscriptionType.AUTO_TOPICS);// 判断是否需要更改订阅的主题returnchangeSubscription(topics);}privatebooleanchangeSubscription(Set<String> topicsToSubscribe){// 如果传入的topics 和以前订阅的主题一致,那就不需要更改对应订阅的主题if(subscription.equals(topicsToSubscribe))returnfalse;
subscription = topicsToSubscribe;returntrue;}
3、消费者拉取和处理数据
3.1 消费总体流程
点击自己编写的 CustomConsumer.java 中的 poll ()方法
privateConsumerRecords<K,V>poll(finalTimer timer,finalboolean includeMetadataInTimeout){acquireAndEnsureOpen();try{this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());if(this.subscriptions.hasNoSubscriptionOrUserAssignment()){thrownewIllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}do{
client.maybeTriggerWakeup();if(includeMetadataInTimeout){// 1、消费者或者消费者组的初始化// try to update assignment metadata BUT do not need to block on the timer for join groupupdateAssignmentMetadataIfNeeded(timer,false);}else{while(!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE),true)){
log.warn("Still waiting for metadata");}}// 2 抓取数据finalMap<TopicPartition,List<ConsumerRecord<K,V>>> records =pollForFetches(timer);if(!records.isEmpty()){// before returning the fetched records, we can send off the next round of fetches// and avoid block waiting for their responses to enable pipelining while the user// is handling the fetched records.//// NOTE: since the consumed position has already been updated, we must not allow// wakeups or any other errors to be triggered prior to returning the fetched records.if(fetcher.sendFetches()>0|| client.hasPendingRequests()){
client.transmitSends();}// 3 拦截器处理数据returnthis.interceptors.onConsume(newConsumerRecords<>(records));}}while(timer.notExpired());returnConsumerRecords.empty();}finally{release();this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());}}
3.2 消费者/消费者组初始化
publicbooleanpoll(Timer timer,boolean waitForJoinGroup){maybeUpdateSubscriptionMetadata();invokeCompletedOffsetCommitCallbacks();if(subscriptions.hasAutoAssignedPartitions()){// 如果没有指定分区分配策略 直接抛异常if(protocol ==null){thrownewIllegalStateException("User configured "+ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +" to empty while trying to subscribe for group protocol to auto assign partitions");}// Always update the heartbeat last poll time so that the heartbeat thread does not leave the// group proactively due to application inactivity even if (say) the coordinator cannot be found.// 3s心跳pollHeartbeat(timer.currentTimeMs());// 判断Coordinator 是否准备好了if(coordinatorUnknown()&&!ensureCoordinatorReady(timer)){returnfalse;}......maybeAutoCommitOffsetsAsync(timer.currentTimeMs());returntrue;}
3.3 拉取数据
privateMap<TopicPartition,List<ConsumerRecord<K,V>>>pollForFetches(Timer timer){long pollTimeout = coordinator ==null? timer.remainingMs():Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());// if data is available already, return it immediately// 第一次拉取不到数据finalMap<TopicPartition,List<ConsumerRecord<K,V>>> records = fetcher.fetchedRecords();if(!records.isEmpty()){return records;}// send any new fetches (won't resend pending fetches)// 开始拉取数据,里面放了一个监听函数,
fetcher.sendFetches();// We do not want to be stuck blocking in poll if we are missing some positions// since the offset lookup may be backing off after a failure// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call// updateAssignmentMetadataIfNeeded before this method.if(!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs){
pollTimeout = retryBackoffMs;}
log.trace("Polling for fetches with timeout {}", pollTimeout);Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer,()->{// since a fetch might be completed by the background thread, we need this poll condition// to ensure that we do not block unnecessarily in poll()return!fetcher.hasAvailableFetches();});
timer.update(pollTimer.currentTimeMs());return fetcher.fetchedRecords();}// 首先抓取数据为空,然后发送请求监听并将数据放入队列,最后再抓取数据,拦截器处理数据
3.4 消费者 Offset 提交
三、服务端源码
生产者消费者源码使用java编写,而服务端源码使用scala编写
程序入口在core→src→main→scala→Kafka→kafka.scala
def main(args: Array[String]):Unit={try{// 获取相关参数val serverProps = getPropsFromArgs(args)// 创建服务val server = buildServer(serverProps)try{if(!OperatingSystem.IS_WINDOWS &&!Java.isIbmJdk)new LoggingSignalHandler().register()}catch{case e: ReflectiveOperationException =>
warn("Failed to register optional signal handler that logs a message when the process is terminated "+
s"by a signal. Reason for registration failure is: $e", e)}// attach shutdown handler to catch terminating signals as well as normal termination
Exit.addShutdownHook("kafka-shutdown-hook",{try server.shutdown()catch{case _: Throwable =>
fatal("Halting Kafka.")// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
Exit.halt(1)}})// 启动服务try server.startup()catch{case _: Throwable =>// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
Exit.exit(1)}
server.awaitShutdown()}catch{case e: Throwable =>
fatal("Exiting Kafka due to fatal exception", e)
Exit.exit(1)}
Exit.exit(0)}
版权归原作者 魅Lemon 所有, 如有侵权,请联系我们删除。