目录
4. kafka消费者源码初探
4.1 kafka消费者基本原理
Kafka消费者通过pull(拉)模式从服务端拉取数据,其流程其实就是客户端不断循环向服务端发送网络请求,再通过回调函数获取数据、处理数据的过程,大致流程如图:
4.2 消费者源码
4.2.1 示例代码
publicclassConsumerFastAnalysis{publicstaticfinalString brokerList ="hadoop101:9092";publicstaticfinalString topic ="topic-demo";publicstaticfinalString groupId ="group.demo";publicstaticvoidmain(String[] args){//配置消费者客户端参数Properties properties =newProperties();
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", groupId);//kafka消费者KafkaConsumer<String,String> consumer =newKafkaConsumer<>(properties);//订阅主题
consumer.subscribe(Collections.singletonList(topic));while(true){//拉取消息ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println("消费到的消息:"+ record.value());}}}}
由示例代码可以看出,消费者正常消费逻辑大体可以分为三步:
①配置消费者客户端参数及创建相应的消费者实例(KafkaConsumer初始化)
②订阅主题
③拉取消息并消费
4.2.2 KafkaConsumer初始化
同KafkaProducer初始化原理相同,KafkaConsumer初始化也是读取用户自定义的配置,然后封装为具有不同功能的java对象,KafkaConsumer持有这些对象,完成消费者功能:
KafkaConsumer初始化大致流程如下:
源码解析:
privateKafkaConsumer(ConsumerConfig config,Deserializer<K> keyDeserializer,Deserializer<V> valueDeserializer){try{//设置客户端idString clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);if(clientId.isEmpty())
clientId ="consumer-"+CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();this.clientId = clientId;//设置消费者组idString groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);LogContext logContext =newLogContext("[Consumer clientId="+ clientId +", groupId="+ groupId +"] ");this.log = logContext.logger(getClass());
log.debug("Initializing the Kafka consumer");//等待服务端响应的最大时间,默认30sthis.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);this.time =Time.SYSTEM;Map<String,String> metricsTags =Collections.singletonMap("client-id", clientId);MetricConfig metricConfig =newMetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)).timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))).tags(metricsTags);List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,MetricsReporter.class);
reporters.add(newJmxReporter(JMX_PREFIX));this.metrics =newMetrics(metricConfig, reporters, time);//重试时间间隔,默认100msthis.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);// load interceptors and make sure they get clientIdMap<String,Object> userProvidedConfigs = config.originals();
userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);//拦截器配置List<ConsumerInterceptor<K,V>> interceptorList =(List)(newConsumerConfig(userProvidedConfigs,false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptor.class);this.interceptors =newConsumerInterceptors<>(interceptorList);//key和value反序列化配置if(keyDeserializer ==null){this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,Deserializer.class);this.keyDeserializer.configure(config.originals(),true);}else{
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);this.keyDeserializer = keyDeserializer;}if(valueDeserializer ==null){this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,Deserializer.class);this.valueDeserializer.configure(config.originals(),false);}else{
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);this.valueDeserializer = valueDeserializer;}ClusterResourceListeners clusterResourceListeners =configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);//配置元数据this.metadata =newMetadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),true,false, clusterResourceListeners);//配置连接的kafka集群List<InetSocketAddress> addresses =ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));this.metadata.update(Cluster.bootstrap(addresses),Collections.<String>emptySet(),0);String metricGrpPrefix ="consumer";ConsumerMetrics metricsRegistry =newConsumerMetrics(metricsTags.keySet(),"consumer");ChannelBuilder channelBuilder =ClientUtils.createChannelBuilder(config);IsolationLevel isolationLevel =IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));Sensor throttleTimeSensor =Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);//心跳时间,默认3sint heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);//创建网络客户端NetworkClient netClient =newNetworkClient(newSelector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),this.metadata,
clientId,100,// a fixed large enough value will suffice for max in-flight requests
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
time,true,newApiVersions(),
throttleTimeSensor,
logContext);//创建消费者客户端this.client =newConsumerNetworkClient(
logContext,
netClient,
metadata,
time,
retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
heartbeatIntervalMs);//Will avoid blocking an extended period of time to prevent heartbeat thread starvation//offset的拉取策略(offset从什么位置开始拉取,默认是latest)OffsetResetStrategy offsetResetStrategy =OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));this.subscriptions =newSubscriptionState(offsetResetStrategy);//消费者分区的分配策略(默认为RangeAssignor :范围分区分配策略)//RangeAssignor:以单个Topic为维度来分配, 只负责将每一个Topic的分区尽可能均衡的分配给消费者this.assignors = config.getConfiguredInstances(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,PartitionAssignor.class);int maxPollIntervalMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);//创建消费者协调器(自动提交offset的时间间隔,默认为5s)this.coordinator =newConsumerCoordinator(logContext,this.client,
groupId,
maxPollIntervalMs,
sessionTimeoutMs,newHeartbeat(sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
assignors,this.metadata,this.subscriptions,
metrics,
metricGrpPrefix,this.time,
retryBackoffMs,
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),this.interceptors,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));//抓取数据配置//一次抓取最小值,默认1个字节//一次抓取最大值,默认50m//一次抓取最大等待时间,默认500ms//每个分区抓取最大字节数,默认1m//一次poll拉取数据返回消息最大条数,默认500条//key和value的序列化this.fetcher =newFetcher<>(
logContext,this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),this.keyDeserializer,this.valueDeserializer,this.metadata,this.subscriptions,
metrics,
metricsRegistry.fetcherMetrics,this.time,this.retryBackoffMs,this.requestTimeoutMs,
isolationLevel);
config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer initialized");}catch(Throwable t){// call close methods if internal objects are already constructed// this is to prevent resource leak. see KAFKA-2121close(0,true);// now propagate the exceptionthrownewKafkaException("Failed to construct kafka consumer", t);}}
4.2.3 订阅主题
订阅主题的主要逻辑就是将主题信息维护在元数据(metadata)中,并且注册一个再均衡监听器(ConsumerRebalanceListener,例如消费者组中新增/减少消费者时会触发该监听器),流程如图:
// Everything before this point validates the topics and the ConsumerRebalanceListener.

// Record the subscription (topics plus rebalance listener) in the subscription state.
this.subscriptions.subscribe(new HashSet<>(topics), listener);
// Tell the metadata which topics it has to track.
metadata.setTopics(subscriptions.groupSubscription());
订阅主题代码:
publicvoidsubscribe(Set<String> topics,ConsumerRebalanceListener listener){if(listener ==null)thrownewIllegalArgumentException("RebalanceListener cannot be null");//按照设置的主题开始订阅,自动分配分区setSubscriptionType(SubscriptionType.AUTO_TOPICS);//注册负载均衡监听(例如在消费者组中,其他消费者退出触发再平衡)this.rebalanceListener = listener;//修改订阅主题信息changeSubscription(topics);}
4.2.4 拉取消息消费
consumer.poll()方法中,拉取消息消费的内部代码封装为了三个方法:
①updateAssignmentMetadataIfNeeded:主要目的是找到可用的消费者协调器(coordinator),更新fetcher拉取消息的位移
②pollForFetches:发送请求拉取数据
③this.interceptors.onConsume:调用拦截器链处理数据
(1)先看updateAssignmentMetadataIfNeeded方法
booleanupdateAssignmentMetadataIfNeeded(finallong timeoutMs){finallong startMs = time.milliseconds();//循环给服务端发请求,直到找到coordinatorif(!coordinator.poll(timeoutMs)){returnfalse;}//更新fetcher拉取消息的位移returnupdateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds()- startMs));}
publicbooleanpoll(finallong timeoutMs){finallong startTime = time.milliseconds();long currentTime = startTime;long elapsed =0L;if(subscriptions.partitionsAutoAssigned()){//发送一次心跳pollHeartbeat(currentTime);if(coordinatorUnknown()){//保证coordinator正常通信(寻找服务器端的coordinator)if(!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))){returnfalse;}
currentTime = time.milliseconds();
elapsed = currentTime - startTime;}//......省略部分代码//是否自动提交offsetmaybeAutoCommitOffsetsAsync(currentTime);returntrue;}
其中,寻找可用的消费者协调器(coordinator)是由消费者网络客户端向服务端发送网络请求(FindCoordinator请求)实现的:
protectedsynchronizedbooleanensureCoordinatorReady(finallong timeoutMs){finallong startTimeMs = time.milliseconds();long elapsedTime =0L;//循环给服务端发送请求,直到找到coordinatorwhile(coordinatorUnknown()){finalRequestFuture<Void> future =lookupCoordinator();
client.poll(future,remainingTimeAtLeastZero(timeoutMs, elapsedTime));//......省略部分代码return!coordinatorUnknown();}
/**
 * Sends a FindCoordinator request for this consumer's group to the given node.
 *
 * @param node broker to send the request to
 * @return future composed with a {@code FindCoordinatorResponseHandler}
 */
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending FindCoordinator request to broker {}", node);
    FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(
            FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
    return client.send(node, requestBuilder)
            .compose(new FindCoordinatorResponseHandler());
}
(2)拉取数据方法pollForFetches()
privateMap<TopicPartition,List<ConsumerRecord<K,V>>>pollForFetches(finallong timeoutMs){......//发送请求并抓取数据
fetcher.sendFetches();......//把数据按分区封装好后,一次处理默认500条数据return fetcher.fetchedRecords();}
首先通过发送网络请求的方式来抓取数据,并注册成功和失败的回调函数。在成功回调中,会把拉取到的数据加入消息队列(completedFetches)中,供后续处理。
publicintsendFetches(){Map<Node,FetchSessionHandler.FetchRequestData> fetchRequestMap =prepareFetchRequests();for(Map.Entry<Node,FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()){finalNode fetchTarget = entry.getKey();finalFetchSessionHandler.FetchRequestData data = entry.getValue();//初始化抓取数据的参数//最大等待时间默认500ms//最小抓取一个字节//最大抓取50m数据finalFetchRequest.Builder request =FetchRequest.Builder.forConsumer(this.maxWaitMs,this.minBytes, data.toSend()).isolationLevel(isolationLevel).setMaxBytes(this.maxBytes).metadata(data.metadata()).toForget(data.toForget());if(log.isDebugEnabled()){
log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);}//发送拉取数据请求
client.send(fetchTarget, request)//监听服务器端返回的数据.addListener(newRequestFutureListener<ClientResponse>(){//成功接收服务端数据回调@OverridepublicvoidonSuccess(ClientResponse resp){//获取服务端响应数据FetchResponse<Records> response =(FetchResponse<Records>) resp.responseBody();FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());if(handler ==null){
log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
fetchTarget.id());return;}if(!handler.handleResponse(response)){return;}Set<TopicPartition> partitions =newHashSet<>(response.responseData().keySet());FetchResponseMetricAggregator metricAggregator =newFetchResponseMetricAggregator(sensors, partitions);for(Map.Entry<TopicPartition,FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()){TopicPartition partition = entry.getKey();long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;FetchResponse.PartitionData fetchData = entry.getValue();
log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
isolationLevel, fetchOffset, partition, fetchData);//将数据以分区维度,加入到消息队列里
completedFetches.add(newCompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));}
sensors.fetchLatency.record(resp.requestLatencyMs());}//获取服务端消息失败回调@OverridepublicvoidonFailure(RuntimeException e){FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());if(handler !=null){
handler.handleError(e);}}});}return fetchRequestMap.size();}
把数据按分区封装好后,便进行数据处理(一次处理最大条数默认500条数据)
publicMap<TopicPartition,List<ConsumerRecord<K,V>>>fetchedRecords(){Map<TopicPartition,List<ConsumerRecord<K,V>>> fetched =newHashMap<>();//一次处理的最大条数,默认500条int recordsRemaining = maxPollRecords;try{//循环处理while(recordsRemaining >0){if(nextInLineRecords ==null|| nextInLineRecords.isFetched){//从缓存(数据队列)中获取数据CompletedFetch completedFetch = completedFetches.peek();//缓存中数据为null,直接跳出循环if(completedFetch ==null)break;try{
nextInLineRecords =parseCompletedFetch(completedFetch);}catch(Exception e){FetchResponse.PartitionData partition = completedFetch.partitionData;if(fetched.isEmpty()&&(partition.records ==null|| partition.records.sizeInBytes()==0)){
completedFetches.poll();}throw e;}//从缓存中拉取数据
completedFetches.poll();}else{List<ConsumerRecord<K,V>> records =fetchRecords(nextInLineRecords, recordsRemaining);TopicPartition partition = nextInLineRecords.partition;if(!records.isEmpty()){List<ConsumerRecord<K,V>> currentRecords = fetched.get(partition);if(currentRecords ==null){
fetched.put(partition, records);}else{List<ConsumerRecord<K,V>> newRecords =newArrayList<>(records.size()+ currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);}
recordsRemaining -= records.size();}}}}catch(KafkaException e){if(fetched.isEmpty())throw e;}return fetched;}
(3)拦截器处理数据
publicConsumerRecords<K,V>onConsume(ConsumerRecords<K,V> records){ConsumerRecords<K,V> interceptRecords = records;//循环所有拦截器,处理数据for(ConsumerInterceptor<K,V> interceptor :this.interceptors){try{
interceptRecords = interceptor.onConsume(interceptRecords);}catch(Exception e){// do not propagate interceptor exception, log and continue calling other interceptors
log.warn("Error executing interceptor onConsume callback", e);}}return interceptRecords;}
朱忠华:深入理解kafka:核心设计与实现原理
尚硅谷kafka教学文档
博主微信,欢迎交流:jhy2496085873
版权归原作者 do__something 所有, 如有侵权,请联系我们删除。