目录
主题的管理
使用kafka-topics.sh脚本。
下面是使用脚本的一些选项
选项说明–config <String: name=value>为创建的或修改的主题指定配置信息。–create创建一个新主题–delete删除一个主题–delete-config <String: name>删除现有主题的一个主题配置条目。这些条目就 是在–config中给出的配置条目。–alter更改主题的分区数量,副本分配和/或配置条目。–describe列出给定主题的细节。–disable-rack-aware禁用副本分配的机架感知。–force抑制控制台提示信息–help打印帮助信息–replica-assignment
<String:broker_id_for_part1_replica1
:broker_id_for_part1_replica2
,broker_id_for_part2_replica1
:broker_id_for_part2_replica2 , …>当创建或修改主题的时候手动指定partition-to-broker的分配关系。–replication-factor <Integer:replication factor>要创建的主题分区副本数。1表示只有一个副本, 也就是Leader副本。–topic <String: topic>要创建、修改或描述的主题名称。除了创建,修改和描述在这里还可以使用正则表达式。–topics-with-overridesif set when describing topics, only show topics that have overridden configs–unavailable-partitionsif set when describing topics, only show partitions whose leader is not available–under-replicated-partitionsif set when describing topics, only show under replicated partitions–zookeeper <String: urls>必需的参数:连接zookeeper的字符串,逗号分隔的多个host:port列表。多个URL可以故障转移。
创建主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x - -partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760
查看主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --list
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x
kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides -- describe
修改主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01
删除主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x
删除主题时,只是给主题添加删除的标记,要过一段时间删除。
增加分区
通过命令行工具操作,主题的分区只能增加,不能减少。否则报错
ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions, 1 would not be an increase.
通过–alter修改主题的分区数,增加分区。
kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 -- partitions 2
分区副本的分配
副本分配的三个目标:
- 均衡地将副本分散于各个broker上
- 对于某个broker上分配的分区,它的其他副本在其他broker上
- 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。
在不考虑机架信息的情况下:
- 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。
- 其余副本通过增加偏移进行分配。
必要参数配置
使用kafka-topics.sh --config xx=xx --config yy=yy
配置给主题的参数如下:
属性默认值服务器默认属性说明cleanup.policydeletelog.cleanup.policy要么是”delete“要么是”compact“; 这个字符串指明了当他们的回收时间或者尺寸限制到达时,针对旧日志部分的利用方式。默认 方式(“delete”)将会丢弃旧的部分;”compact“将会进行日志压缩。compression.typenoneproducer用于压缩数据的压缩类 型。默认是无压缩。正确的选项 值是none、gzip、snappy、 lz4。压缩最好用于批量处理,批 量处理消息越多,压缩性能越 好。max.message.bytes1000000max.message.byteskafka追加消息的最大字节数。注 意如果你增大这个字节数,也必 须增大consumer的fetch字节 数,这样consumer才能fetch到 这些最大字节数的消息。min.cleanable.dirty.ratio0.5min.cleanable.dirty.ratio此项配置控制log压缩器试图进行 清除日志的频率。默认情况下, 将避免清除压缩率超过50%的日 志。这个比率避免了最大的空间 浪费min.insync.replicas1min.insync.replicas当producer设置 request.required.acks为-1时, min.insync.replicas指定replicas 的最小数目(必须确认每一个 repica的写数据都是成功的), 如果这个数目没有达到, producer会产生异常。retention.bytesNonelog.retention.bytes如果使用“delete”的retention 策 略,这项配置就是指在删除日志之前,日志所能达到的最大尺寸。默认情况下,没有尺寸限制而只有时间限制retention.ms7 dayslog.retention.minutes如果使用“delete”的retention策 略,这项配置就是指删除日志前 日志保存的时间。segment.bytes1GBlog.segment.byteskafka中log日志是分成一块块存储的,此配置是指logsegment.index.bytes10MBlog.index.size.max.bytes此配置是有关offsets和文件位置 之间映射的索引文件的大小;一 般不需要修改这个配置segment.jitter.ms0log.roll.jitter.{ms,hours}The maximum jitter to subtract from logRollTimeMillis.segment.ms7 dayslog.roll.hours即使log的分块文件没有达到需要删除、压缩的大小,一旦log 的时间达到这个上限,就会强制新建一个log分块文件unclean.leader.election.enabletrue指明了是否能够使不在ISR中的replicas设置用来作为leader
KafkaAdminClient应用
除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个AdminClient,在kafka-client包下,一个抽象类,具体的实现是
org.apache.kafka.clients.admin.KafkaAdminClient
。
功能
KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):
- 创建主题:
createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options)
- 删除主题:
deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options)
- 列出所有主题:
listTopics(final ListTopicsOptions options)
- 查询主题:
describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options)
- 查询集群信息:
describeCluster(DescribeClusterOptions options)
- 查询配置信息:
describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options)
- 修改配置信息:
alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)
- 修改副本的日志目录:
alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)
- 查询节点的日志目录信息:
describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options)
- 查询副本的日志目录信息:
describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas,DescribeReplicaLogDirsOptions options)
- 增加分区:
createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)
其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。
用到的参数:
属性说明重要性bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管是否指定了哪个服务器用作引导。这个列表仅影响用来发现集群所有服务器的初始主机。
字符串形式:host1:port1,host2:port2,…
由于这组服务器仅用于建立初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这里。
一般最好两台,以防其中一台宕掉。highclient.id生产者发送请求的时候传递给broker的id字符串。用于在broker的请求日志中追踪什么应用发送了什么消息。一般该id是跟业务有关的字符串。mediumconnections.max.idle.ms当连接空闲时间达到这个值,就关闭连接。long型数据,默认:300000mediumrequest.timeout.ms客户端等待服务端响应的最大时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败。int类型值,默认:120000mediumsecurity.protocol跟broker通信的协议:PLAINTEXT, SSL,SASL_PLAINTEXT, SASL_SSL.string类型值,默认:PLAINTEXTmediumreconnect.backoff.ms重新连接主机的等待时间。避免了重连的密集循环。该等待时间应用于该客户端到broker的所有连接。long型值,默认:50mediumretries重试的次数,达到此值,失败。int类型值,默认5。lowretry.backoff.ms在发生失败的时候如果需要重试,则该配置表示客户端等待多长时间再发起重试。该时间的存在避免了密集循环。long型值,默认值:100。lowreceive.buffer.bytesTCP接收缓存(SO_RCVBUF),如果设置为-1,则使用操作系统默认的值。int类型值,默认65536mediumsend.buffer.bytes用于TCP发送数据时使用的缓冲大小(SO_SNDBUF),-1表示使用OS默认的缓冲区大小。int类型值,默认值:131072mediumreconnect.backoff.max.ms对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值。在计算退避增量之后,添加20%的随机抖动以避免连接风暴。
long型值,默认1000low
操作示例
主要操作步骤:
- 客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
- 客户端发送请求至Kafka Broker。
- Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
- 客户端接收相应的回执并进行解析处理。
和协议有关的请求和回执的类基本都在
org.apache.kafka.common.requests
包中,
AbstractRequest和AbstractResponse
是这些请求和响应类的两个父类。
综上,如果要自定义实现一个功能,只需要三个步骤:
- 自定义XXXOptions;
- 自定义XXXResult返回值;
- 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。
示例:
importorg.apache.kafka.clients.admin.*;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.requests.DescribeLogDirsResponse;importorg.junit.After;importorg.junit.Before;importorg.junit.Test;importjava.util.*;importjava.util.concurrent.ExecutionException;importjava.util.function.BiConsumer;importjava.util.function.Consumer;publicclassMyAdminClient{privateKafkaAdminClient client;@Beforepublicvoidbefore(){Map<String,Object> configs =newHashMap<>();
configs.put("bootstrap.servers","node1:9092");
configs.put("client.id","admin_001");
client =(KafkaAdminClient)KafkaAdminClient.create(configs);}@Afterpublicvoidafter(){// 关闭admin客户端
client.close();}@TestpublicvoidtestListTopics()throwsExecutionException,InterruptedException{// 列出主题// final ListTopicsResult listTopicsResult = client.listTopics();ListTopicsOptions options =newListTopicsOptions();// 列出内部主题
options.listInternal(true);// 设置请求超时时间,单位是毫秒
options.timeoutMs(500);finalListTopicsResult listTopicsResult = client.listTopics(options);// final Set<String> strings = listTopicsResult.names().get();//// strings.forEach(name -> {// System.out.println(name);// });// 将请求变成同步的请求,直接获取结果finalCollection<TopicListing> topicListings = listTopicsResult.listings().get();
topicListings.forEach(newConsumer<TopicListing>(){@Overridepublicvoidaccept(TopicListing topicListing){// 该主题是否是内部主题finalboolean internal = topicListing.isInternal();// 该主题的名字finalString name = topicListing.name();System.out.println("主题是否是内部主题:"+ internal);System.out.println("主题的名字:"+ name);System.out.println(topicListing);System.out.println("=====================================");}});}@TestpublicvoidtestDescribeLogDirs()throwsExecutionException,InterruptedException{finalDescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0));finalMap<Integer,Map<String,DescribeLogDirsResponse.LogDirInfo>> integerMapMap
= describeLogDirsResult.all().get();
integerMapMap.forEach(newBiConsumer<Integer,Map<String,DescribeLogDirsResponse.LogDirInfo>>(){@Overridepublicvoidaccept(Integer integer,Map<String,DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap){System.out.println("broker.id = "+ integer);// log.dirs可以设置多个目录
stringLogDirInfoMap.forEach(newBiConsumer<String,DescribeLogDirsResponse.LogDirInfo>(){@Overridepublicvoidaccept(String s,DescribeLogDirsResponse.LogDirInfo logDirInfo){System.out.println("logdir = "+ s);finalMap<TopicPartition,DescribeLogDirsResponse.ReplicaInfo> replicaInfos = logDirInfo.replicaInfos;
replicaInfos.forEach(newBiConsumer<TopicPartition,DescribeLogDirsResponse.ReplicaInfo>(){@Overridepublicvoidaccept(TopicPartition topicPartition,DescribeLogDirsResponse.ReplicaInfo replicaInfo){System.out.println("主题分区:"+ topicPartition.partition());System.out.println("主题:"+ topicPartition.topic());// final boolean isFuture = replicaInfo.isFuture;// final long offsetLag = replicaInfo.offsetLag;// final long size = replicaInfo.size;}});}});}});}}
版权归原作者 Ethan-running 所有, 如有侵权,请联系我们删除。