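Introduction to Kafka
Kafka is an open-source distributed event streaming platform, originally developed at LinkedIn and now maintained by the Apache Software Foundation. It is designed to handle large-scale data streams and provides reliable, high-throughput, low-latency messaging. Kafka is used to build real-time data pipelines and streaming applications, so that different applications, systems, and data sources can exchange data efficiently.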
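Kafka's core concepts are:
- Message: Kafka is a messaging system based on the publish/subscribe model, and it organizes messages into topics. Producers publish messages to a topic, and consumers subscribe to one or more topics to receive them.
- Topic: A topic is a category of messages. Each topic contains one or more partitions. When a message is published to a topic, it is assigned to one of the topic's partitions, typically by hashing the message key when a key is set.
- Partition: A topic can be split into multiple partitions. Each partition is an ordered, persistent sequence of records, and ordering is guaranteed only within a partition. Partitions let Kafka scale horizontally, because consumers in the same consumer group can process different partitions in parallel.
- Producer: A producer publishes messages to Kafka topics.
- Consumer: A consumer subscribes to Kafka topics and processes the messages it receives.
- Broker: A Kafka cluster consists of one or more brokers. Each broker is an independent Kafka server that stores data and handles messages.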
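To make the topic/partition relationship concrete, the sketch below maps a message key to a partition by hashing the key and taking the result modulo the partition count. The function name `partition_for_key` is hypothetical, and `cksum` (CRC-32) is only a stand-in: Kafka's Java client hashes keys with murmur2, so the partition numbers printed here will not match a real cluster.

```shell
#!/usr/bin/env bash
# Hypothetical helper: map a record key to a partition number.
# cksum (CRC-32) is a stand-in for the murmur2 hash used by Kafka's Java client.
partition_for_key() {
  local key="$1" partitions="$2" hash
  # cksum prints "<crc> <byte count>" for stdin; keep only the CRC.
  hash=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
  echo $(( hash % partitions ))
}

# The same key always lands on the same partition, which is what
# preserves per-key ordering.
for key in user-1 user-2 user-1; do
  echo "$key -> partition $(partition_for_key "$key" 3)"
done
```

The property worth noticing is determinism: both `user-1` lines print the same partition, so all messages for one key are read in order by a single consumer in the group.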
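Kafka's main characteristics are:
- Durability: Kafka persists messages to disk and retains them according to the configured retention policy, so messages survive broker restarts.
- High throughput: Kafka can process large volumes of data while keeping latency low, which suits large-scale data processing and analytics.
- Scalability: A cluster can be scaled horizontally to handle more data and higher load.
- Fault tolerance: Kafka replicates partitions across brokers, so data remains reliable and available even if some nodes fail.
Kafka is widely used for stream processing, real-time log processing, and metrics monitoring, and many companies use it to build real-time data pipelines and process data at scale.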
Installing Kafka and Kafka with KRaft on Ubuntu
The steps below install Kafka on Ubuntu. They describe Kafka 3.6.0. Before you start, make sure Java 8 or later is installed.
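About KRaft
Kafka 2.8 introduced KRaft (Kafka Raft) as an early-access feature, and it has been considered production-ready for new clusters since Kafka 3.3. KRaft is a new way to manage Kafka's metadata that replaces the earlier ZooKeeper-based approach. It is a metadata management system built on the Raft consensus protocol and runs inside Kafka itself, so Kafka no longer depends on ZooKeeper.
Kafka with KRaft uses the Raft protocol to manage and maintain Kafka's metadata, including partition assignments and cluster configuration. This simplifies deployment and operations, because there is no separate ZooKeeper cluster to maintain.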
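In practice, a node runs in KRaft mode because of a few settings in its properties file. The sketch below prints those settings from a file; the helper name `kraft_settings` is hypothetical, and the sample values mirror the ones reported in the startup log later in this article (`process.roles`, `node.id`, `controller.quorum.voters`).

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the KRaft-related settings from a properties file.
kraft_settings() {
  grep -E '^(process\.roles|node\.id|controller\.quorum\.voters)=' "$1"
}

# Demo on a throwaway sample file; the values match the startup log in this article.
sample=$(mktemp)
cat > "$sample" <<'EOF'
# This node acts as both broker and controller (combined mode).
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
EOF
kraft_settings "$sample"
rm -f "$sample"
```

Against a real installation you would point it at the file used in this guide, for example `kraft_settings config/kraft/server.properties`.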
Steps:
1. Install Java
Check whether Java is already installed:
java -version
If Java is missing or needs updating, install OpenJDK with:
sudo apt update
sudo apt install default-jdk
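If you want to script the Java check, the sketch below extracts the major version from a Java version string and compares it with the minimum of 8 stated above. The helper name `java_major` is hypothetical, and the parsing assumes the usual `java -version` format, where the version appears in double quotes on a line containing the word "version".

```shell
#!/usr/bin/env bash
# Hypothetical helper: extract the major Java version from a version string
# such as "1.8.0_392" (Java 8) or "11.0.21" (Java 11).
java_major() {
  local v="$1"
  v="${v#1.}"           # drop the legacy "1." prefix used up to Java 8
  echo "${v%%[._-]*}"   # keep everything before the first '.', '_' or '-'
}

# Only run the check when a java binary is on the PATH.
if command -v java >/dev/null 2>&1; then
  # java -version writes to stderr; the version is the first quoted field.
  version=$(java -version 2>&1 | awk -F '"' '/version/ {print $2; exit}')
  major=$(java_major "$version")
  if [ -n "$major" ] && [ "$major" -ge 8 ]; then
    echo "Java $major found: OK"
  else
    echo "Java 8 or later is required (found: ${version:-unknown})"
  fi
else
  echo "Java is not installed"
fi
```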
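2. Download Kafka
Download the Kafka release you need, for example 3.6.0, from the official Apache Kafka website. Older releases are eventually moved from downloads.apache.org to the Apache archive (https://archive.apache.org/dist/kafka/), so use the archive if the link below is no longer available.
Kafka release archives are named kafka_<Scala version>-<Kafka version>.tgz. For example, in kafka_2.13-3.6.0.tgz, 3.6.0 is the Kafka version and 2.13 means this build was compiled with Scala 2.13. The release package already contains the compiled Scala code, so you only need to follow the Kafka installation steps; there is no need to install Scala separately.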
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
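The naming convention described above can be checked mechanically. This sketch splits an archive name (or a full download URL) into its Scala and Kafka versions using only shell parameter expansion; `parse_kafka_archive` is a hypothetical helper name, and it assumes the `kafka_<Scala version>-<Kafka version>.tgz` pattern.

```shell
#!/usr/bin/env bash
# Hypothetical helper: split a Kafka archive name into Scala and Kafka versions.
parse_kafka_archive() {
  local name="${1##*/}"   # strip any directory or URL prefix
  name="${name%.tgz}"     # strip the extension
  name="${name#kafka_}"   # strip the "kafka_" prefix
  # What remains is "<Scala version>-<Kafka version>".
  echo "scala=${name%%-*} kafka=${name#*-}"
}

parse_kafka_archive kafka_2.13-3.6.0.tgz
# prints: scala=2.13 kafka=3.6.0
```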
3. Extract and move Kafka
Extract the downloaded archive:
tar -xzf kafka_2.13-3.6.0.tgz
Move the extracted directory to the desired location, for example under /opt:
sudo mv kafka_2.13-3.6.0 /opt/kafka
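4. Start Kafka in KRaft mode
Run the following commands from the Kafka installation directory (/opt/kafka in this guide), because they use paths relative to it.
Generate a cluster UUID:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Use the bin/kafka-storage.sh format command to format the log directories of the KRaft cluster:
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties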
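One way to confirm that formatting worked is to look for the cluster ID in the log directory. The sketch below assumes that the format step writes a `meta.properties` file containing a `cluster.id=` line into the log directory, and that the directory is `/tmp/kraft-combined-logs` (the `log.dirs` value shown in the startup output in this article). Both are assumptions about your setup, and `cluster_id_from_meta` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the cluster.id value from a meta.properties file.
cluster_id_from_meta() {
  sed -n 's/^cluster\.id=//p' "$1"
}

# Assumed location: <log.dirs>/meta.properties for the config used in this guide.
meta=/tmp/kraft-combined-logs/meta.properties
if [ -f "$meta" ]; then
  echo "Formatted with cluster ID: $(cluster_id_from_meta "$meta")"
else
  echo "No meta.properties at $meta; the log directory may not be formatted yet"
fi
```

Because the default log directory is under /tmp, its contents may not survive a reboot; if the file disappears, the format step has to be repeated.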
Start the Kafka server:
# Run in the foreground
bin/kafka-server-start.sh config/kraft/server.properties
# Or run in the background
nohup bin/kafka-server-start.sh config/kraft/server.properties > my_kafka_run.log 2>&1 &
Once the Kafka server has started successfully, you have a basic Kafka environment and can start using it.
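When the server runs in the background, its output goes to my_kafka_run.log, so a quick check is to search that file for a line that appears during a healthy startup. The sketch below looks for the "Successfully registered broker" message that appears in this article's startup output; `broker_registered` is a hypothetical helper name, and other Kafka versions may word the message differently.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if the log shows the broker registering
# with the controller.
broker_registered() {
  grep -q 'Successfully registered broker' "$1"
}

# Check the log file written by the nohup command, if it exists.
log=my_kafka_run.log
if [ -f "$log" ]; then
  if broker_registered "$log"; then
    echo "Broker registered with the controller"
  else
    echo "Broker has not registered yet; check $log for errors"
  fi
fi
```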
Output after startup:
[2023-11-28 07:46:27,307] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)[2023-11-28 07:46:27,603] INFO Setting -Djdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)[2023-11-28 07:46:27,761] INFO Registered signal handlers forTERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)[2023-11-28 07:46:27,764] INFO [ControllerServer id=1] Starting controller (kafka.server.ControllerServer)[2023-11-28 07:46:27,782] INFO authorizerStart completed for endpoint CONTROLLER. Endpoint is now READY. (org.apache.kafka.server.network.EndpointReadyFutures)[2023-11-28 07:46:28,132] INFO Updated connection-accept-rate max connection creation rate to 2147483647(kafka.network.ConnectionQuotas)[2023-11-28 07:46:28,165] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(CONTROLLER)(kafka.network.SocketServer)[2023-11-28 07:46:28,166] INFO [SharedServer id=1] Starting SharedServer (kafka.server.SharedServer)[2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2(kafka.log.UnifiedLog$)[2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding producer state from offset 0(kafka.log.UnifiedLog$)[2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Producer state recovery took 0ms for snapshot load and 0ms for segment recovery from offset 0(kafka.log.UnifiedLog$)[2023-11-28 07:46:28,262] INFO Initialized snapshots with IDs SortedSet() from /tmp/kraft-combined-logs/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)[2023-11-28 07:46:28,301] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)[2023-11-28 07:46:28,490] INFO [RaftManager id=1] Completed transition to Unattached(epoch=0, voters=[1], electionTimeoutMs=1226) from null (org.apache.kafka.raft.QuorumState)[2023-11-28 07:46:28,563] INFO [RaftManager id=1] Completed transition to CandidateState(localId=1, epoch=1, retries=1, voteStates={1=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1279) from Unattached(epoch=0, voters=[1], electionTimeoutMs=1226)(org.apache.kafka.raft.QuorumState)[2023-11-28 07:46:28,572] INFO [RaftManager id=1] Completed transition to Leader(localId=1, epoch=1, epochStartOffset=0, highWatermark=Optional.empty, voterStates={1=ReplicaState(nodeId=1, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) from CandidateState(localId=1, epoch=1, retries=1, voteStates={1=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1279)(org.apache.kafka.raft.QuorumState)[2023-11-28 07:46:28,596] INFO [kafka-1-raft-outbound-request-thread]: Starting (kafka.raft.RaftSendThread)[2023-11-28 07:46:28,596] INFO [kafka-1-raft-io-thread]: Starting (kafka.raft.KafkaRaftManager$RaftIoThread)[2023-11-28 07:46:28,617] INFO [RaftManager id=1] High watermark set to LogOffsetMetadata(offset=1, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=91)])for the first timefor epoch 1 based on indexOfHw 0 and voters [ReplicaState(nodeId=1, endOffset=Optional[LogOffsetMetadata(offset=1, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=91)])], lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)](org.apache.kafka.raft.LeaderState)[2023-11-28 07:46:28,619] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:28,620] INFO [ControllerServer id=1] Waiting for controller 
quorum voters future (kafka.server.ControllerServer)[2023-11-28 07:46:28,621] INFO [ControllerServer id=1] Finished waiting for controller quorum voters future (kafka.server.ControllerServer)[2023-11-28 07:46:28,659] INFO [controller-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,660] INFO [controller-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,661] INFO [controller-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,662] INFO [controller-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,678] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,686] INFO [ControllerServer id=1] Waiting for the controller metadata publishers to be installed (kafka.server.ControllerServer)[2023-11-28 07:46:28,686] INFO [ControllerServer id=1] Finished waiting for the controller metadata publishers to be installed (kafka.server.ControllerServer)[2023-11-28 07:46:28,686] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:28,686] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)[2023-11-28 07:46:28,690] INFO Awaiting socket connections on 0.0.0.0:9093. 
(kafka.network.DataPlaneAcceptor)[2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)[2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)[2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)[2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)[2023-11-28 07:46:28,698] INFO [BrokerServer id=1] Transition from SHUTDOWN to STARTING (kafka.server.BrokerServer)[2023-11-28 07:46:28,699] INFO [BrokerServer id=1] Starting broker (kafka.server.BrokerServer)[2023-11-28 07:46:28,706] INFO [broker-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,724] INFO [BrokerServer id=1] Waiting for controller quorum voters future (kafka.server.BrokerServer)[2023-11-28 07:46:28,724] INFO [BrokerServer id=1] Finished waiting for controller quorum voters future (kafka.server.BrokerServer)[2023-11-28 07:46:28,729] INFO [broker-1-to-controller-forwarding-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,731] INFO [broker-1-to-controller-forwarding-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: 
null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,755] INFO Updated connection-accept-rate max connection creation rate to 2147483647(kafka.network.ConnectionQuotas)[2023-11-28 07:46:28,760] INFO [SocketServer listenerType=BROKER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT)(kafka.network.SocketServer)[2023-11-28 07:46:28,764] INFO [broker-1-to-controller-alter-partition-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,764] INFO [broker-1-to-controller-alter-partition-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,782] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,783] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,784] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,786] INFO [ExpirationReaper-1-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,786] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:28,786] INFO [ExpirationReaper-1-RemoteFetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,801] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,804] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,836] INFO [broker-1-to-controller-heartbeat-channel-manager]: Starting 
(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,836] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,837] INFO [BrokerLifecycleManager id=1] Incarnation rXokDA-kRI2e0TCw3qUr4g of broker 1in cluster ktQqKm60RwiR-s4Dts0HDg is now STARTING. (kafka.server.BrokerLifecycleManager)[2023-11-28 07:46:28,857] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Waiting for the broker metadata publishers to be installed (kafka.server.BrokerServer)[2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Finished waiting for the broker metadata publishers to be installed (kafka.server.BrokerServer)[2023-11-28 07:46:28,877] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Waiting for the controller to acknowledge that we are caught up (kafka.server.BrokerServer)[2023-11-28 07:46:28,920] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:28,921] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,972] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,977] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water 
mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:28,979] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:28,979] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,029] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,035] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:29,036] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,077] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,086] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,091] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:29,091] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,141] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: 
null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,147] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:29,147] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,178] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,197] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,202] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:29,202] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,253] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,270] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:29,271] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,276] INFO [RaftManager id=1] Registered the listener org.apache.kafka.image.loader.MetadataLoader@382374793 
(org.apache.kafka.raft.KafkaRaftClient)[2023-11-28 07:46:29,276] INFO [RaftManager id=1] Registered the listener org.apache.kafka.controller.QuorumController$QuorumMetaLogListener@859950147 (org.apache.kafka.raft.KafkaRaftClient)[2023-11-28 07:46:29,281] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,288] INFO [MetadataLoader id=1] maybePublishMetadata(LOG_DELTA): The loader is still catching up because we have not loaded a controller record as of offset 0 and high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,320] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,332] INFO [MetadataLoader id=1] maybePublishMetadata(LOG_DELTA): The loader finished catching up to the current high water mark of 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,360] INFO [BrokerLifecycleManager id=1] Successfully registered broker 1 with broker epoch 5(kafka.server.BrokerLifecycleManager)[2023-11-28 07:46:29,382] INFO [BrokerLifecycleManager id=1] The broker has caught up. Transitioning from STARTING to RECOVERY. 
(kafka.server.BrokerLifecycleManager)[2023-11-28 07:46:29,383] INFO [BrokerServer id=1] Finished waiting for the controller to acknowledge that we are caught up (kafka.server.BrokerServer)[2023-11-28 07:46:29,383] INFO [BrokerServer id=1] Waiting for the initial broker metadata update to be published (kafka.server.BrokerServer)[2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing SnapshotGenerator with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing FeaturesPublisher with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DynamicConfigPublisher controller id=1 with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,388] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DynamicClientQuotaPublisher controller id=1 with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,389] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing ScramPublisher controller id=1 with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,390] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DelegationTokenPublisher controller id=1 with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,392] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing ControllerMetadataMetricsPublisher with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,393] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing AclPublisher controller id=1 with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,394] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing 
BrokerMetadataPublisher with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,394] INFO [BrokerMetadataPublisher id=1] Publishing initial metadata at offset OffsetAndEpoch(offset=5, epoch=1) with metadata.version 3.6-IV2. (kafka.server.metadata.BrokerMetadataPublisher)[2023-11-28 07:46:29,395] INFO [BrokerLifecycleManager id=1] The broker is in RECOVERY. (kafka.server.BrokerLifecycleManager)[2023-11-28 07:46:29,397] INFO Loading logs from log dirs ArraySeq(/tmp/kraft-combined-logs)(kafka.log.LogManager)[2023-11-28 07:46:29,402] INFO No logs found to be loaded in /tmp/kraft-combined-logs (kafka.log.LogManager)[2023-11-28 07:46:29,409] INFO Loaded 0 logs in 12ms (kafka.log.LogManager)[2023-11-28 07:46:29,410] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)[2023-11-28 07:46:29,415] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)[2023-11-28 07:46:29,555] INFO [kafka-log-cleaner-thread-0]: Starting (kafka.log.LogCleaner$CleanerThread)[2023-11-28 07:46:29,556] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)[2023-11-28 07:46:29,557] INFO [AddPartitionsToTxnSenderThread-1]: Starting (kafka.server.AddPartitionsToTxnManager)[2023-11-28 07:46:29,557] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)[2023-11-28 07:46:29,561] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)[2023-11-28 07:46:29,562] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)[2023-11-28 07:46:29,563] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)[2023-11-28 07:46:29,563] INFO [BrokerMetadataPublisher id=1] Updating metadata.version to 14 at offset OffsetAndEpoch(offset=5, epoch=1). 
(kafka.server.metadata.BrokerMetadataPublisher)[2023-11-28 07:46:29,566] INFO [TxnMarkerSenderThread-1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)[2023-11-28 07:46:29,568] INFO [BrokerServer id=1] Finished waiting for the initial broker metadata update to be published (kafka.server.BrokerServer)[2023-11-28 07:46:29,570] INFO KafkaConfig values:
advertised.listeners = PLAINTEXT://localhost:9092
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num =11
alter.log.dirs.replication.quota.window.size.seconds =1
authorizer.class.name =
auto.create.topics.enable =true
auto.include.jmx.reporter =true
auto.leader.rebalance.enable =true
background.threads =10
broker.heartbeat.interval.ms =2000
broker.id =1
broker.id.generation.enable =true
broker.rack = null
broker.session.timeout.ms =9000
client.quota.callback.class = null
compression.type = producer
connection.failed.authentication.delay.ms =100
connections.max.idle.ms =600000
connections.max.reauth.ms =0
control.plane.listener.name = null
controlled.shutdown.enable =true
controlled.shutdown.max.retries =3
controlled.shutdown.retry.backoff.ms =5000
controller.listener.names = CONTROLLER
controller.quorum.append.linger.ms =25
controller.quorum.election.backoff.max.ms =1000
controller.quorum.election.timeout.ms =1000
controller.quorum.fetch.timeout.ms =2000
controller.quorum.request.timeout.ms =2000
controller.quorum.retry.backoff.ms =20
controller.quorum.voters =[1@localhost:9093]
controller.quota.window.num =11
controller.quota.window.size.seconds =1
controller.socket.timeout.ms =30000
create.topic.policy.class.name = null
default.replication.factor =1
delegation.token.expiry.check.interval.ms =3600000
delegation.token.expiry.time.ms =86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms =604800000
delegation.token.secret.key = null
delete.records.purgatory.purge.interval.requests =1
delete.topic.enable =true
early.start.listeners = null
fetch.max.bytes =57671680
fetch.purgatory.purge.interval.requests =1000
group.consumer.assignors =[org.apache.kafka.coordinator.group.assignor.RangeAssignor]
group.consumer.heartbeat.interval.ms =5000
group.consumer.max.heartbeat.interval.ms =15000
group.consumer.max.session.timeout.ms =60000
group.consumer.max.size =2147483647
group.consumer.min.heartbeat.interval.ms =5000
group.consumer.min.session.timeout.ms =45000
group.consumer.session.timeout.ms =45000
group.coordinator.new.enable =false
group.coordinator.threads =1
group.initial.rebalance.delay.ms =3000
group.max.session.timeout.ms =1800000
group.max.size =2147483647
group.min.session.timeout.ms =6000
initial.broker.registration.timeout.ms =60000
inter.broker.listener.name = PLAINTEXT
inter.broker.protocol.version =3.6-IV2
kafka.metrics.polling.interval.secs =10
kafka.metrics.reporters =[]
leader.imbalance.check.interval.seconds =300
leader.imbalance.per.broker.percentage =10
listener.security.protocol.map = CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = PLAINTEXT://:9092,CONTROLLER://:9093
log.cleaner.backoff.ms =15000
log.cleaner.dedupe.buffer.size =134217728
log.cleaner.delete.retention.ms =86400000
log.cleaner.enable =true
log.cleaner.io.buffer.load.factor =0.9
log.cleaner.io.buffer.size =524288
log.cleaner.io.max.bytes.per.second =1.7976931348623157E308
log.cleaner.max.compaction.lag.ms =9223372036854775807
log.cleaner.min.cleanable.ratio =0.5
log.cleaner.min.compaction.lag.ms =0
log.cleaner.threads =1
log.cleanup.policy =[delete]
log.dir = /tmp/kafka-logs
log.dirs = /tmp/kraft-combined-logs
log.flush.interval.messages =9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms =60000
log.flush.scheduler.interval.ms =9223372036854775807
log.flush.start.offset.checkpoint.interval.ms =60000
log.index.interval.bytes =4096
log.index.size.max.bytes =10485760
log.local.retention.bytes =-2
log.local.retention.ms =-2
log.message.downconversion.enable =true
log.message.format.version =3.0-IV1
log.message.timestamp.after.max.ms =9223372036854775807
log.message.timestamp.before.max.ms =9223372036854775807
log.message.timestamp.difference.max.ms =9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate =false
log.retention.bytes =-1
log.retention.check.interval.ms =300000
log.retention.hours =168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours =168
log.roll.jitter.hours =0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes =1073741824
log.segment.delete.delay.ms =60000
max.connection.creation.rate =2147483647
max.connections =2147483647
max.connections.per.ip =2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots =1000
message.max.bytes =1048588
metadata.log.dir = null
metadata.log.max.record.bytes.between.snapshots =20971520
metadata.log.max.snapshot.interval.ms =3600000
metadata.log.segment.bytes =1073741824
metadata.log.segment.min.bytes =8388608
metadata.log.segment.ms =604800000
metadata.max.idle.interval.ms =500
metadata.max.retention.bytes =104857600
metadata.max.retention.ms =604800000
metric.reporters =[]
metrics.num.samples =2
metrics.recording.level = INFO
metrics.sample.window.ms =30000
min.insync.replicas =1
node.id =1
num.io.threads =8
num.network.threads =3
num.partitions =1
num.recovery.threads.per.data.dir =1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers =1
offset.metadata.max.bytes =4096
offsets.commit.required.acks =-1
offsets.commit.timeout.ms =5000
offsets.load.buffer.size =5242880
offsets.retention.check.interval.ms =600000
offsets.retention.minutes =10080
offsets.topic.compression.codec =0
offsets.topic.num.partitions =50
offsets.topic.replication.factor =1
offsets.topic.segment.bytes =104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations =4096
password.encoder.key.length =128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
principal.builder.class = class org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
process.roles =[broker, controller]
producer.id.expiration.check.interval.ms =600000
producer.id.expiration.ms =86400000
producer.purgatory.purge.interval.requests =1000
queued.max.request.bytes =-1
queued.max.requests =500
quota.window.num =11
quota.window.size.seconds =1
remote.log.index.file.cache.total.size.bytes =1073741824
remote.log.manager.task.interval.ms =30000
remote.log.manager.task.retry.backoff.max.ms =30000
remote.log.manager.task.retry.backoff.ms =500
remote.log.manager.task.retry.jitter =0.2
remote.log.manager.thread.pool.size =10
remote.log.metadata.custom.metadata.max.bytes =128
remote.log.metadata.manager.class.name = org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
remote.log.metadata.manager.class.path = null
remote.log.metadata.manager.impl.prefix = rlmm.config.
remote.log.metadata.manager.listener.name = null
remote.log.reader.max.pending.tasks = 100
remote.log.reader.threads = 10
remote.log.storage.manager.class.name = null
remote.log.storage.manager.class.path = null
remote.log.storage.manager.impl.prefix = rsm.config.
remote.log.storage.system.enable = false
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 30000
replica.selector.class = null
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.client.callback.handler.class = null
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism.controller.protocol = GSSAPI
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
sasl.server.callback.handler.class = null
sasl.server.max.receive.size = 524288
security.inter.broker.protocol = PLAINTEXT
security.providers = null
server.max.startup.time.ms = 9223372036854775807
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
socket.listen.backlog.size = 50
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = []
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.principal.mapping.rules = DEFAULT
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
transaction.max.timeout.ms = 900000
transaction.partition.verification.enable = true
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
unstable.api.versions.enable = false
zookeeper.clientCnxnSocket = null
zookeeper.connect = null
zookeeper.connection.timeout.ms = null
zookeeper.max.in.flight.requests = 10
zookeeper.metadata.migration.enable = false
zookeeper.session.timeout.ms = 18000
zookeeper.set.acl = false
zookeeper.ssl.cipher.suites = null
zookeeper.ssl.client.enable = false
zookeeper.ssl.crl.enable = false
zookeeper.ssl.enabled.protocols = null
zookeeper.ssl.endpoint.identification.algorithm = HTTPS
zookeeper.ssl.keystore.location = null
zookeeper.ssl.keystore.password = null
zookeeper.ssl.keystore.type = null
zookeeper.ssl.ocsp.enable = false
zookeeper.ssl.protocol = TLSv1.2
zookeeper.ssl.truststore.location = null
zookeeper.ssl.truststore.password = null
zookeeper.ssl.truststore.type = null
(kafka.server.KafkaConfig)
[2023-11-28 07:46:29,577] INFO [BrokerServer id=1] Waiting for the broker to be unfenced (kafka.server.BrokerServer)
[2023-11-28 07:46:29,612] INFO [BrokerLifecycleManager id=1] The broker has been unfenced. Transitioning from RECOVERY to RUNNING. (kafka.server.BrokerLifecycleManager)
[2023-11-28 07:46:29,661] INFO [BrokerServer id=1] Finished waiting for the broker to be unfenced (kafka.server.BrokerServer)
[2023-11-28 07:46:29,662] INFO authorizerStart completed for endpoint PLAINTEXT. Endpoint is now READY. (org.apache.kafka.server.network.EndpointReadyFutures)
[2023-11-28 07:46:29,663] INFO [SocketServer listenerType=BROKER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)
[2023-11-28 07:46:29,663] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2023-11-28 07:46:29,665] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-11-28 07:46:29,665] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)
[2023-11-28 07:46:29,665] INFO Kafka startTimeMs: 1701157589664 (org.apache.kafka.common.utils.AppInfoParser)
[2023-11-28 07:46:29,666] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
[2023-11-28 07:53:16,542] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-11-28 07:53:16,543] INFO [BrokerLifecycleManager id=1] Unable to send a heartbeat because the RPC got timed out before it could be sent. (kafka.server.BrokerLifecycleManager)
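The startup output above contains two lines worth checking for: the "Kafka Server started" marker and the address the broker is listening on. The following is a minimal sketch of how those could be pulled out of the log text with standard tools. The function names `kafka_log_started` and `kafka_log_listen_addr` are hypothetical helpers made up for this illustration and are not part of Kafka.

```shell
#!/bin/sh
# Hypothetical helpers (not shipped with Kafka). Both read log text from stdin.

# Succeeds (exit status 0) if the log contains the startup marker
# that appears in the output shown above.
kafka_log_started() {
  grep -q 'Kafka Server started'
}

# Prints the host:port from the "Awaiting socket connections on ..." line,
# or nothing if that line is absent.
kafka_log_listen_addr() {
  sed -n 's/.*Awaiting socket connections on \([^ ]*:[0-9][0-9]*\)\..*/\1/p'
}
```

If the server output was saved to a file (for example, assuming the default `logs/server.log` under the Kafka directory), it could be used as `kafka_log_started < logs/server.log && echo "broker is up"`.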
6. Test Kafka
Create a topic and send and receive a few messages to test Kafka. For example, create a topic named test-topic:
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
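After creating the topic, it can be useful to confirm that it exists and has the expected partition count and replication factor. The sketch below wraps the `--list` and `--describe` options of `kafka-topics.sh`. The variables `KAFKA_HOME` and `BOOTSTRAP` and the function names are assumptions introduced here; `/opt/kafka` is the install location used in the earlier step.

```shell
#!/bin/sh
# Assumption: Kafka lives in /opt/kafka and the broker listens on localhost:9092.
# Override either variable in the environment if your setup differs.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# List all topics known to the cluster.
list_topics() {
  "$KAFKA_HOME/bin/kafka-topics.sh" --list --bootstrap-server "$BOOTSTRAP"
}

# Show partitions, replication factor, leader and ISR for the topic named in $1.
describe_topic() {
  "$KAFKA_HOME/bin/kafka-topics.sh" --describe --topic "$1" --bootstrap-server "$BOOTSTRAP"
}
```

With the broker running, `describe_topic test-topic` should report one partition with a replication factor of 1, matching the create command.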
Start a console producer to send messages to the topic:
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
In another terminal window, start a consumer to receive the messages:
bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning
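The producer and consumer above are interactive. For a quick scripted check, the same tools can be driven without a terminal: the console producer reads messages from standard input, and the console consumer can be told to stop after a fixed number of messages. This is a rough sketch under the same assumptions as before: `KAFKA_HOME`, `BOOTSTRAP`, `produce_lines`, and `consume_n` are names invented for this example, and the 10-second timeout is an arbitrary choice.

```shell
#!/bin/sh
# Assumption: Kafka lives in /opt/kafka and the broker listens on localhost:9092.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# Publish every line read from stdin as one message to the topic named in $1.
produce_lines() {
  "$KAFKA_HOME/bin/kafka-console-producer.sh" --topic "$1" --bootstrap-server "$BOOTSTRAP"
}

# Read at most $2 messages from the beginning of topic $1,
# giving up if no message arrives for 10 seconds.
consume_n() {
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" --topic "$1" --bootstrap-server "$BOOTSTRAP" \
    --from-beginning --max-messages "$2" --timeout-ms 10000
}
```

A round trip could then look like `printf 'hello\nworld\n' | produce_lines test-topic` followed by `consume_n test-topic 2`.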
These steps install and start Kafka on Ubuntu and run a simple test to confirm that it is working properly.
Copyright belongs to the original author, 天河书阁 VicRestart.