0


Kafka-创建topic源码

一、命令创建topic

kafka-topics --create --topic quickstart-events --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

二、kafka-topics脚本

  1. exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TopicCommand "$@"

脚本中指定了处理它的主类:TopicCommand

三、TopicCommand

  1. public abstract class TopicCommand {
  2. public static void main(String... args) {
  3. Exit.exit(mainNoExit(args));
  4. }
  5. private static int mainNoExit(String... args) {
  6. try {
  7. execute(args);
  8. return 0;
  9. } catch (Throwable e) {
  10. return 1;
  11. }
  12. }
  13. static void execute(String... args) throws Exception {
  14. //解析命令行参数
  15. TopicCommandOptions opts = new TopicCommandOptions(args);
  16. //创建TopicService
  17. TopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer());
  18. try {
  19. if (opts.hasCreateOption()) {
  20. //这是处理topic创建的,我们主要分析它
  21. topicService.createTopic(opts);
  22. } else if (opts.hasAlterOption()) {
  23. //更高topic逻辑
  24. topicService.alterTopic(opts);
  25. } else if (opts.hasListOption()) {
  26. //获取topic
  27. topicService.listTopics(opts);
  28. } else if (opts.hasDescribeOption()) {
  29. //topi相关描述信息
  30. topicService.describeTopic(opts);
  31. } else if (opts.hasDeleteOption()) {
  32. //删除topic
  33. topicService.deleteTopic(opts);
  34. }
  35. }catch(...){...
  36. }finally {
  37. topicService.close();
  38. }
  39. }
  40. public static class TopicService implements AutoCloseable {
  41. public void createTopic(TopicCommandOptions opts) throws Exception {
  42. CommandTopicPartition topic = new CommandTopicPartition(opts);
  43. if (Topic.hasCollisionChars(topic.name)) {
  44. //由于度量名称的限制,带有句点(“.”)或下划线(“_”)的主题可能会发生冲突。为了避免问题,最好使用其中之一,但不要两者都使用
  45. System.out.println(".........");
  46. }
  47. createTopic(topic);
  48. }
  49. public void createTopic(CommandTopicPartition topic) throws Exception {
  50. if (topic.replicationFactor.filter(rf -> rf > Short.MAX_VALUE || rf < 1).isPresent()) {
  51. //复制因子必须介于1和“+Short.MAX_VALUE+”之间
  52. throw new IllegalArgumentException("...");
  53. }
  54. if (topic.partitions.filter(p -> p < 1).isPresent()) {
  55. //分区必须大于0
  56. throw new IllegalArgumentException("...");
  57. }
  58. try {
  59. NewTopic newTopic;
  60. //取决于创建 topic 时 是否指定了 replica-assignment
  61. if (topic.hasReplicaAssignment()) {
  62. newTopic = new NewTopic(topic.name, topic.replicaAssignment);
  63. } else {
  64. newTopic = new NewTopic(topic.name, topic.partitions, topic.replicationFactor.map(Integer::shortValue));
  65. }
  66. //给topic设置参数
  67. Map<String, String> configsMap = topic.configsToAdd.stringPropertyNames().stream().collect(Collectors.toMap(name -> name, name -> topic.configsToAdd.getProperty(name)));
  68. newTopic.configs(configsMap);
  69. //批量创建topic
  70. CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic),
  71. new CreateTopicsOptions().retryOnQuotaViolation(false));
  72. //等待所有topic都创建成功
  73. createResult.all().get();
  74. System.out.println("Created topic " + topic.name + ".");
  75. } catch (ExecutionException e) {
  76. //......
  77. }
  78. }
  79. }
  80. }

TopicCommandOptions中有对创建topic所有参数的解读,我们下面来详细看下这些参数

四、创建Topic参数

bootstrap-server

  1. **必选项:**连接Kafka server

command-config

  1. 包含要传递给Admin Client的配置的属性文件。这仅与--bootstrap-server选项一起使用,用于描述和更改broker配置

list

  1. 列出所有可用的topic

create

  1. 创建一个新的topic

delete

  1. 删除一个topic

alter

  1. 更改分区数量和副本分配,通过--alter更新现有主题的配置

describe

  1. 列出给定topic的详细信息

topic

  1. 要创建、更改、描述或删除的主题。它还接受正则表达式,但--create选项除外。将主题名称放在双引号中,并使用“\\”前缀转义正则表达式符号;例如 \"test\\.topic\"

topic-id

  1. 仅与用于描述主题的--bootstrap-server选项一起使用

config

  1. 正在创建的主题的主题配置覆盖。

delete-config

  1. 要删除现有主题的主题配置覆盖

partitions

  1. 正在创建或更改的主题的分区数量(警告:如果为具有键的主题增加分区,则分区逻辑或消息顺序将受到影响)。如果未提供用于,则为集群默认值

replication-factor

  1. 正在创建的主题中每个分区的复制因子。如果未提供,则为群集默认值

replica-assignment

  1. 正在创建或更改的topic的手动分区到broker分配列表

under-replicated-partitions

  1. 如果在描述主题时设置,则仅在复制分区下显示

unavailable-partitions

  1. 如果在描述主题时设置,则仅显示其leader不可用的分区

under-min-isr-partitions

  1. 如果在描述主题时设置,则仅显示isr计数 < 配置的最小值的分区。

at-min-isr-partitions

  1. 如果在描述主题时设置,则仅显示isr计数 = 配置的最小值的分区

topics-with-overrides

  1. 如果在描述主题时设置,则仅显示已覆盖配置的topic

if-exists

  1. 如果在更改、删除或描述主题时设置,则仅当主题存在时才会执行该操作

if-not-exists

  1. 如果在创建主题时设置,则仅当主题不存在时才会执行该操作。

exclude-internal

  1. 运行listdescribe命令时排除内部topic。默认情况下,内部topic将被列出

partition-size-limit-per-response

  1. 一个DescribeTopicPartitions响应中包含的最大分区大小

五、AdminClient

从第二步的源码中看到最终将topic的创建交给了AdminClient来完成,下面我们继续往下分析

1、创建

在TopicService的构造方法中创建的AdminClient

它是Kafka的管理客户端,支持管理和检查topic、broker、配置和ACL。

AdminClient的创建用到了bootstrap.servers,它里面有连接KafkaServer的host:port列表。

bootstrap.servers配置仅用于发现群集中的broker,然后AdminClient将根据需要连接到这些broker。因此,只包括两个或三个经纪人地址就足以应对broker不可用的风险。

  1. TopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer());
  1. public static class TopicService implements AutoCloseable {
  2. private final Admin adminClient;
  3. public TopicService(Properties commandConfig, Optional<String> bootstrapServer) {
  4. this.adminClient = createAdminClient(commandConfig, bootstrapServer);
  5. }
  6. private static Admin createAdminClient(Properties commandConfig, Optional<String> bootstrapServer) {
  7. if (bootstrapServer.isPresent()) {
  8. commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer.get());
  9. }
  10. return Admin.create(commandConfig);
  11. }
  12. }

2、交由子类KafkaAdminClient处理

  1. public class KafkaAdminClient extends AdminClient {
  2. private final AdminClientRunnable runnable;
  3. //创建一批topic
  4. public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
  5. final CreateTopicsOptions options) {
  6. final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> topicFutures = new HashMap<>(newTopics.size());
  7. final CreatableTopicCollection topics = new CreatableTopicCollection();
  8. for (NewTopic newTopic : newTopics) {
  9. //判断名字是否符合规范
  10. if (topicNameIsUnrepresentable(newTopic.name())) {
  11. KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();
  12. future.completeExceptionally(new InvalidTopicException("The given topic name '" +
  13. newTopic.name() + "' cannot be represented in a request."));
  14. topicFutures.put(newTopic.name(), future);
  15. } else if (!topicFutures.containsKey(newTopic.name())) {
  16. //topicFutures 装的是还没有创建的 topicname
  17. topicFutures.put(newTopic.name(), new KafkaFutureImpl<>());
  18. topics.add(newTopic.convertToCreatableTopic());
  19. }
  20. }
  21. if (!topics.isEmpty()) {
  22. final long now = time.milliseconds();
  23. final long deadline = calcDeadlineMs(now, options.timeoutMs());
  24. //里面封装了 ApiKeys.CREATE_TOPICS 请求
  25. final Call call = getCreateTopicsCall(options, topicFutures, topics,
  26. Collections.emptyMap(), now, deadline);
  27. //实现了Runnable接口
  28. runnable.call(call, now);
  29. }
  30. return new CreateTopicsResult(new HashMap<>(topicFutures));
  31. }
  32. }

从这里我们看到,这里会用一个线程向broker发送ApiKeys.CREATE_TOPICS 请求。下面我们来看broker端怎么处理topics的创建请求的。按照我们之前的经验,要去看KafkaApis中对应ApiKeys.CREATE_TOPICS的处理逻辑

  1. class KafkaApis(...){
  2. request.header.apiKey match {
  3. //....
  4. case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
  5. case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
  6. case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
  7. case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
  8. case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
  9. case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
  10. //.....
  11. }
  12. }

六、CREATE_TOPICS的处理逻辑

从KafkaApi中我们看到很多请求都调用了maybeForwardToController()方法来处理,但是传入的参数不同,从名称上我们可以猜测这些请求可能交由Controller来处理,回想下《Kafka-Controller角色需要做什么?》中当一个broker当选为Controller时第一件事就是注册监听器,去监听broker改变、topic改变、topic删除、isr改变等,并分别准备好了响应的处理逻辑。因此这里只要让topic发生改变就可以自动触发让Controller处理了。下面看下handleCreateTopicsRequest()中都做了什么?

1、获取ZooKeeper

  1. val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))

2、判断集群当下是否有Controller

如果集群当下没有Controller,直接向客户端返回Errors.NOT_CONTROLLER错误。我们按照集群当下有Controller继续分析。

  1. if (!zkSupport.controller.isActive) {
  2. //如果没有contorller,直接向客户端发送响应信息(集群当下没有controller),且这个时候时创建不了topic的,
  3. createTopicsRequest.data.topics.forEach { topic =>
  4. results.add(new CreatableTopicResult().setName(topic.name)
  5. .setErrorCode(Errors.NOT_CONTROLLER.code))
  6. }
  7. sendResponseCallback(results)
  8. } else {
  9. //正常逻辑
  10. }

3、检查topic名称

集群元数据topic是一个具有不同实现的内部topic。不应允许用户创建同名的topic。

  1. if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) {
  2. //拒绝创建内部主题 __cluster_metadata
  3. info(s"Rejecting creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME}")
  4. }
  5. topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))

4、调用ZkAdminManager创建topic

  1. zkSupport.adminManager.createTopics(
  2. createTopicsRequest.data.timeoutMs,
  3. createTopicsRequest.data.validateOnly,
  4. toCreate,
  5. authorizedForDescribeConfigs,
  6. controllerMutationQuota,
  7. handleCreateTopicsResults)
  8. }

1、循环校验每个topic是否符合规则

1、topic是否已经存在

2、topic是否为null

3、numPartitions或replicationFactor和replicasAssignments都已设置。两者不能同时使用

2、确定分区分配列表

如果用户指定了列表,那么就直接用用户的,否则使用Kafka自己的分配策略(下篇博客分析)

  1. val assignments = if (topic.assignments.isEmpty) {
  2. CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(
  3. brokers.asJavaCollection, resolvedNumPartitions, resolvedReplicationFactor))
  4. } else {
  5. val assignments = new mutable.HashMap[Int, Seq[Int]]
  6. //注意:我们不会检查replicaAssignment是否包含未知的代理——与添加分区的情况不同,这遵循TopicCommand中的现有逻辑
  7. topic.assignments.forEach { assignment =>
  8. assignments(assignment.partitionIndex) = assignment.brokerIds.asScala.map(a => a: Int)
  9. }
  10. assignments
  11. }

3、topics目录下创建指定的topic

  1. //ConfigType.TOPIC : topics 目录
  2. //topic :要创建的topic名称
  3. zkClient.setOrCreateEntityConfigs(ConfigType.TOPIC, topic, config)

4、topic目录下创建分区目录和对应信息

  1. writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },isUpdate = false, usesTopicId)

5、创建对应的元数据

  1. CreatePartitionsMetadata(topic.name, assignments.keySet)

七、Controller端处理逻辑

我们找到TopicChange对应的处理逻辑

  1. override def process(event: ControllerEvent): Unit = {
  2. try {
  3. event match {
  4. case TopicChange =>
  5. processTopicChange()
  6. //......
  7. }
  8. }
  9. }
  10. private def processTopicChange(): Unit = {
  11. if (!isActive) return
  12. //从 brokers/topics/目录下获取所有的topic
  13. val topics = zkClient.getAllTopicsInCluster(true)
  14. //从controllerContext 获取当下缓存中所有的 topic
  15. //两者相减获取 新增加的 topic
  16. val newTopics = topics -- controllerContext.allTopics
  17. // 获取删除的topic (既topics目录没有,但是缓存中有)
  18. val deletedTopics = controllerContext.allTopics.diff(topics)
  19. //设置新的topic到缓存
  20. controllerContext.setAllTopics(topics)
  21. //检测zk中 每个topic 目录的变化
  22. registerPartitionModificationsHandlers(newTopics.toSeq)
  23. //现在要添加分区和副本了,也就是从topic下获取 topic_id、adding_replicas、removing_replicas、partitions 信息
  24. val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentAndTopicIdForTopics(newTopics)
  25. deletedTopics.foreach(controllerContext.removeTopic)
  26. processTopicIds(addedPartitionReplicaAssignment)
  27. addedPartitionReplicaAssignment.foreach { case TopicIdReplicaAssignment(_, _, newAssignments) =>
  28. newAssignments.foreach { case (topicAndPartition, newReplicaAssignment) =>
  29. //controllerContext 的缓存中 更新分区、副本、leder信息
  30. controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
  31. }
  32. }
  33. info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
  34. s"[$addedPartitionReplicaAssignment]")
  35. if (addedPartitionReplicaAssignment.nonEmpty) {
  36. val partitionAssignments = addedPartitionReplicaAssignment
  37. .map { case TopicIdReplicaAssignment(_, _, partitionsReplicas) => partitionsReplicas.keySet }
  38. .reduce((s1, s2) => s1.union(s2))
  39. //更高topic下的分区、副本为可用状态 OnlineReplica
  40. //此时 往topic 生产数据就ok了
  41. onNewPartitionCreation(partitionAssignments)
  42. }
  43. }

从源码中我们可以看到,Controller这端会不断的将新的topic以及其下的topic_id、adding_replicas、removing_replicas、partitions 信息加载到缓存,并使用它们的状态机将它们更新至可用状态。并剔除掉删除的topic。始终保持,当向topic生产数据时,它这里都时最新的状态。


本文转载自: https://blog.csdn.net/lu070828/article/details/143669697
版权归原作者 隔着天花板看星星 所有, 如有侵权,请联系我们删除。

“Kafka-创建topic源码”的评论:

还没有评论