文章目录
实现
packagecom.artisan.bootkafka.controller;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.PartitionInfo;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.util.*;publicclassTopicBacklog{publicstaticintgetTotalBacklog(String topic){// Kafka客户端配置Properties props =newProperties();
props.put("bootstrap.servers","ip:port");
props.put("group.id","attack-consumer");
props.put("key.deserializer",StringDeserializer.class.getName());
props.put("value.deserializer",StringDeserializer.class.getName());// 创建KafkaConsumerKafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);// 订阅要查询的主题List<PartitionInfo> partitions = consumer.partitionsFor(topic);List<TopicPartition> topicPartitions =newArrayList<>();for(PartitionInfo partition : partitions){
topicPartitions.add(newTopicPartition(partition.topic(), partition.partition()));}// 手动分配分区
consumer.assign(topicPartitions);// 记录未消费消息总数int totalBacklog =0;// 遍历每个分区获取其未消费消息数并累加for(PartitionInfo partition : partitions){TopicPartition tp =newTopicPartition(partition.topic(), partition.partition());// 获取消费者的当前偏移量long latestOffset = consumer.position(tp);long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);int backlog =Math.toIntExact(endOffset - latestOffset);
totalBacklog += backlog;}// 返回未消费消息总数return totalBacklog;}publicstaticMap<String,Integer>getAllTopicsBacklog(){// Kafka客户端配置Properties props =newProperties();
props.put("bootstrap.servers","ip:port");
props.put("group.id","attack-consumer");
props.put("key.deserializer",StringDeserializer.class.getName());
props.put("value.deserializer",StringDeserializer.class.getName());KafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);// 获取所有主题列表Map<String,List<PartitionInfo>> topicMap = consumer.listTopics();// 记录每个主题未消费消息总数Map<String,Integer> backlogMap =newHashMap<>();// 遍历每个主题,计算其未消费消息数for(String topic : topicMap.keySet()){// 订阅要查询的主题List<PartitionInfo> partitions = consumer.partitionsFor(topic);List<TopicPartition> topicPartitions =newArrayList<>();for(PartitionInfo partition : partitions){
topicPartitions.add(newTopicPartition(partition.topic(), partition.partition()));}// 手动分配分区
consumer.assign(topicPartitions);int backlog =0;for(PartitionInfo partition : partitions){TopicPartition tp =newTopicPartition(partition.topic(), partition.partition());long latestOffset = consumer.position(tp);long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
backlog +=Math.toIntExact(endOffset - latestOffset);}
backlogMap.put(topic, backlog);}// 返回每个主题未消费消息总数return backlogMap;}publicstaticvoidmain(String[] args){int backlog =getTotalBacklog("topic-test");System.out.println(backlog);getAllTopicsBacklog().forEach((topic, backlogCount)->System.out.println(topic +" - "+ backlogCount));}}
核对一下,23 。
有2个方法,第二个方法
Map<String, Integer> getAllTopicsBacklog()
虽然会返回所有的Topic 的积压量,但只有 对应的 消费组的数据是准确的。
本文转载自: https://blog.csdn.net/yangshangwei/article/details/130528939
版权归原作者 小小工匠 所有, 如有侵权,请联系我们删除。
版权归原作者 小小工匠 所有, 如有侵权,请联系我们删除。