1、代码
package com.zsh.kafkatest.topic;
import com.zsh.kafkatest.connect.KafkaConnection;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
* @Author ZhaoShuHao
* @Date 2023/7/21 16:58
*/
public class GetTopicAboutDatasource {
public static void main(String[] args) {
String kafka = "192.168.140.65:9092";
String[] kafkas = kafka.split(";");
for(int i=0;i<kafkas.length;i++){
String[] _kafka = kafkas[i].split(":");
if(_kafka.length<2){
System.out.println("有个地址缺少IP或端口,获取topic失败");
}
}
List<Map<String,String>> tVos = new ArrayList<>();
List<String> list = new ArrayList<>();
AdminClient adminiClient = KafkaConnection.kafkaTestConnection(kafka);
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult topicsResult = adminiClient.listTopics(options);
try {
Set<String> topicNames = topicsResult.names().get();
Iterator it = topicNames.iterator();
while (it.hasNext()){
Map<String,String> map = new HashMap<>();
String topicName = it.next().toString();
Map topicInfo = getTopicInfo(kafka,topicName);
map.put("tableName",topicName);
map.put("issame","0");
map.put("Partitions", String.valueOf(topicInfo.get("Partitions")));
map.put("PartitionSize", String.valueOf(topicInfo.get("PartitionSize")));
map.put("ReplicationFactor", String.valueOf(topicInfo.get("ReplicationFactor")));
tVos.add(map);
}
} catch (Exception e) {
System.out.println("获取topic失败");
}finally {
KafkaConnection.close(adminiClient);
}
System.out.println("所有信息查询成功tVos:"+tVos);
tVos.stream().forEach(maptopic -> {
System.out.println("————————————————————————————————————");
System.out.println("topic主题名称:"+maptopic.get("tableName"));
maptopic.get("issame");
maptopic.get("Partitions");
System.out.println("topic分区信息:"+maptopic.get("Partitions"));
maptopic.get("PartitionSize");
System.out.println("topic分区数量:"+maptopic.get("PartitionSize"));
maptopic.get("ReplicationFactor");
System.out.println("topic副本数量:"+maptopic.get("ReplicationFactor"));
System.out.println("————————————————————————————————————");
});
}
//获取topic的详细信息
public static Map getTopicInfo(String ipAndPort,String topic){
Map<String, Object> map = new HashMap<>();
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ipAndPort);
AdminClient adminClient = AdminClient.create(props);
String topicName = topic;
DescribeTopicsOptions describeTopicsOptions = new DescribeTopicsOptions().timeoutMs(5000);
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(topicName), describeTopicsOptions);
KafkaFuture<TopicDescription> topicDescriptionFuture = describeTopicsResult.values().get(topicName);
try {
TopicDescription topicDescription = topicDescriptionFuture.get();
List<TopicPartitionInfo> partitions = topicDescription.partitions();
int replicationFactor = partitions.get(0).replicas().size();
map.put("Partitions", partitions);
map.put("PartitionSize", partitions.size());
map.put("ReplicationFactor", replicationFactor);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return map;
}
}
2、结果
本文转载自: https://blog.csdn.net/ZhShH0413/article/details/131980131
版权归原作者 要开心吖ZSH 所有, 如有侵权,请联系我们删除。
版权归原作者 要开心吖ZSH 所有, 如有侵权,请联系我们删除。