Kafka+SpringBoot 入门案例1
pom文件
<!-- Dependencies used by the examples below: Spring Boot web starter plus the plain Kafka client. -->
<dependencies>
    <!-- Spring Boot starter for web applications. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Kafka Java client; provides the AdminClient API used in the snippets. -->
    <!-- NOTE(review): no <version> is given for either dependency; presumably the versions come -->
    <!-- from Spring Boot dependency management (parent POM or imported BOM) - confirm. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
</dependencies>
创建 Kafka 服务连接（AdminClient）
// Client configuration: the only setting supplied here is the broker bootstrap address.
Properties clientConfig = new Properties();
clientConfig.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.109.140:9092");
// adminClient is declared outside this snippet; every later snippet issues its requests through it.
// NOTE(review): nothing shown here closes the client - confirm the owner closes it when done.
adminClient = AdminClient.create(clientConfig);
创建主题
// Describe the topic to create: its name, partition count and replication factor.
NewTopic topicSpec = new NewTopic(topicName, partitionNum, replicationNum);
// Submit the request and block on the combined future, so the creation is synchronous:
// execution continues only after the request for every listed topic has completed.
adminClient.createTopics(Arrays.asList(topicSpec)).all().get();
删除主题
DeleteTopicsResult deleteTopicsResult = adminClient
.deleteTopics(Arrays.asList(topicName));
deleteTopicsResult.all().get();
查看主题列表
// Ask the cluster for its topic names and wait for the answer.
Set<String> topicNames = adminClient.listTopics().names().get();
// Print one topic name per line.
topicNames.forEach(System.out::println);
查看主题的详细信息
// Fetch the description of the single topic `name` and block until the response arrives.
Map<String, TopicDescription> descriptions =
        adminClient.describeTopics(Arrays.asList(name)).all().get();
// Walk the entries directly instead of looking each key up again in the map.
for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {
    List<TopicPartitionInfo> partitions = entry.getValue().partitions();
    System.out.println("topicName:" + entry.getKey());
    // The partition count is the size of the partition list. The original printed
    // TopicPartitionInfo.partition() (the index of one partition) under this label.
    System.out.println("分区数:" + partitions.size());
    for (TopicPartitionInfo partition : partitions) {
        // Per-partition details: the partition's index and how many replicas it has.
        System.out.println("分区编号:" + partition.partition());
        System.out.println("副本数:" + partition.replicas().size());
    }
}
增加主题的分区数量（Kafka 只支持增加分区，不支持减少）
// Map each topic name to its requested new total partition count; only `name` is changed here.
Map<String, NewPartitions> partitionTargets = new HashMap<>();
// increaseTo(num) asks for the topic to have num partitions in total.
partitionTargets.put(name, NewPartitions.increaseTo(num));
// Send the request and block until it has completed for every topic in the map.
adminClient.createPartitions(partitionTargets).all().get();
标签:
kafka
spring boot
本文转载自: https://blog.csdn.net/weixin_38380811/article/details/127548570
版权归原作者 weixin_38380811 所有, 如有侵权,请联系我们删除。
版权归原作者 weixin_38380811 所有, 如有侵权,请联系我们删除。