0


使用JavaApi获取Kafka的topic、topic的分区数量与副本数量

1、代码

  1. package com.zsh.kafkatest.topic;
  2. import com.zsh.kafkatest.connect.KafkaConnection;
  3. import org.apache.kafka.clients.admin.*;
  4. import org.apache.kafka.common.KafkaFuture;
  5. import org.apache.kafka.common.TopicPartitionInfo;
  6. import java.util.*;
  7. import java.util.concurrent.ExecutionException;
  8. /**
  9. * @Author ZhaoShuHao
  10. * @Date 2023/7/21 16:58
  11. */
  12. public class GetTopicAboutDatasource {
  13. public static void main(String[] args) {
  14. String kafka = "192.168.140.65:9092";
  15. String[] kafkas = kafka.split(";");
  16. for(int i=0;i<kafkas.length;i++){
  17. String[] _kafka = kafkas[i].split(":");
  18. if(_kafka.length<2){
  19. System.out.println("有个地址缺少IP或端口,获取topic失败");
  20. }
  21. }
  22. List<Map<String,String>> tVos = new ArrayList<>();
  23. List<String> list = new ArrayList<>();
  24. AdminClient adminiClient = KafkaConnection.kafkaTestConnection(kafka);
  25. ListTopicsOptions options = new ListTopicsOptions();
  26. options.listInternal(true);
  27. ListTopicsResult topicsResult = adminiClient.listTopics(options);
  28. try {
  29. Set<String> topicNames = topicsResult.names().get();
  30. Iterator it = topicNames.iterator();
  31. while (it.hasNext()){
  32. Map<String,String> map = new HashMap<>();
  33. String topicName = it.next().toString();
  34. Map topicInfo = getTopicInfo(kafka,topicName);
  35. map.put("tableName",topicName);
  36. map.put("issame","0");
  37. map.put("Partitions", String.valueOf(topicInfo.get("Partitions")));
  38. map.put("PartitionSize", String.valueOf(topicInfo.get("PartitionSize")));
  39. map.put("ReplicationFactor", String.valueOf(topicInfo.get("ReplicationFactor")));
  40. tVos.add(map);
  41. }
  42. } catch (Exception e) {
  43. System.out.println("获取topic失败");
  44. }finally {
  45. KafkaConnection.close(adminiClient);
  46. }
  47. System.out.println("所有信息查询成功tVos:"+tVos);
  48. tVos.stream().forEach(maptopic -> {
  49. System.out.println("————————————————————————————————————");
  50. System.out.println("topic主题名称:"+maptopic.get("tableName"));
  51. maptopic.get("issame");
  52. maptopic.get("Partitions");
  53. System.out.println("topic分区信息:"+maptopic.get("Partitions"));
  54. maptopic.get("PartitionSize");
  55. System.out.println("topic分区数量:"+maptopic.get("PartitionSize"));
  56. maptopic.get("ReplicationFactor");
  57. System.out.println("topic副本数量:"+maptopic.get("ReplicationFactor"));
  58. System.out.println("————————————————————————————————————");
  59. });
  60. }
  61. //获取topic的详细信息
  62. public static Map getTopicInfo(String ipAndPort,String topic){
  63. Map<String, Object> map = new HashMap<>();
  64. Properties props = new Properties();
  65. props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ipAndPort);
  66. AdminClient adminClient = AdminClient.create(props);
  67. String topicName = topic;
  68. DescribeTopicsOptions describeTopicsOptions = new DescribeTopicsOptions().timeoutMs(5000);
  69. DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(topicName), describeTopicsOptions);
  70. KafkaFuture<TopicDescription> topicDescriptionFuture = describeTopicsResult.values().get(topicName);
  71. try {
  72. TopicDescription topicDescription = topicDescriptionFuture.get();
  73. List<TopicPartitionInfo> partitions = topicDescription.partitions();
  74. int replicationFactor = partitions.get(0).replicas().size();
  75. map.put("Partitions", partitions);
  76. map.put("PartitionSize", partitions.size());
  77. map.put("ReplicationFactor", replicationFactor);
  78. } catch (InterruptedException | ExecutionException e) {
  79. e.printStackTrace();
  80. }
  81. return map;
  82. }
  83. }

2、结果

标签: kafka 大数据 后端

本文转载自: https://blog.csdn.net/ZhShH0413/article/details/131980131
版权归原作者 要开心吖ZSH 所有, 如有侵权,请联系我们删除。

“使用JavaApi获取Kafka的topic、topic的分区数量与副本数量”的评论:

还没有评论