目标
通过指定主题和消费者组调用方法,实时查看该主题下各分区消息的消费情况(消息总数量、已消费的消息数量、未消费的消息数量)。
工具类
package com.utils.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
/**
 * Reports, for one topic and one consumer group, the log-end offset, the committed
 * offset and the difference between the two (the backlog) of every partition.
 */
public class KafkaConsumeLagMonitorUtils {

    /** Sample topic name used by {@link #main(String[])}. */
    public static final String TOPIC_NAME = "topicName";

    /** Sample consumer group id used by {@link #main(String[])}. */
    public static final String GROUP_ID_CONFIG = "groupId";

    /** Broker address list; for a cluster, separate the brokers with commas. */
    public static final String KAFKA_BROKER_LIST = "127.0.0.1:6667";

    /**
     * Builds the consumer configuration used for the offset lookups.
     *
     * @param groupId         value stored under {@code group.id}
     * @param bootstrapServer value stored under {@code bootstrap.servers}
     * @return properties holding the group id, the bootstrap servers and String
     *         deserializers for both key and value
     */
    public static Properties getConsumeProperties(String groupId, String bootstrapServer) {
        Properties props = new Properties();
        props.put("group.id", groupId);
        props.put("bootstrap.servers", bootstrapServer);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /**
     * Collects the consumption state of every partition of {@code topic} for the
     * consumer group {@code groupId}.
     *
     * <p>Each returned map describes one partition and carries the keys
     * {@code topic}, {@code groupId}, {@code partition}, {@code endOffset},
     * {@code commitOffset} and {@code diffOffset}
     * ({@code endOffset - commitOffset}).
     *
     * <p>A partition for which the group has no committed offset is reported on
     * standard output and left out of the result, because its backlog cannot be
     * computed. If no partitions are found for the topic, a message is printed
     * and an empty list is returned.
     *
     * @param bootstrapServer broker address list to connect to
     * @param groupId         consumer group whose committed offsets are read
     * @param topic           topic whose partitions are inspected
     * @return one map per partition that has a committed offset; never {@code null}
     */
    public List<Map<String, Object>> topicAndPartitionDetails(
            String bootstrapServer,
            String groupId,
            String topic
    ) {
        List<Map<String, Object>> result = new ArrayList<>();
        Properties consumeProps = getConsumeProperties(groupId, bootstrapServer);
        // try-with-resources closes the consumer on every exit path.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumeProps)) {
            List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);
            // Some client versions return null (rather than an empty list) for an unknown topic.
            if (partitionsFor == null || partitionsFor.isEmpty()) {
                System.out.println(topic + "主题的分区丢失。");
                return result;
            }

            List<TopicPartition> topicPartitions = new ArrayList<>();
            for (PartitionInfo partitionInfo : partitionsFor) {
                topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }

            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            for (TopicPartition topicPartition : topicPartitions) {
                System.out.println(String.format("at %s, topic:%s, partition:%s, logSize:%s",
                        System.currentTimeMillis(), topic, topicPartition.partition(),
                        endOffsets.get(topicPartition)));
            }

            // Look up the committed offset of each partition and derive the backlog.
            for (TopicPartition topicPartition : topicPartitions) {
                OffsetAndMetadata committed = consumer.committed(topicPartition);
                // committed() yields null when the group has never committed for this partition.
                if (committed == null) {
                    System.out.println(String.format(
                            "at %s, topic:%s, partition:%s, groupId:%s has no committed offset, skipped",
                            System.currentTimeMillis(), topic, topicPartition.partition(), groupId));
                    continue;
                }
                long endOffset = endOffsets.get(topicPartition);
                long commitOffset = committed.offset();
                long diffOffset = endOffset - commitOffset;

                Map<String, Object> partitionMap = new HashMap<>();
                // topic
                partitionMap.put("topic", topic);
                // consumer group
                partitionMap.put("groupId", groupId);
                // partition number
                partitionMap.put("partition", topicPartition.partition());
                // log-end offset
                partitionMap.put("endOffset", endOffset);
                // committed offset
                partitionMap.put("commitOffset", commitOffset);
                // backlog: messages not yet consumed
                partitionMap.put("diffOffset", diffOffset);
                result.add(partitionMap);
            }
        }
        return result;
    }

    /**
     * Prints the partition details for the sample topic and group declared above.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        List<Map<String, Object>> list = new KafkaConsumeLagMonitorUtils().topicAndPartitionDetails(
                KAFKA_BROKER_LIST,
                GROUP_ID_CONFIG,
                TOPIC_NAME
        );
        for (Map<String, Object> map : list) {
            map.forEach((k, v) -> System.out.println(k + "=" + v));
            System.out.println("========================");
        }
    }
}
批量监控
package com.utils.kafka;
import java.util.*;
/**
 * Checks a fixed list of topic / consumer-group pairs for message backlog.
 */
public class KafkaConsumeLagMonitor {

    /** Kafka broker host and port. */
    public static final String KAFKA_BROKER_LIST = "127.0.0.1:6667";

    /** Monitored pairs; every entry holds the keys "topic" and "groupId". */
    static List<Map<String, Object>> topicList = new ArrayList<>();

    // Two topics are monitored here.
    static {
        // air
        watch("air", "air_minute_group");
        // water
        watch("water", "water_minute_group");
    }

    /** Appends one topic / consumer-group pair to {@link #topicList}. */
    private static void watch(String topic, String groupId) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("topic", topic);
        entry.put("groupId", groupId);
        topicList.add(entry);
    }

    /**
     * Walks the monitored topics and stops at the first partition whose backlog
     * exceeds {@code lagLimit}. Every partition field visited on the way is
     * printed to standard output.
     *
     * @param lagLimit backlog size above which a warning is produced
     * @return the warning text for the first offending partition, or {@code null}
     *         when no partition exceeds the limit
     */
    public static String isLazy(long lagLimit) {
        for (Map<String, Object> monitored : topicList) {
            String groupId = monitored.get("groupId").toString();
            String topic = monitored.get("topic").toString();
            KafkaConsumeLagMonitorUtils lookup = new KafkaConsumeLagMonitorUtils();
            List<Map<String, Object>> partitions =
                    lookup.topicAndPartitionDetails(KAFKA_BROKER_LIST, groupId, topic);

            for (Map<String, Object> partitionItem : partitions) {
                for (Map.Entry<String, Object> field : partitionItem.entrySet()) {
                    String k = field.getKey();
                    Object v = field.getValue();
                    System.out.println(k + "=" + v);

                    // Only the backlog field can trigger the warning.
                    if (!"diffOffset".equals(k)) {
                        continue;
                    }
                    if (Long.parseLong(v.toString()) <= lagLimit) {
                        continue;
                    }
                    return topic + "主题消息积压,分区" + partitionItem.get("partition") + "积压消息" + v + "条。";
                }
                System.out.println("========================");
            }
        }
        return null;
    }

    /**
     * Runs the check with a limit of 1000 messages and prints the outcome.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println(isLazy(1000));
    }
}
本文转载自: https://blog.csdn.net/qq_39706570/article/details/128844060
版权归原作者 我的身前一尺是我的世界 所有, 如有侵权,请联系我们删除。