集群启动
在搭建完Kafka集群之后,我们需要了解一些基本的概念,并掌握如何正确地启动集群。以下是一些关键的概念介绍以及启动步骤。
相关概念
代理:Broker
在使用Kafka之前,我们需要启动Kafka服务进程,这个服务进程通常被称为Kafka Broker或Kafka Server。由于Kafka是一个分布式消息系统,在生产环境中,通常需要多个服务进程组成集群来提供消息服务。因此,每一个服务节点都称为一个Broker,并且为了区分不同的服务节点,每一个Broker都需要有一个唯一的全局ID,即
broker.id
,这个ID可以在Kafka软件的配置文件
server.properties
中进行配置。
示例配置如下:
# The id of the broker. This must be set to a unique integer for each broker
# 集群ID
broker.id=0
在我们的Kafka集群中,每个节点都有自己的ID,是一个唯一的整数。
主机kafka-broker1kafka-broker2kafka-broker3broker.id123
控制器:Controller
Kafka作为一个分布式消息传输系统,存在多个Broker服务节点,并且采用了常见的主从(Master-Slave)架构。这意味着需要从多个Broker中选出一个用于管理整个Kafka集群的Master节点,这个节点称为Controller。Controller是Kafka的核心组件之一,其主要职责是在Apache Zookeeper的帮助下管理和协调整个Kafka集群。
如果在运行过程中,Controller节点出现故障,Kafka会依赖于ZooKeeper软件来选举出新的Controller,以确保集群的高可用性。
Kafka集群中Controller的基本功能包括但不限于:
- Broker管理:监听
/brokers/ids
节点相关的所有变化,包括Broker数量的增减以及Broker对应数据的变化。 - Topic管理:监听
/brokers/topics
节点相关的所有变化,包括Topic的新增、修改和删除操作。 - Partition管理:监听
/admin/reassign_partitions
、/isr_change_notification
、/preferred_replica_election
等节点相关的所有变化。 - 数据服务:启动分区状态机和副本状态机。
启动ZooKeeper
Kafka集群含有多个服务节点,而在经典的主从架构中,需要从多个服务节点中选出一个作为集群管理的Master节点,即Controller。如果Controller节点出现故障,则需要从其余的Slave节点中选举出一个新的Controller节点来接管管理功能。
Kafka依赖于ZooKeeper软件来实现Broker节点的选举功能。具体来说,ZooKeeper提供了以下功能支持:
- 创建节点:创建一个节点时,可以选择持久化创建或临时创建。持久化创建的节点会一直存在,而临时创建的节点则依赖于客户端的连接状态,一旦客户端断开连接,节点也会被自动删除。
- 节点唯一性:ZooKeeper中的节点不允许重复创建,因此多个客户端尝试创建相同的节点时,只有第一个成功创建的客户端可以创建节点。
- 监听机制:客户端可以在ZooKeeper节点上设置监听器,以便在节点状态发生变化时收到通知并作出响应。
Kafka利用ZooKeeper的这些特性来实现Controller节点的选举:
- 当首次启动Kafka集群时,多个Broker节点会同时启动并尝试连接ZooKeeper,每个Broker节点都会尝试创建一个临时节点
/controller
。 - 由于ZooKeeper中一个节点不允许重复创建,因此最终只有一个Broker节点能够成功创建
/controller
节点,这个Broker节点将成为Kafka集群的Controller,负责管理整个集群。 - 其他未成为Controller的Broker节点会在
/controller
节点上设置监听器,以监听其状态变化。 - 如果Controller节点发生故障导致连接中断,
/controller
节点会被自动删除,此时其他设置了监听器的Broker节点会检测到这一变化,并尝试重新创建/controller
节点,成功者将成为新的Controller。
启动Kafka
初始化ZooKeeper
在Kafka Broker启动期间,首先会创建一个ZooKeeper客户端(
KafkaZkClient
)以与ZooKeeper进行交互。创建完客户端对象后,它会向ZooKeeper发送创建节点的请求,这些节点都是持久化的。以下是一些重要的节点及其作用:
/admin/delete_topics
:持久化节点,用于配置待删除的主题。因删除过程可能涉及Broker下线或失败情况,因此需要在Broker重新上线后根据此节点继续删除操作。一旦主题的所有分区数据被删除,则清理该节点。/brokers/ids
:持久化节点,存储服务节点ID标识。每当有新的Broker启动,就在该节点下增加子节点。Broker ID必须唯一。/brokers/topics
:持久化节点,包含服务节点中的主题详情,如分区和副本。/brokers/seqid
:持久化节点,用于自动生成Broker ID。/config/changes
:持久化节点,当Kafka元数据发生变化时,在该节点下创建子节点。/config/clients
:持久化节点,存储客户端配置,默认为空。/config/brokers
:持久化节点,存储服务节点相关配置,默认为空。/config/ips
:持久化节点,存储IP配置,默认为空。/config/topics
:持久化节点,存储主题配置,默认为空。/config/users
:持久化节点,存储用户配置,默认为空。/consumers
:持久化节点,记录消费者相关信息。/isr_change_notification
:持久化节点,用于通知Controller及时更新ISR列表。/latest_producer_id_block
:持久化节点,存储PID块,确保生产者请求得到响应。/log_dir_event_notification
:持久化节点,当Broker中的数据路径出现问题时,向ZooKeeper添加通知序号。/cluster/id
:持久化节点,存储Kafka集群的唯一ID信息及其版本号。
初始化服务
启动任务调度器
每个Broker启动时都会创建一个内部调度器(
KafkaScheduler
)并启动,使用Java中的定时任务线程池
ScheduledThreadPoolExecutor
来完成节点内部的工作任务。
创建数据管理器
每个Broker启动时创建数据管理器(
LogManager
),用于接收消息后的数据创建、查询、清理等处理。
创建远程数据管理器
每个Broker启动时创建远程数据管理器(
RemoteLogManager
),用于与其他Broker节点进行数据状态同步。
创建副本管理器
每个Broker启动时创建副本管理器(
ReplicaManager
),负责处理主题的副本。
创建ZK元数据缓存
每个Broker启动时创建ZK元数据缓存(
ZkMetadataCache
),用于缓存ZK中的Kafka元数据。
创建Broker通信对象
每个Broker启动时创建Broker间通信管理器(
BrokerToControllerChannelManager
),管理Broker与Controller间的通信。
创建网络通信对象
每个Broker启动时创建网络通信对象(
SocketServer
),用于与其他Broker进行通信,包含Java NIO通信的
Channel
和
Selector
对象。
注册Broker节点
Broker启动时,通过ZK客户端向ZK注册当前Broker节点ID,创建临时节点。如果Broker与ZK的连接断开,则临时节点会被删除。
启动控制器
控制器(
KafkaController
)是每个Broker启动时创建的核心对象,用于与ZK建立连接并申请成为整个Kafka集群的管理者。如果申请成功,它会初始化管理器并建立与其他Broker的数据通道。
初始化通道管理器
创建通道管理器(
ControllerChannelManager
),维护Controller与集群所有Broker节点间的网络连接,并向Broker发送控制请求及接收响应。
初始化事件管理器
创建事件管理器(
ControllerEventManager
),用于管理Controller与集群所有Broker节点间的网络连接,并向Broker发送控制请求及接收响应。
初始化状态管理器
创建状态管理器(
ControllerChangeHandler
),可监听
/controller
节点的操作,并在节点创建、删除或数据变化时执行相应处理。
启动控制器
控制器对象启动后,向事件管理器发送
Startup
事件。事件处理线程接收到事件后,通过ZK客户端向ZK申请
/controller
节点。申请成功后,执行一系列使当前节点成为Controller的操作,包括注册ZooKeeper监听器、删除日志路径变更及ISR副本变更通知事件、启动Controller通道管理器,以及启动副本状态机和分区状态机。
版权归原作者 大数据深度洞察 所有, 如有侵权,请联系我们删除。