生产者和消费者是Kafka的核心概念之一,它们在客户端被创建和使用,并且包含了许多与Kafka性能和机制相关的配置。虽然Kafka提供的命令行工具能够执行许多基本操作,但它无法实现所有可能的性能优化。相比之下,使用Java API可以充分利用编程语言的灵活性,对生产者和消费者进行更精细的性能调优。对于大多数中间件,熟悉服务器的命令行操作可能足以帮助学习其API的使用。然而,Kafka则不同,要全面掌握Kafka的所有特性,必须系统地学习和理解其Java API。
前置条件
在pom文件中引入如下依赖
<dependencies>
<!-- Kafka客户端依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.0</version> <!-- 请根据需要选择Kafka版本 -->
</dependency>
</dependencies>
1. 基础消费流程
在javaApi中可以通过创建一个Kafka生产者和消费者的配置对象,在new生产者或消费者的类时将配置对象传入,然后生产者实例通过调用send方法发送数据,消费者通过poll方法消费数据,数据需要通过ProducerRecords类封装key和value,并在生产者和消费者配置中为key和value指定序列化和反序列化类(key可以传null,key是在日志回收策略中发挥作用)。经过这样一套操作,消息就可以成功从生产者发往消费者。
package com.kafak.testkafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.Duration;
import java.util.Properties;
@SpringBootTest
class TestKafkaApplicationTests {
//预定义Kafka对象实例,因为Kafka对象时线程安全,所以可以定义外面节省资源防止重复创建
KafkaProducer<String, String> kafkaProducer;
//创建生产者
public KafkaProducer<String, String> getKafkaProducer() {
//创建生产者配置
Properties props = new Properties();
//配置Kafka集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
//配置序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//返回生产者
return new KafkaProducer<String, String>(props);
}
//创建消费者
public KafkaConsumer<String, String> getKafkaConsumer() {
//创建消费者配置
Properties props = new Properties();
//配置Kafka集群地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
//配置消费者组id
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
//配置反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//返回消费者
return new KafkaConsumer<>(props);
}
//通过生产者生产一百条数据
@Test
void kafkaProducerTest() {
//获取生产者
kafkaProducer = getKafkaProducer();
//发送消息
for (int i = 0; i < 100; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i));
}
// 强制刷新缓冲区,确保所有消息都发送出去
kafkaProducer.flush();
// 关闭生产者
kafkaProducer.close();
}
//通过消费者消费消息
@Test
void kafkaConsumerTest() {
//创建消费者,由于消费者是线程不安全,所以使用一次实例化一次,可以方式出现线程安全问题
KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
//接受消费者信息,传入100毫秒,消费者会一百毫秒拉去一次消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
// 处理消息
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed message with key %s, value %s, from partition %d with offset %d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
}
2. 消息确认
消息确认的原理性知识可以通过下面这篇文章学习,这里主要讲实操。
Kafka运行机制(二):消息确认,消息日志的存储和回收https://blog.csdn.net/dxh9231028/article/details/141329851?spm=1001.2014.3001.5501
生产者端
生产者端的消息确认策略由acks配置项控制,其由三种配置方式,其中我在下面这篇文章中详细讲述了相关知识。我们可以通过javaApi配置acks来控制确认策略。
生产者端的消息确认有同步和异步两种方式。
- 同步消息确认:同步消息确认是生产者实例在调用send方法后紧接着调用get方法,该方法会阻塞线程的继续执行,等待消息发送结果。当消息发送失败时,如果配置了重试机制(通过设置
retries
属性),生产者会自动重试指定的次数。如果在所有重试尝试后仍然失败,最终会抛出异常,通知调用方消息发送失败。 - 异步消息确认:异步消息确认通过生产者实例调用send方法时传入,第二个回调函数的参数。在成功或失败响应时,执行回调函数,异步消息确认不会阻塞代码,也不会触发自动重试。
消费者端
消费者消费成功在客户端的体现是成功获取到了数据,这本没有什么好说的,不过消费者不仅需要响应客户端数据,还要讲偏移量发送给Kafka,在这一过程中,消费者提供了手动提交和自动提交两种方式。启动自动提交是默认开启的,而手动提交则需要配置enable.auto.commit为false,然后通过创建分区和偏移量的映射关系,通过消费者的commit方法提交偏移量。
代码实现
下面代码中,我创建了四个测试单元,其中前两个测试单元,分别是生产者同步提交和异步提交,而后两个测试单元分别时消费者的自动提交和手动提交。
package com.kafak.testkafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@SpringBootTest
class TestKafkaApplicationTests {
//预定义Kafka对象实例,因为Kafka对象时线程安全,所以可以定义外面节省资源防止重复创建
KafkaProducer<String, String> kafkaProducer;
//创建生产者
public KafkaProducer<String, String> getKafkaProducer() {
//创建生产者配置
Properties props = new Properties();
//配置消息确认策略
props.put(ProducerConfig.ACKS_CONFIG, "all");
//配置重试次数
props.put(ProducerConfig.RETRIES_CONFIG,3);
//配置Kafka集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
//配置序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//返回生产者
return new KafkaProducer<String, String>(props);
}
//创建消费者
public KafkaConsumer<String, String> getKafkaConsumer(Boolean isAutoCommit) {
//创建消费者配置
Properties props = new Properties();
//配置Kafka集群地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
//配置消费者组id
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
//配置反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//判断当前消费者是否开启自动提交
if (!isAutoCommit) {
//关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
}else{
//设置自动提交间隔时间1s
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
}
//返回消费者
return new KafkaConsumer<>(props);
}
//生产者同步确认
@Test
void kafkaProducerGetTest() {
if(kafkaProducer == null) {
kafkaProducer = getKafkaProducer();
}
//同步确认消息是否发送成功
for (int i = 0; i < 100; i++) {
try{
RecordMetadata topicJava = kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i)).get();
}catch (Exception e) {
e.printStackTrace();
}
}
kafkaProducer.close();
}
//生产者异步确认
@Test
void kafkaProduceSyncTest() {
if(kafkaProducer == null) {
kafkaProducer = getKafkaProducer();
}
//异步确认是否发送成功
for (int i = 0; i < 100; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i), (metadata, exception) -> {
if (exception == null) {
System.out.printf("发送消息成功, metadata=%s%n", metadata);
} else {
System.err.printf("发送消息失败, exception=%s%n", exception.getMessage());
}
});
}
kafkaProducer.close();
}
//消费者自动提交
@Test
void kafkaAutoCommitConsumerTest() {
//创建消费者开启自动提交
KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer(true);
//消费数据流程中无需负责偏移量提交
while (true) {
//接受消费者信息,传入100毫秒,消费者会一百毫秒拉去一次消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
//处理消息
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消息消费成功, key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
}
//消费者手动提交
@Test
void kafkaSyncCommitConsumerTest() {
//创建消费者关闭自动提交
KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer(false);
//消费数据流程中需要在消费数据后,提交偏移量
while (true) {
//接受消费者信息,传入100毫秒,消费者会一百毫秒拉去一次消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
// 处理消息
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消息消费成功, key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
//创建分区和偏移量的映射类
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
//讲分区和偏移量的数据存入映射类
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
//偏移量提交
kafkaConsumer.commitSync(offsets);
}
}
}
}
3. 批处理
批处理在生产者端,和消费者端也有不同的实现。我在Kakfa基本概念一文中清楚的讲解了批处理的概念,文章如下
Kafka基本概念https://blog.csdn.net/dxh9231028/article/details/141270920?spm=1001.2014.3001.5501
生产者
在生产者端,生产者实例的send方法会发送消息到缓冲区中,而缓冲区消息何时发送给Kafka集群,则是通过配置batch.size和linger.ms配置,来实现当缓冲区存入多少消息,和距离上一次发送消息多久后,来发送这一轮缓冲区的消息到Kafka集群,代码实现如下
//创建生产者
public KafkaProducer<String, String> getKafkaProducer() {
//创建生产者配置
Properties props = new Properties();
//配置生产者批处理
//缓冲区大小最大为16384比特
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//距离上次发送消息时间隔3s
props.put(ProducerConfig.LINGER_MS_CONFIG,"3000");
//配置消息确认策略
props.put(ProducerConfig.ACKS_CONFIG, "all");
//配置重试次数
props.put(ProducerConfig.RETRIES_CONFIG,3);
//配置Kafka集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
//配置序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//返回生产者
return new KafkaProducer<String, String>(props);
}
消费者
消费者端批处理,消费者在拉去消息时,会在fetch.max.bytes,max.partition.fetch.bytes和max.poll.records三个配置项,以及传入poll方法的超时时间参数的限制下,尽可能多的拉取更多消息。
- fetch.max.bytes:fetch.max.bytes 是指 Kafka 消费者单次从服务器拉取数据时能够获取的最大字节数。这是全局的上限,控制每次 poll() 操作可以拉取的数据量总和。 默认值:50MB(即 52428800 字节)。
- max.partition.fetch.bytes:max.partition.fetch.bytes 是指 Kafka 消费者从单个分区拉取消息时能获取的最大字节数,当消费者数量少于主题分区数量时,一个消费者可能会负责多个分区。默认值:1MB(即 1048576 字节)。
- max.poll.records:max.poll.records 是指 Kafka 消费者每次调用 poll() 方法时能够拉取的最大消息条数。 默认值:500 条消息。
代码实现如下
//创建消费者
public KafkaConsumer<String, String> getKafkaConsumer(Boolean isAutoCommit) {
//创建消费者配置
Properties props = new Properties();
//消费者批处理相关配置
//消费缓冲区大小,也就是一次消费最多能消费多少比特消息
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,16384);
//一次消费一个分区最多能消费多少比特
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,8192);
//一次消费最多能消费多少条数据
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1000);
//配置Kafka集群地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
//配置消费者组id
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
//配置反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//判断当前消费者是否开启自动提交
if (!isAutoCommit) {
//关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
}else{
//设置自动提交间隔时间1s
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
}
//返回消费者
return new KafkaConsumer<>(props);
}
4. 事务操作
Kafka驱动支持事务操作,允许许生产者在多个主题和分区上以原子方式写入消息。这意味着你可以确保一组消息要么全部成功写入Kafka,要么全部失败。
事务操作首先通过生产者实例调用生产者实例的initTransactions方法,向kafka集群申请一个映射当前生产者的事务Id,然后就可以通过调用生产者实例的beginTransaction方法,开启一个事务,进行消息发送,最终通过调用commitTransaction方法完成事务的提交,如果中途发生异常则通过abortTransaction对当前事务进行回滚,代码实例如下
package com.kafak.testkafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@SpringBootTest
class TestKafkaApplicationTests {
//预定义Kafka对象实例,因为Kafka对象时线程安全,所以可以定义外面节省资源防止重复创建
KafkaProducer<String, String> kafkaProducer;
//创建生产者
public KafkaProducer<String, String> getKafkaProducer() {
//创建生产者配置
Properties props = new Properties();
//配置生产者批处理
//缓冲区大小最大为16384比特
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//距离上次发送消息时间隔3s
props.put(ProducerConfig.LINGER_MS_CONFIG,"3000");
//配置消息确认策略
props.put(ProducerConfig.ACKS_CONFIG, "all");
//配置重试次数
props.put(ProducerConfig.RETRIES_CONFIG,3);
//配置Kafka集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
//配置序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//返回生产者
return new KafkaProducer<String, String>(props);
}
//测试事务
@Test
void kafkaProducerTransactionTest() {
if(kafkaProducer == null) {
kafkaProducer = getKafkaProducer();
}
kafkaProducer.initTransactions();
try{
kafkaProducer.beginTransaction();
//消息发送相关操作
for (int i = 0; i < 100; i++) {
try{
RecordMetadata topicJava = kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i)).get();
}catch (Exception e) {
e.printStackTrace();
}
}
kafkaProducer.commitTransaction();
}catch (Exception e) {
e.printStackTrace();
kafkaProducer.abortTransaction();
}
kafkaProducer.close();
}
}
5. 自定义分区器
Kafka允许用户自定义分区器,实现特定的分区策略。可以通过实现Partitioner接口来创建自定义分区器。实现Partitioner接口需要实现三个方法,分别是partition,configure,close。
partition方法
partition方法是实现分区逻辑其的主要方法,其接受六个参数,分别是
- String topic:消息要发送到的Kafka主题名称
- Object key:消息的 key,可能为 null。
- byte[] keyBytes:序列化后的 key,可能为 null
- Object value:消息的 value,可以为任意对象
- byte[] valueBytes:序列化后的 value,可能为 null。
- Cluster cluster:Kafka集群的元数据信息,包括主题的分区数、每个分区的领导者等
partition方法的返回值则是发送分区的编号,通过这个机制可以实现不同逻辑的分区器。
configure方法
configuer方法在自定义分区类初始化时调用,当设计一些复杂操作,比如在发送消息前要和数据库交互时,可以在configure中完成数据库的连接。
close方法
close在分区逻辑执行完后调用,和configure一样,在复杂操作时,用于关闭分区逻辑中创建的连接,或一些内存资源等
假设我有一个三主机集群,其中30主机性能最好,31其次,32最差,我要通过自定义分区,将消息发送到三个分区的比例为3:2:1,通过Partitioner接口,可以简单的通过如下方式实现
package com.kafak.testkafka;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//获取分区元数据
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(s);
//创建一个0-100的随机数
double num = Math.random() * 100;
//默认传递分区号
Integer finalPartition = 0;
for (PartitionInfo partitionInfo : partitionInfos) {
//获取分区的leader
Node leader = partitionInfo.leader();
//获取分区leader的ip和端口
String leaderAddress = leader.host() + ":" + leader.port(); // 生成 "host:port" 格式的字符串
//如果随机数在0-50之间,发送消息至192.168.142.30:9092
if (num < 50 && leaderAddress.equals("192.168.142.30:9092")) {
finalPartition = partitionInfo.partition();
break;
//如果随机数在50-82之间,发送消息至192.168.142.31:9092
} else if (num < 82 && num >= 50 && leaderAddress.equals("192.168.142.31:9092")) {
finalPartition = partitionInfo.partition();
break;
//如果随机数在82-100之间,发送消息至192.168.142.32:9092
} else if (num < 100 && num >= 82 && leaderAddress.equals("192.168.142.32:9092")) {
finalPartition = partitionInfo.partition();
break;
}
}
//返回最终分区号
return finalPartition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
在生产者配置中通过partitioner_class配置自定义分区器,代码如下
public KafkaProducer<String, String> getKafkaProducer() {
//创建生产者配置
Properties props = new Properties();
//启用自定义分区器
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafak.testkafka.CustomPartitioner");
//配置生产者批处理
//缓冲区大小最大为16384比特
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//距离上次发送消息时间隔3s
props.put(ProducerConfig.LINGER_MS_CONFIG,"3000");
//配置消息确认策略
props.put(ProducerConfig.ACKS_CONFIG, "all");
//配置重试次数
props.put(ProducerConfig.RETRIES_CONFIG,3);
//配置Kafka集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
//配置序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//返回生产者
return new KafkaProducer<String, String>(props);
}
如此,便可以实现一个自定义分区策略。
6. 主题操作
Kafka驱动提供了AdminClient类来提供了操作Kafka主题的功能。通过AdminClient提供的方法,可以对主题进行增删改查操作。
- createTopics方法,通过传入一个记录主题信息的NewTopic类实例的列表,可以请求Kafka集群创建多个主题。
- deleteTopics方法,通过传入主题名的字符串列表,可以对主题进行删除操作。
- createPartitions方法,可以通过传入,主题和分区改变信息的映射关系的Map实例,实现主题的分区数量改变。
- incrementalAlterConfigs方法,可以传入主题和配置项列表的映射管理,来修改配置。配置要通过ConfigEntry类的实例来表示,并且要通过AlterConfigOp类实例包裹来声明修改配置的方式。
- listTopics方法,查看当前Kafka集群所有主题。
- describeTopics方法,可以通过传入主题名字符串的列表,来查看主题的详细信息。
以上方法都有一个表示当前操作的返回值,不过个返回值并不标志者操作的成功或结束,可以通过调用这个返回值的all方法获得一个KafkaFuture实例,然后在通过调用整个实例的get方法,同生产者一样,可以阻塞代码执行,等待操作的执行结果,不过不能配置acks,必须列表中所有操作都执行完毕才返回成功。
具体代码操作如下:
package com.kafak.testkafka;
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class TestKafkaApplicationTests {
//创建主题
@Test
public void createTopic() {
//生成配置
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
//创建连接资源
try (AdminClient adminClient = AdminClient.create(props)) {
// 创建多个新主题信息
NewTopic newTopic1 = new NewTopic("test-topic1", 3, (short) 1);
NewTopic newTopic2 = new NewTopic("test-topic2", 3, (short) 1);
NewTopic newTopic3 = new NewTopic("test-topic3", 3, (short) 1);
//放入一个List中
List<NewTopic> newTopics = new ArrayList<>();
newTopics.add(newTopic1);
newTopics.add(newTopic2);
newTopics.add(newTopic3);
//通过adminClient发送到Kafka集群创建对应主题
adminClient.createTopics(newTopics).all().get();
System.out.println("Topic created successfully");
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
//删除主题
@Test
public void deleteTopic() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
try (AdminClient adminClient = AdminClient.create(props)) {
// 创建需要删除的主题名字的List
List<String> strings = new ArrayList<>();
strings.add("test-topic1");
strings.add("test-topic2");
strings.add("test-topic3");
//传入deleteTopic方法
adminClient.deleteTopics(strings).all().get();
System.out.println("Topic deleted successfully");
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
//增加分区
@Test
public void increasePartitions() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
try (AdminClient adminClient = AdminClient.create(props)) {
//创建主题和分区增加信息的映射关系map
HashMap<String, NewPartitions> increasePartitionsMap = new HashMap<>();
increasePartitionsMap.put("test-topic1", NewPartitions.increaseTo(5));
//传入createPartitions方法,增加主题的分区数量
adminClient.createPartitions(increasePartitionsMap).all().get();
System.out.println("Partitions increased successfully");
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
//修改主题配置
@Test
public void alterTopicConfig() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
try (AdminClient adminClient = AdminClient.create(props)) {
//创建主题配置实例
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic");
//创建消息保留时间配置
ConfigEntry retentionEntry = new ConfigEntry("retention.ms", "3600000");
AlterConfigOp alterConfigOp = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
//将主题和配置的映射信息保存进map
HashMap<ConfigResource, Collection<AlterConfigOp>> alterTopicConfigMap = new HashMap<>();
alterTopicConfigMap.put(configResource, Arrays.asList(alterConfigOp));
//将map传入incrementalAlterConfigs方法中
adminClient.incrementalAlterConfigs(alterTopicConfigMap).all().get();
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
//查看所有主题
@Test
public void listTopics() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
try (AdminClient adminClient = AdminClient.create(props)) {
// 列出所有主题
Set<String> topics = adminClient.listTopics().names().get();
topics.forEach(System.out::println);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
//获取主题信息
@Test
public void describeTopic() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
try (AdminClient adminClient = AdminClient.create(props)) {
List<String> strings = new ArrayList<>();
strings.add("test-topic1");
strings.add("test-topic2");
strings.add("test-topic3");
// 通过describeTopics方法可以获取主题的详细信息
Map<String, TopicDescription> topicDescriptionMap = adminClient.describeTopics(strings).all().get();
topicDescriptionMap.forEach((name, description) -> System.out.println(name + ": " + description));
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
}
版权归原作者 不止会JS 所有, 如有侵权,请联系我们删除。