0


Kafka:Topic概念与API介绍

Topic

事件被组织并持久地存储在

Topic

中,

Topic

类似于文件系统中的文件夹,事件就是该文件夹中的文件。

Kafka

中的

Topic

始终是多生产者和多订阅者:一个

Topic

可以有零个、一个或多个生产者向其写入事件,也可以有零个、一个或多个消费者订阅这些事件。

Topic

中的事件可以根据需要随时读取,与传统的消息中间件不同,事件在使用后不会被删除,相反,可以通过配置来定义

Kafka

中每个

Topic

应该保留事件的时间,超过该事件后旧事件将被丢弃。

Kafka

的性能在数据大小方面实际上是恒定的,因此长时间存储数据是非常好的。

Partition

Topic

是分区的,这意味着一个

Topic

可以分布在多个

Kafka

节点上。数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从多个

Kafka

节点读取和写入数据。将新事件发布到

Topic

时,它实际上会

appended

Topic

的一个

Partition

中。具有相同事件

key

的事件将写入同一

Partition

Kafka

保证给定

Topic

Partition

的任何使用者都将始终以与写入时完全相同的顺序读取该分区的事件。

Replication

为了使数据具有容错性和高可用性,每个

Topic

都可以有多个

Replication

,以便始终有多个

Kafka

节点具有数据副本,以防出现问题。常见的生产设置是

replicationFactor

3

,即始终有三份数据副本(包括一份原始数据)。此

Replication

Topic

Partition

级别执行。

Kafka

在指定数量(通过

replicationFactor

)的服务器上复制每个

Topic

Partition

,这允许在集群中的某些服务器发生故障时进行自动故障转移,以便在出现故障时服务仍然可用。

Replication

的单位是

Topic

Partition

。在非故障条件下,

Kafka

中的每个

Partition

都有一个

leader

和零个或多个

follower

replicationFactor

是复制副本(包括

leader

)的总数。所有读和写操作都将转到

Partition

leader

上。通常,有比

Kafka

节点多得多的

Partition

,并且这些

Partition

leader

Kafka

节点之间均匀分布。

follower

上的数据需要与

leader

的数据同步,所有数据都具有相同的偏移量和顺序(当然,在任何给定时间,

leader

的数据末尾可能有一些尚未复制的数据)。

follower

会像普通

Kafka

消费者一样使用来自

leader

的消息,并将其应用到自己的数据中。如下图所示,三个

Kafka

节点上有两个

Topic

Topic 0

Topic 1

),

Topic 0

有两个

Partition

并且

replicationFactor

3

(红色的

Partition

leader

),

Topic 1

有三个

Partition

replicationFactor

也为

3

(红色的

Partition

leader

)。
在这里插入图片描述

API

添加依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>

这里使用的

kafka-clients

版本和博主之前部署的

Kafka

版本一致:

  • Kafka:部署Kafka

client

操作

Topic

的客户端通过

AdminClient

抽象类来创建,源码如下:

packageorg.apache.kafka.clients.admin;importjava.util.Map;importjava.util.Properties;publicabstractclassAdminClientimplementsAdmin{/**
     * 使用给定的配置创建一个新的Admin
     * props:Admin的配置
     * 返回KafkaAdminClient实例
     */publicstaticAdminClientcreate(Properties props){return(AdminClient)Admin.create(props);}/**
     * 重载方法
     * 使用给定的配置创建一个新的Admin
     * props:Admin的配置
     * 返回KafkaAdminClient实例
     */publicstaticAdminClientcreate(Map<String,Object> conf){return(AdminClient)Admin.create(conf);}}

实际上会返回一个

KafkaAdminClient

实例(

KafkaAdminClient

类是

AdminClient

抽象类的子类),

KafkaAdminClient

类的方法比较多,其中

private

方法服务于

public

方法(提供给用户的服务)。
在这里插入图片描述

KafkaAdminClient

类提供的

public

方法是对

Admin

接口的实现。
在这里插入图片描述

create

创建一个新的

Topic

packagecom.kaven.kafka.admin;importorg.apache.kafka.clients.admin.*;importorg.apache.kafka.common.KafkaFuture;importjava.util.*;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.ExecutionException;publicclassAdmin{privatestaticfinalAdminClient adminClient =Admin.getAdminClient();publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{Admin admin =newAdmin();
        admin.createTopic();Thread.sleep(100000);}publicvoidcreateTopic()throwsInterruptedException{CountDownLatch latch =newCountDownLatch(1);CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(newNewTopic("new-topic-kaven",1,(short)1)));Map<String,KafkaFuture<Void>> values = topics.values();
        values.forEach((name, future)->{
            future.whenComplete((a, throwable)->{if(throwable !=null){System.out.println(throwable.getMessage());}System.out.println(name);
                latch.countDown();});});
        latch.await();}publicstaticAdminClientgetAdminClient(){Properties properties =newProperties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.31.240:9092");returnAdminClient.create(properties);}}

创建

AdminClient

(简单使用,配置

BOOTSTRAP_SERVERS_CONFIG

就可以了):

publicstaticAdminClientgetAdminClient(){Properties properties =newProperties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.31.240:9092");returnAdminClient.create(properties);}

创建

Topic

(传入一个

NewTopic

实例,并且给该

NewTopic

实例配置

name

numPartitions

replicationFactor

):

publicvoidcreateTopic()throwsInterruptedException{CountDownLatch latch =newCountDownLatch(1);CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(newNewTopic("new-topic-kaven",1,(short)1)));Map<String,KafkaFuture<Void>> values = topics.values();
        values.forEach((name, future)->{
            future.whenComplete((a, throwable)->{if(throwable !=null){System.out.println(throwable.getMessage());}System.out.println(name);
                latch.countDown();});});
        latch.await();}

提供的方法大都是异步编程模式的,这些基础知识就不介绍了,输出如下图所示:
在这里插入图片描述

list

获取

Topic

列表。

publicvoidlistTopics()throwsExecutionException,InterruptedException{ListTopicsResult listTopicsResult = adminClient.listTopics(newListTopicsOptions().listInternal(true));Set<String> names = listTopicsResult.names().get();
        names.forEach(System.out::println);}
get

方法会等待

future

完成,然后返回其结果。输出如下图所示:
在这里插入图片描述
通过下面这个配置,可以获取到

Kafka

内置的

Topic

newListTopicsOptions().listInternal(true)

默认是不会获取到

Kafka

内置的

Topic

publicvoidlistTopics()throwsExecutionException,InterruptedException{ListTopicsResult listTopicsResult = adminClient.listTopics();Set<String> names = listTopicsResult.names().get();
        names.forEach(System.out::println);}

在这里插入图片描述

delete

删除

Topic

publicvoiddeleteTopic()throwsInterruptedException{CountDownLatch latch =newCountDownLatch(2);DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("java-client4","java-client2"));
        deleteTopicsResult.topicNameValues().forEach((name, future)->{
            future.whenComplete((a, throwable)->{if(throwable !=null){System.out.println(throwable.getMessage());}System.out.println(name);
                latch.countDown();});});
        latch.await();}

输出如下图所示:
在这里插入图片描述
现在再获取

Topic

的列表,输出如下图所示(删除的

Topic

已经不在了):
在这里插入图片描述

describe

获取

Topic

的描述。

publicvoiddescribeTopic(){Map<String,KafkaFuture<TopicDescription>> values =
                adminClient.describeTopics(Collections.singleton("new-topic-kaven")).values();for(String name : values.keySet()){
            values.get(name).whenComplete((describe, throwable)->{if(throwable !=null){System.out.println(throwable.getMessage());}System.out.println(name);System.out.println(describe);});}}

输出如下图所示:
在这里插入图片描述
输出符合预期,因为创建该

Topic

的配置为:

newNewTopic("new-topic-kaven",1,(short)1)

config

获取

Topic

的配置。

publicvoiddescribeTopicConfig()throwsExecutionException,InterruptedException{DescribeConfigsResult describeConfigsResult = adminClient
                .describeConfigs(Collections.singleton(newConfigResource(ConfigResource.Type.TOPIC,"new-topic-kaven")));
        describeConfigsResult.all().get().forEach(((configResource, config)->{System.out.println(configResource);System.out.println(config);}));}

输出如下图所示:
在这里插入图片描述

describeConfigs

方法很显然还可以获取其他资源的配置(通过指定资源的类型)。

publicenumType{BROKER_LOGGER((byte)8),BROKER((byte)4),TOPIC((byte)2),UNKNOWN((byte)0);...}

alter

增量更新

Topic

的配置。

publicvoidincrementalAlterConfig()throwsInterruptedException{CountDownLatch latch =newCountDownLatch(1);Map<ConfigResource,Collection<AlterConfigOp>> alter =newHashMap<>();
        alter.put(newConfigResource(ConfigResource.Type.TOPIC,"new-topic-kaven"),Collections.singletonList(newAlterConfigOp(newConfigEntry("compression.type","gzip"),AlterConfigOp.OpType.SET
                        )));AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alter);
        alterConfigsResult.values().forEach(((configResource, voidKafkaFuture)->{
            voidKafkaFuture.whenComplete((a, throwable)->{if(throwable !=null){System.out.println(throwable.getMessage());}System.out.println(configResource);
                latch.countDown();});}));
        latch.await();}

输出如下图所示:
在这里插入图片描述
很显然

incrementalAlterConfigs

方法也可以增量更新其他资源的配置(通过指定资源的类型)。

ConfigResource

定义需要修改配置的资源,

Collection<AlterConfigOp>

定义该资源具体的配置修改操作。

Map<ConfigResource,Collection<AlterConfigOp>> alter =newHashMap<>();
configEntry

定义资源需要修改的配置条目,

operationType

定义修改操作的类型。

publicAlterConfigOp(ConfigEntry configEntry,OpType operationType){this.configEntry = configEntry;this.opType =  operationType;}

修改操作的类型。

publicenumOpType{/**
         * 设置配置条目的值
         */SET((byte)0),/**
         * 将配置条目恢复为默认值(可能为空)
         */DELETE((byte)1),/**
         * 仅适用于列表类型的配置条目
         * 将指定的值添加到配置条目的当前值
         * 如果尚未设置配置值,则添加到默认值
         */APPEND((byte)2),/**
         * 仅适用于列表类型的配置条目
         * 从配置条目的当前值中删除指定的值
         * 删除当前不在配置条目中的值是合法的
         * 从当前配置值中删除所有条目会留下一个空列表,并且不会恢复为条目的默认值
         */SUBTRACT((byte)3);...}

资源的配置条目,包含配置名称、值等。

publicclassConfigEntry{privatefinalString name;privatefinalString value;privatefinalConfigSource source;privatefinalboolean isSensitive;privatefinalboolean isReadOnly;privatefinalList<ConfigSynonym> synonyms;privatefinalConfigType type;privatefinalString documentation;...}

在获取

Topic

配置的输出中也可以发现这些配置条目。
在这里插入图片描述
很显然,这里修改名称为

new-topic-kaven

Topic

compression.type

配置条目(压缩类型)。

        alter.put(newConfigResource(ConfigResource.Type.TOPIC,"new-topic-kaven"),Collections.singletonList(newAlterConfigOp(newConfigEntry("compression.type","gzip"),AlterConfigOp.OpType.SET
                        )));
compression.type

配置条目的默认值为

producer

(意味着保留生产者设置的原始压缩编解码器),和上面的图也对应,博主将该配置条目修改成了

gzip


在这里插入图片描述
再来获取该

Topic

的配置,如下图所示(很显然配置修改成功了):
在这里插入图片描述

Kafka

Topic

概念与

API

介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。


本文转载自: https://blog.csdn.net/qq_37960603/article/details/122277935
版权归原作者 ITKaven 所有, 如有侵权,请联系我们删除。

“Kafka:Topic概念与API介绍”的评论:

还没有评论