KafkaConsumer 指定消费位置的基础
一个 Topic 对应着磁盘上的几个重要的文件:
- .log:数据文件,存储了该 topic 中的所有消息。
- .index:索引文件,对数据文件中的消息进行索引。关键就是可以按照 offset 来索引
- .timeindex:时间索引文件,类似于索引文件,但按照消息的时间戳进行索引。它保存着 timestamp-offset 的关系,所以可以使用时间戳找到对应的 offset ,然后再找到对应的 offset 位置,这样就能找到对应的文件开始的位置了。
- leader-epoch-checkpoint:该文件的职责是存储leader对于topic的进度,以便在leader重启时可以重新定位到上一次成功提交的offset位置。
在我们探讨的功能中,index和timeindex两个文件扮演着关键角色,它们与消费者开始消费的位置密切相关。这是我们今天要讲解的核心基础,没有这两个文件,我们无法进行后续的操作。
具体的做法
方法名用法业务意义subscribe()consumer.subscribe(Arrays.asList(“topic1”, “topic2”));订阅一个或多个主题,按照策略指定当前消费者实例消费那个分区的数据assign()TopicPartition partition0 = new TopicPartition(“test”, 0);
consumer.assign(Arrays.asList(partition0));指定分配的TopicPartition,手工指定当前消费者实例消费那个分区的数据assignment()Set partitions = consumer.assignment();获取当前已分配的TopicPartition列表beginningOffsets()Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Arrays.asList(partition0));获取指定TopicPartition的起始偏移量信息
说明:
subscribe()
和assign()
两个方法的使用是互斥的,只能使用其中之一。assignment()
方法只能在 consumer 已经订阅主题或手动分配分区之后使用,否则返回空列表。
在执行
subscribe()
和
assign()
之后,
assignment()
才能获取到偏移的位置,否返回一个 size() = 0 的 map。更加让人不能接收的是执行
subscribe()
之后,仍然
assignment()
不能获取 TopicPartition 信息。需要在执行了 poll() 方法之后才行。这就比较扯了。我本来想指定一个开始位置,但是还没有指定位置呢?先让我消费出来点数据,这不是多余吗?
在 FlinkKafkaConsumer 类中,使用了
assign()
+
seek
的方式,指定了消费者的消费位置,这样以来,
__consumer_offsets
就有用了,如果 FlinkKafkaConsumer 不是从 checkpoint 消费,则可以指定 Consumer 的 startMap 模式,在不同的模式下,可以从最早、最新、上次消费的位置来开始消费。
FlinkKafkaConsumer 中的 startMode 参数表示消费者的启动模式,具体有以下几种:
startMode中文名称业务意义GROUP_OFFSETS消费者组偏移量从消费者组中最近提交的偏移量开始消费,保证消费者组的偏移量和Kafka中的主题分区的偏移量保持一致,避免出现重复消费或者漏消费的情况EARLIEST最早的从最早的消息开始消费,即消费者会从最早的偏移量开始消费Kafka中的消息LATEST最新的从最新的消息开始消费,即消费者会从最新的偏移量开始消费Kafka中的消息TIMESTAMP时间戳从指定的时间戳开始消费Kafka中的消息SPECIFIC_OFFSETS指定偏移量从指定的偏移量开始消费Kafka中的消息
一般情况下,我们会使用 GROUP_OFFSETS 模式来消费 Kafka 中的消息,因为这样可以保证消费者组的偏移量和 Kafka 中的主题分区的偏移量保持一致,避免出现重复消费或者漏消费的情况。如果需要从最早或者最新的消息开始消费,可以选择 EARLIEST 或者 LATEST 模式。如果需要从指定的时间戳或者偏移量开始消费,可以选择 TIMESTAMP 或者 SPECIFIC_OFFSETS 模式。
关键代码
以下是加上注释的代码:
importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.common.PartitionInfo;importorg.apache.kafka.common.TopicPartition;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.time.Duration;importjava.util.*;importjava.util.stream.Collectors;publicclassKafkaConsumerAt{privatestaticLogger logger =LoggerFactory.getLogger(KafkaConsumerAt.class);publicstaticvoidmain(String[] args)throwsInterruptedException{Properties p =newProperties();
p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"your.kafka.address:9092");
p.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test-text-001");
p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"5");// 设置一次拉取的最大记录数
p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String> consumer =newKafkaConsumer<String,String>(p);String topic ="test_create_timestamp";// 获取指定主题的所有分区信息List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);TopicPartition topicPartition =null;Map<TopicPartition,Long> tpl =newHashMap<>();for(PartitionInfo partitionInfo : partitionInfos){
topicPartition =newTopicPartition(partitionInfo.topic(), partitionInfo.partition());
tpl.put(topicPartition ,0L);// 将所有分区的起始偏移量设置为0}List<TopicPartition> topicPartitions =Collections.singletonList(topicPartition);// 分配consumer消费者到指定分区进行消费,这里是最后一个分区
consumer.assign(topicPartitions);
tpl.forEach((key,value)->{// 将所有分区的偏移量设置为起始偏移量,因为 value 的值在上一步中已经设置为 0 了。
consumer.seek(key , value);});ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1L));// 消费消息for(ConsumerRecord<String,String> record : records){
logger.info("key:{} value:{} offset:{} timestamp:{} timestampType:{}", record.key(), record.value(), record.offset(), record.timestamp(), record.timestampType());// 输出消费的消息}
consumer.close();// 关闭消费者}}
KafkaConsumer#partitionsFor方法用于获取指定主题的所有分区信息。它会返回一个List,每个PartitionInfo对象包含分区的详细信息,比如分区编号、分区的主题、分区的副本列表等等。
以下是一个简单的使用案例:
Properties props =newProperties();
props.setProperty("bootstrap.servers","localhost:9092");
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);List<PartitionInfo> partitions = consumer.partitionsFor("my-topic");for(PartitionInfo partition : partitions){System.out.println(partition);}
在上面的示例中,我们首先创建了一个KafkaConsumer对象,并设置了所需的属性。然后调用partitionsFor方法获取名为"my-topic"的主题的分区信息,最后打印出每个分区的详细信息。
KafkaConsumer#seek方法用于将消费者的偏移量(offset)设置到指定的分区和偏移量。它可以用于重置消费者在某个分区的消费进度,例如重新开始消费、跳过某些消息或者定位到特定的偏移量处继续消费等。
以下是一个简单的使用案例:
Properties props =newProperties();
props.setProperty("bootstrap.servers","localhost:9092");
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);TopicPartition partition =newTopicPartition("my-topic",0);// 设置要操作的分区
consumer.assign(Arrays.asList(partition));// 订阅指定分区
consumer.seek(partition,100);// 将偏移量设置为100ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));// 消费消息for(ConsumerRecord<String,String> record : records){System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
在上面的示例中,我们首先创建了一个KafkaConsumer对象,并设置了所需的属性。然后订阅名为"my-topic"的主题中的分区0,并将偏移量设置为100。最后调用poll方法消费消息,并打印出每条消费的消息的偏移量、键和值。
从
.timeindex
文件,可以由 timestamp 找到对应的 offset 。
所以把 KafkaConsumer 提供给我们了
offsetsForTimes(Map<TopicPartition,Long>)
此方法可以根据指定的 partition 的 timestamp 找到对应的 offset , 然后使用 seek() 来指定开始消费的位置。
下面是代码:
publicclassConsumerFromTimestamp{publicstaticMap<TopicPartition,Long>fetchOffsetsWithTimestamp(Collection<KafkaTopicPartition> partitions ,long timestamp ,Properties properties){Map<TopicPartition,Long> partitionOffsetsRequest =newHashMap(partitions.size());Map<TopicPartition,Long> rs =newHashMap<>();for(KafkaTopicPartition partition : partitions){
partitionOffsetsRequest.put(newTopicPartition(partition.topic(),partition.partition()),timestamp);}KafkaConsumer<?,?> consumer =newKafkaConsumer(properties);Set<TopicPartition,OffsetAndTimestamp> offsets = consumer.offsetsForTimes(partitionOffsetsRequest);for(Entry<TopicPartition,OffsetAndTimestamp> partitionToOffset : offsets.entrySet()){
rs.put(newKafkaTopicPartition(((TopicPartition)partitionToOffset.getKey()).topic(),((TopicPartition)partitionToOffset.getKey()).partition()), partitionToOffset.getValue()==null?null:((OffsetAndTimestamp)partitionToOffset.getValue()).offset());}return rs ;}}
版权归原作者 bluedraam_pp 所有, 如有侵权,请联系我们删除。