0


Ubuntu 环境安装 Kafka、配置运行测试 Kafka 流程笔记

Kafka 介绍

Kafka 是一个由 Apache 软件基金会开发的开源流式处理平台。它被设计用于处理大规模数据流,提供高可靠性、高吞吐量和低延迟的消息传递系统。Kafka 可以用于构建实时数据管道和流式应用程序,让不同应用、系统或者数据源之间能够高效地进行数据交换和通信。

在这里插入图片描述

Kafka 的核心概念包括以下几个部分:

  1. 消息: Kafka 是基于发布/订阅模式的消息系统,它通过主题(Topics)来组织消息。消息由生产者发布到主题,消费者可以订阅一个或多个主题以接收消息。
  2. 主题: 主题是消息的分类,每个主题可以包含一个或多个分区(Partitions)。消息发布到主题后,会根据一定规则被分发到不同的分区中。
  3. 分区: 主题可以被分为多个分区,每个分区都是有序且持久化的消息记录序列。分区使得 Kafka 能够水平扩展,允许多个消费者并行地处理消息。
  4. 生产者: 生产者负责向 Kafka 的主题发布消息。
  5. 消费者: 消费者从 Kafka 主题订阅并处理消息。
  6. 代理(Broker): Kafka 集群由多个代理组成,每个代理是一个独立的 Kafka 服务器,负责存储数据和处理消息。

Kafka 的特点包括:

  • 持久性: Kafka 将消息持久化存储在磁盘上,保证消息不会丢失。
  • 高吞吐量: Kafka 能够处理大量数据并保持低延迟,适用于大规模的数据处理和分析场景。
  • 可扩展性: 可以水平扩展以处理更多数据和更高的负载。
  • 容错性: Kafka 集群通过副本机制实现数据备份和容错,即使部分节点出现故障,仍能保证数据可靠性和可用性。

Kafka 在数据流处理、实时日志处理、指标监控等领域有着广泛的应用,被许多公司用于构建实时数据管道和处理大规模数据。

在 Ubuntu 环境下如何安装 Kafka、Kafka with Kraft

安装 Kafka 在 Ubuntu 环境下可以通过以下步骤进行。请注意,这里描述的是安装 Kafka 3.6.0 版本的方法。在安装之前,请确保已经安装了 Java 8 或更新版本。

在这里插入图片描述

了解一下 Kraft

Kafka 2.8 版本引入了 KRaft(Kafka Raft)作为 Kafka 的新的元数据管理方式,用来替代原本依赖 ZooKeeper 的方案。KRaft 是一个基于 Raft 一致性协议实现的元数据管理系统,它可以作为 Kafka 的替代方案,不再依赖 ZooKeeper。

Kafka with KRaft 使用 Raft 协议来管理和维护 Kafka 的元数据信息,包括分区分配、集群配置等。这样可以简化 Kafka 部署和管理过程,不再需要维护额外的 ZooKeeper 集群。

步骤:

1. 安装 Java

检查是否已经安装 Java:

java-version

如果未安装 Java 或需要更新,可以使用以下命令安装 OpenJDK:

sudoapt update
sudoaptinstall default-jdk
2. 下载 Kafka

在 Apache Kafka 的官方网站下载所需的 Kafka 版本,例如 3.6.0 版本。

在这里插入图片描述

Kafka 的版本号按照

<Scala 版本>-<Kafka 版本>

的格式命名。例如,

kafka_2.13-3.6.0.tgz

中的

3.6.0

是 Kafka 的版本号,而

2.13

表示这个 Kafka 版本是用 Scala 2.13 构建的。Kafka 发布的软件包已经包含了编译后的 Scala 代码,因此你只需按照 Kafka 的安装步骤进行操作即可,无需单独安装 Scala。

wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
3. 解压并移动 Kafka

解压下载的 Kafka 压缩包:

tar-xzf kafka_2.13-3.6.0.tgz

将解压后的文件夹移动到所需位置,例如

/opt

目录:

sudomv kafka_2.13-3.6.0 /opt/kafka
4. 以 Kraft 方式启动 Kafka

生成集群 UUID:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

使用

bin/kafka-storage.sh format

命令来为 Kafka with KRaft 集群的日志目录进行格式化

bin/kafka-storage.sh format-t$KAFKA_CLUSTER_ID-c config/kraft/server.properties

启动 Kafka 服务器:

# 正常运行
bin/kafka-server-start.sh config/kraft/server.properties
# 也可以选择后台运行nohup bin/kafka-server-start.sh config/kraft/server.properties > my_kafka_run.log 2>&1&

一旦 Kafka 服务器成功启动,你就会拥有一个基本的 Kafka 环境,可以开始使用了。

在这里插入图片描述

启动后的输出信息:
[2023-11-28 07:46:27,307] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)[2023-11-28 07:46:27,603] INFO Setting -Djdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)[2023-11-28 07:46:27,761] INFO Registered signal handlers forTERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)[2023-11-28 07:46:27,764] INFO [ControllerServer id=1] Starting controller (kafka.server.ControllerServer)[2023-11-28 07:46:27,782] INFO authorizerStart completed for endpoint CONTROLLER. Endpoint is now READY. (org.apache.kafka.server.network.EndpointReadyFutures)[2023-11-28 07:46:28,132] INFO Updated connection-accept-rate max connection creation rate to 2147483647(kafka.network.ConnectionQuotas)[2023-11-28 07:46:28,165] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(CONTROLLER)(kafka.network.SocketServer)[2023-11-28 07:46:28,166] INFO [SharedServer id=1] Starting SharedServer (kafka.server.SharedServer)[2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2(kafka.log.UnifiedLog$)[2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding producer state from offset 0(kafka.log.UnifiedLog$)[2023-11-28 07:46:28,224] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Producer state recovery took 0ms for snapshot load and 0ms for segment recovery from offset 0(kafka.log.UnifiedLog$)[2023-11-28 07:46:28,262] INFO Initialized snapshots with IDs SortedSet() from /tmp/kraft-combined-logs/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)[2023-11-28 07:46:28,301] INFO [raft-expiration-reaper]: Starting (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)[2023-11-28 07:46:28,490] INFO [RaftManager id=1] Completed transition to Unattached(epoch=0, voters=[1], electionTimeoutMs=1226) from null (org.apache.kafka.raft.QuorumState)[2023-11-28 07:46:28,563] INFO [RaftManager id=1] Completed transition to CandidateState(localId=1, epoch=1, retries=1, voteStates={1=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1279) from Unattached(epoch=0, voters=[1], electionTimeoutMs=1226)(org.apache.kafka.raft.QuorumState)[2023-11-28 07:46:28,572] INFO [RaftManager id=1] Completed transition to Leader(localId=1, epoch=1, epochStartOffset=0, highWatermark=Optional.empty, voterStates={1=ReplicaState(nodeId=1, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) from CandidateState(localId=1, epoch=1, retries=1, voteStates={1=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1279)(org.apache.kafka.raft.QuorumState)[2023-11-28 07:46:28,596] INFO [kafka-1-raft-outbound-request-thread]: Starting (kafka.raft.RaftSendThread)[2023-11-28 07:46:28,596] INFO [kafka-1-raft-io-thread]: Starting (kafka.raft.KafkaRaftManager$RaftIoThread)[2023-11-28 07:46:28,617] INFO [RaftManager id=1] High watermark set to LogOffsetMetadata(offset=1, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=91)])for the first timefor epoch 1 based on indexOfHw 0 and voters [ReplicaState(nodeId=1, endOffset=Optional[LogOffsetMetadata(offset=1, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=91)])], lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)](org.apache.kafka.raft.LeaderState)[2023-11-28 07:46:28,619] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:28,620] INFO [ControllerServer id=1] Waiting for controller quorum voters future (kafka.server.ControllerServer)[2023-11-28 07:46:28,621] INFO [ControllerServer id=1] Finished waiting for controller quorum voters future (kafka.server.ControllerServer)[2023-11-28 07:46:28,659] INFO [controller-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,660] INFO [controller-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,661] INFO [controller-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,662] INFO [controller-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,678] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,686] INFO [ControllerServer id=1] Waiting for the controller metadata publishers to be installed (kafka.server.ControllerServer)[2023-11-28 07:46:28,686] INFO [ControllerServer id=1] Finished waiting for the controller metadata publishers to be installed (kafka.server.ControllerServer)[2023-11-28 07:46:28,686] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:28,686] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)[2023-11-28 07:46:28,690] INFO Awaiting socket connections on 0.0.0.0:9093. (kafka.network.DataPlaneAcceptor)[2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)[2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)[2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)[2023-11-28 07:46:28,696] INFO [ControllerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)[2023-11-28 07:46:28,698] INFO [BrokerServer id=1] Transition from SHUTDOWN to STARTING (kafka.server.BrokerServer)[2023-11-28 07:46:28,699] INFO [BrokerServer id=1] Starting broker (kafka.server.BrokerServer)[2023-11-28 07:46:28,706] INFO [broker-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,707] INFO [broker-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)[2023-11-28 07:46:28,724] INFO [BrokerServer id=1] Waiting for controller quorum voters future (kafka.server.BrokerServer)[2023-11-28 07:46:28,724] INFO [BrokerServer id=1] Finished waiting for controller quorum voters future (kafka.server.BrokerServer)[2023-11-28 07:46:28,729] INFO [broker-1-to-controller-forwarding-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,731] INFO [broker-1-to-controller-forwarding-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,755] INFO Updated connection-accept-rate max connection creation rate to 2147483647(kafka.network.ConnectionQuotas)[2023-11-28 07:46:28,760] INFO [SocketServer listenerType=BROKER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT)(kafka.network.SocketServer)[2023-11-28 07:46:28,764] INFO [broker-1-to-controller-alter-partition-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,764] INFO [broker-1-to-controller-alter-partition-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,782] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,783] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,784] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,786] INFO [ExpirationReaper-1-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,786] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:28,786] INFO [ExpirationReaper-1-RemoteFetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,801] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,804] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,836] INFO [broker-1-to-controller-heartbeat-channel-manager]: Starting (kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,836] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,837] INFO [BrokerLifecycleManager id=1] Incarnation rXokDA-kRI2e0TCw3qUr4g of broker 1in cluster ktQqKm60RwiR-s4Dts0HDg is now STARTING. (kafka.server.BrokerLifecycleManager)[2023-11-28 07:46:28,857] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Waiting for the broker metadata publishers to be installed (kafka.server.BrokerServer)[2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Finished waiting for the broker metadata publishers to be installed (kafka.server.BrokerServer)[2023-11-28 07:46:28,877] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:28,877] INFO [BrokerServer id=1] Waiting for the controller to acknowledge that we are caught up (kafka.server.BrokerServer)[2023-11-28 07:46:28,920] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:28,921] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,972] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:28,977] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:28,979] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:28,979] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,029] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,035] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:29,036] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,077] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,086] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,091] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:29,091] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,141] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,147] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:29,147] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,178] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,197] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,202] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:29,202] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,253] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,270] INFO [BrokerToControllerChannelManager id=1name=heartbeat] Client requested disconnect from node1(org.apache.kafka.clients.NetworkClient)[2023-11-28 07:46:29,271] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,276] INFO [RaftManager id=1] Registered the listener org.apache.kafka.image.loader.MetadataLoader@382374793 (org.apache.kafka.raft.KafkaRaftClient)[2023-11-28 07:46:29,276] INFO [RaftManager id=1] Registered the listener org.apache.kafka.controller.QuorumController$QuorumMetaLogListener@859950147 (org.apache.kafka.raft.KafkaRaftClient)[2023-11-28 07:46:29,281] INFO [MetadataLoader id=1] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,288] INFO [MetadataLoader id=1] maybePublishMetadata(LOG_DELTA): The loader is still catching up because we have not loaded a controller record as of offset 0 and high water mark is 1(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,320] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:46:29,332] INFO [MetadataLoader id=1] maybePublishMetadata(LOG_DELTA): The loader finished catching up to the current high water mark of 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,360] INFO [BrokerLifecycleManager id=1] Successfully registered broker 1 with broker epoch 5(kafka.server.BrokerLifecycleManager)[2023-11-28 07:46:29,382] INFO [BrokerLifecycleManager id=1] The broker has caught up. Transitioning from STARTING to RECOVERY. (kafka.server.BrokerLifecycleManager)[2023-11-28 07:46:29,383] INFO [BrokerServer id=1] Finished waiting for the controller to acknowledge that we are caught up (kafka.server.BrokerServer)[2023-11-28 07:46:29,383] INFO [BrokerServer id=1] Waiting for the initial broker metadata update to be published (kafka.server.BrokerServer)[2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing SnapshotGenerator with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing FeaturesPublisher with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,386] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DynamicConfigPublisher controller id=1 with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,388] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DynamicClientQuotaPublisher controller id=1 with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,389] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing ScramPublisher controller id=1 with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,390] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing DelegationTokenPublisher controller id=1 with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,392] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing ControllerMetadataMetricsPublisher with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,393] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing AclPublisher controller id=1 with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,394] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing BrokerMetadataPublisher with a snapshot at offset 5(org.apache.kafka.image.loader.MetadataLoader)[2023-11-28 07:46:29,394] INFO [BrokerMetadataPublisher id=1] Publishing initial metadata at offset OffsetAndEpoch(offset=5, epoch=1) with metadata.version 3.6-IV2. (kafka.server.metadata.BrokerMetadataPublisher)[2023-11-28 07:46:29,395] INFO [BrokerLifecycleManager id=1] The broker is in RECOVERY. (kafka.server.BrokerLifecycleManager)[2023-11-28 07:46:29,397] INFO Loading logs from log dirs ArraySeq(/tmp/kraft-combined-logs)(kafka.log.LogManager)[2023-11-28 07:46:29,402] INFO No logs found to be loaded in /tmp/kraft-combined-logs (kafka.log.LogManager)[2023-11-28 07:46:29,409] INFO Loaded 0 logs in 12ms (kafka.log.LogManager)[2023-11-28 07:46:29,410] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)[2023-11-28 07:46:29,415] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)[2023-11-28 07:46:29,555] INFO [kafka-log-cleaner-thread-0]: Starting (kafka.log.LogCleaner$CleanerThread)[2023-11-28 07:46:29,556] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)[2023-11-28 07:46:29,557] INFO [AddPartitionsToTxnSenderThread-1]: Starting (kafka.server.AddPartitionsToTxnManager)[2023-11-28 07:46:29,557] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)[2023-11-28 07:46:29,561] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)[2023-11-28 07:46:29,562] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)[2023-11-28 07:46:29,563] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)[2023-11-28 07:46:29,563] INFO [BrokerMetadataPublisher id=1] Updating metadata.version to 14 at offset OffsetAndEpoch(offset=5, epoch=1). (kafka.server.metadata.BrokerMetadataPublisher)[2023-11-28 07:46:29,566] INFO [TxnMarkerSenderThread-1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)[2023-11-28 07:46:29,568] INFO [BrokerServer id=1] Finished waiting for the initial broker metadata update to be published (kafka.server.BrokerServer)[2023-11-28 07:46:29,570] INFO KafkaConfig values:
    advertised.listeners = PLAINTEXT://localhost:9092
    alter.config.policy.class.name = null
    alter.log.dirs.replication.quota.window.num =11
    alter.log.dirs.replication.quota.window.size.seconds =1
    authorizer.class.name =
    auto.create.topics.enable =true
    auto.include.jmx.reporter =true
    auto.leader.rebalance.enable =true
    background.threads =10
    broker.heartbeat.interval.ms =2000
    broker.id =1
    broker.id.generation.enable =true
    broker.rack = null
    broker.session.timeout.ms =9000
    client.quota.callback.class = null
    compression.type = producer
    connection.failed.authentication.delay.ms =100
    connections.max.idle.ms =600000
    connections.max.reauth.ms =0
    control.plane.listener.name = null
    controlled.shutdown.enable =true
    controlled.shutdown.max.retries =3
    controlled.shutdown.retry.backoff.ms =5000
    controller.listener.names = CONTROLLER
    controller.quorum.append.linger.ms =25
    controller.quorum.election.backoff.max.ms =1000
    controller.quorum.election.timeout.ms =1000
    controller.quorum.fetch.timeout.ms =2000
    controller.quorum.request.timeout.ms =2000
    controller.quorum.retry.backoff.ms =20
    controller.quorum.voters =[1@localhost:9093]
    controller.quota.window.num =11
    controller.quota.window.size.seconds =1
    controller.socket.timeout.ms =30000
    create.topic.policy.class.name = null
    default.replication.factor =1
    delegation.token.expiry.check.interval.ms =3600000
    delegation.token.expiry.time.ms =86400000
    delegation.token.master.key = null
    delegation.token.max.lifetime.ms =604800000
    delegation.token.secret.key = null
    delete.records.purgatory.purge.interval.requests =1
    delete.topic.enable =true
    early.start.listeners = null
    fetch.max.bytes =57671680
    fetch.purgatory.purge.interval.requests =1000
    group.consumer.assignors =[org.apache.kafka.coordinator.group.assignor.RangeAssignor]
    group.consumer.heartbeat.interval.ms =5000
    group.consumer.max.heartbeat.interval.ms =15000
    group.consumer.max.session.timeout.ms =60000
    group.consumer.max.size =2147483647
    group.consumer.min.heartbeat.interval.ms =5000
    group.consumer.min.session.timeout.ms =45000
    group.consumer.session.timeout.ms =45000
    group.coordinator.new.enable =false
    group.coordinator.threads =1
    group.initial.rebalance.delay.ms =3000
    group.max.session.timeout.ms =1800000
    group.max.size =2147483647
    group.min.session.timeout.ms =6000
    initial.broker.registration.timeout.ms =60000
    inter.broker.listener.name = PLAINTEXT
    inter.broker.protocol.version =3.6-IV2
    kafka.metrics.polling.interval.secs =10
    kafka.metrics.reporters =[]
    leader.imbalance.check.interval.seconds =300
    leader.imbalance.per.broker.percentage =10
    listener.security.protocol.map = CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    listeners = PLAINTEXT://:9092,CONTROLLER://:9093
    log.cleaner.backoff.ms =15000
    log.cleaner.dedupe.buffer.size =134217728
    log.cleaner.delete.retention.ms =86400000
    log.cleaner.enable =true
    log.cleaner.io.buffer.load.factor =0.9
    log.cleaner.io.buffer.size =524288
    log.cleaner.io.max.bytes.per.second =1.7976931348623157E308
    log.cleaner.max.compaction.lag.ms =9223372036854775807
    log.cleaner.min.cleanable.ratio =0.5
    log.cleaner.min.compaction.lag.ms =0
    log.cleaner.threads =1
    log.cleanup.policy =[delete]
    log.dir = /tmp/kafka-logs
    log.dirs = /tmp/kraft-combined-logs
    log.flush.interval.messages =9223372036854775807
    log.flush.interval.ms = null
    log.flush.offset.checkpoint.interval.ms =60000
    log.flush.scheduler.interval.ms =9223372036854775807
    log.flush.start.offset.checkpoint.interval.ms =60000
    log.index.interval.bytes =4096
    log.index.size.max.bytes =10485760
    log.local.retention.bytes =-2
    log.local.retention.ms =-2
    log.message.downconversion.enable =true
    log.message.format.version =3.0-IV1
    log.message.timestamp.after.max.ms =9223372036854775807
    log.message.timestamp.before.max.ms =9223372036854775807
    log.message.timestamp.difference.max.ms =9223372036854775807
    log.message.timestamp.type = CreateTime
    log.preallocate =false
    log.retention.bytes =-1
    log.retention.check.interval.ms =300000
    log.retention.hours =168
    log.retention.minutes = null
    log.retention.ms = null
    log.roll.hours =168
    log.roll.jitter.hours =0
    log.roll.jitter.ms = null
    log.roll.ms = null
    log.segment.bytes =1073741824
    log.segment.delete.delay.ms =60000
    max.connection.creation.rate =2147483647
    max.connections =2147483647
    max.connections.per.ip =2147483647
    max.connections.per.ip.overrides =
    max.incremental.fetch.session.cache.slots =1000
    message.max.bytes =1048588
    metadata.log.dir = null
    metadata.log.max.record.bytes.between.snapshots =20971520
    metadata.log.max.snapshot.interval.ms =3600000
    metadata.log.segment.bytes =1073741824
    metadata.log.segment.min.bytes =8388608
    metadata.log.segment.ms =604800000
    metadata.max.idle.interval.ms =500
    metadata.max.retention.bytes =104857600
    metadata.max.retention.ms =604800000
    metric.reporters =[]
    metrics.num.samples =2
    metrics.recording.level = INFO
    metrics.sample.window.ms =30000
    min.insync.replicas =1
    node.id =1
    num.io.threads =8
    num.network.threads =3
    num.partitions =1
    num.recovery.threads.per.data.dir =1
    num.replica.alter.log.dirs.threads = null
    num.replica.fetchers =1
    offset.metadata.max.bytes =4096
    offsets.commit.required.acks =-1
    offsets.commit.timeout.ms =5000
    offsets.load.buffer.size =5242880
    offsets.retention.check.interval.ms =600000
    offsets.retention.minutes =10080
    offsets.topic.compression.codec =0
    offsets.topic.num.partitions =50
    offsets.topic.replication.factor =1
    offsets.topic.segment.bytes =104857600
    password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
    password.encoder.iterations =4096
    password.encoder.key.length =128
    password.encoder.keyfactory.algorithm = null
    password.encoder.old.secret = null
    password.encoder.secret = null
    principal.builder.class = class org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
    process.roles =[broker, controller]
    producer.id.expiration.check.interval.ms =600000
    producer.id.expiration.ms =86400000
    producer.purgatory.purge.interval.requests =1000
    queued.max.request.bytes =-1
    queued.max.requests =500
    quota.window.num =11
    quota.window.size.seconds =1
    remote.log.index.file.cache.total.size.bytes =1073741824
    remote.log.manager.task.interval.ms =30000
    remote.log.manager.task.retry.backoff.max.ms =30000
    remote.log.manager.task.retry.backoff.ms =500
    remote.log.manager.task.retry.jitter =0.2
    remote.log.manager.thread.pool.size =10
    remote.log.metadata.custom.metadata.max.bytes =128
    remote.log.metadata.manager.class.name = org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
    remote.log.metadata.manager.class.path = null
    remote.log.metadata.manager.impl.prefix = rlmm.config.
    remote.log.metadata.manager.listener.name = null
    remote.log.reader.max.pending.tasks =100
    remote.log.reader.threads =10
    remote.log.storage.manager.class.name = null
    remote.log.storage.manager.class.path = null
    remote.log.storage.manager.impl.prefix = rsm.config.
    remote.log.storage.system.enable =false
    replica.fetch.backoff.ms =1000
    replica.fetch.max.bytes =1048576
    replica.fetch.min.bytes =1
    replica.fetch.response.max.bytes =10485760
    replica.fetch.wait.max.ms =500
    replica.high.watermark.checkpoint.interval.ms =5000
    replica.lag.time.max.ms =30000
    replica.selector.class = null
    replica.socket.receive.buffer.bytes =65536
    replica.socket.timeout.ms =30000
    replication.quota.window.num =11
    replication.quota.window.size.seconds =1
    request.timeout.ms =30000
    reserved.broker.max.id =1000
    sasl.client.callback.handler.class = null
    sasl.enabled.mechanisms =[GSSAPI]
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin =60000
    sasl.kerberos.principal.to.local.rules =[DEFAULT]
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter =0.05
    sasl.kerberos.ticket.renew.window.factor =0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds =300
    sasl.login.refresh.min.period.seconds =60
    sasl.login.refresh.window.factor =0.8
    sasl.login.refresh.window.jitter =0.05
    sasl.login.retry.backoff.max.ms =10000
    sasl.login.retry.backoff.ms =100
    sasl.mechanism.controller.protocol = GSSAPI
    sasl.mechanism.inter.broker.protocol = GSSAPI
    sasl.oauthbearer.clock.skew.seconds =30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms =3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms =10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms =100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    sasl.server.callback.handler.class = null
    sasl.server.max.receive.size =524288
    security.inter.broker.protocol = PLAINTEXT
    security.providers = null
    server.max.startup.time.ms =9223372036854775807
    socket.connection.setup.timeout.max.ms =30000
    socket.connection.setup.timeout.ms =10000
    socket.listen.backlog.size =50
    socket.receive.buffer.bytes =102400
    socket.request.max.bytes =104857600
    socket.send.buffer.bytes =102400
    ssl.cipher.suites =[]
    ssl.client.auth = none
    ssl.enabled.protocols =[TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.principal.mapping.rules = DEFAULT
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.abort.timed.out.transaction.cleanup.interval.ms =10000
    transaction.max.timeout.ms =900000
    transaction.partition.verification.enable =true
    transaction.remove.expired.transaction.cleanup.interval.ms =3600000
    transaction.state.log.load.buffer.size =5242880
    transaction.state.log.min.isr =1
    transaction.state.log.num.partitions =50
    transaction.state.log.replication.factor =1
    transaction.state.log.segment.bytes =104857600
    transactional.id.expiration.ms =604800000
    unclean.leader.election.enable =false
    unstable.api.versions.enable =false
    zookeeper.clientCnxnSocket = null
    zookeeper.connect = null
    zookeeper.connection.timeout.ms = null
    zookeeper.max.in.flight.requests =10
    zookeeper.metadata.migration.enable =false
    zookeeper.session.timeout.ms =18000
    zookeeper.set.acl =false
    zookeeper.ssl.cipher.suites = null
    zookeeper.ssl.client.enable =false
    zookeeper.ssl.crl.enable =false
    zookeeper.ssl.enabled.protocols = null
    zookeeper.ssl.endpoint.identification.algorithm = HTTPS
    zookeeper.ssl.keystore.location = null
    zookeeper.ssl.keystore.password = null
    zookeeper.ssl.keystore.type = null
    zookeeper.ssl.ocsp.enable =false
    zookeeper.ssl.protocol = TLSv1.2
    zookeeper.ssl.truststore.location = null
    zookeeper.ssl.truststore.password = null
    zookeeper.ssl.truststore.type = null
 (kafka.server.KafkaConfig)[2023-11-28 07:46:29,577] INFO [BrokerServer id=1] Waiting for the broker to be unfenced (kafka.server.BrokerServer)[2023-11-28 07:46:29,612] INFO [BrokerLifecycleManager id=1] The broker has been unfenced. Transitioning from RECOVERY to RUNNING. (kafka.server.BrokerLifecycleManager)[2023-11-28 07:46:29,661] INFO [BrokerServer id=1] Finished waiting for the broker to be unfenced (kafka.server.BrokerServer)[2023-11-28 07:46:29,662] INFO authorizerStart completed for endpoint PLAINTEXT. Endpoint is now READY. (org.apache.kafka.server.network.EndpointReadyFutures)[2023-11-28 07:46:29,663] INFO [SocketServer listenerType=BROKER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)[2023-11-28 07:46:29,663] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)[2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)[2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)[2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)[2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)[2023-11-28 07:46:29,664] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)[2023-11-28 07:46:29,665] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)[2023-11-28 07:46:29,665] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)[2023-11-28 07:46:29,665] INFO Kafka startTimeMs: 1701157589664(org.apache.kafka.common.utils.AppInfoParser)[2023-11-28 07:46:29,666] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)[2023-11-28 07:53:16,542] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new controller, from now on will use node localhost:9093 (id: 1 rack: null)(kafka.server.BrokerToControllerRequestThread)[2023-11-28 07:53:16,543] INFO [BrokerLifecycleManager id=1] Unable to send a heartbeat because the RPC got timed out before it could be sent. (kafka.server.BrokerLifecycleManager)
6. 测试 Kafka

创建一个主题(Topic)并发送/接收一些消息来测试 Kafka。例如,创建名为

test-topic

的主题:

bin/kafka-topics.sh --create--topic test-topic --bootstrap-server localhost:9092 --replication-factor 1--partitions1

生产者发送消息到该主题:

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092

在另一个终端窗口中启动消费者以接收消息:

bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning

在这里插入图片描述

这些步骤将帮助你在 Ubuntu 上安装并启动 Kafka,并进行简单的测试以确保 Kafka 正常运行。

标签: ubuntu kafka 笔记

本文转载自: https://blog.csdn.net/Backli/article/details/134668245
版权归原作者 天河书阁 VicRestart 所有, 如有侵权,请联系我们删除。

“Ubuntu 环境安装 Kafka、配置运行测试 Kafka 流程笔记”的评论:

还没有评论