0


Kafka 命令行操作

版本:3.6.1

1 kafka-topics.sh

Create, delete, describe, or change a topic.

创建、删除、描述或更改主题。
Option(选项)Description(描述)翻译–alterAlter the number of partitions and replica assignment. Update the configuration of an existing topic via --alter is no longer supported here (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option).更改分区数和副本分配。此处不再支持通过–alter更新现有主题的配置(kafka configs CLI支持使用–bootstrap server选项更改主题配置)。–at-min-isr-partitionsIf set when describing topics, only show partitions whose isr count is equal to the configured minimum.如果在描述主题时设置,则仅显示isr计数等于配置的最小值的分区。–bootstrap-server <String: server to connect to>REQUIRED: The Kafka server to connect to.必需:要连接的Kafka服务器。–command-config <String: command config property file>Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.包含要传递给管理客户端的配置的属性文件。这仅与–bootstrap服务器选项一起使用,用于描述和更改代理配置。–config <String: name=value>A topic configuration override for the topic being created or altered.正在创建或更改的主题的主题配置覆盖。–createCreate a new topic.创建一个新的主题。–deleteDelete a topic.删除一个主题。–delete-config <String: name>A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option.为现有主题删除主题配置。(请参阅–config选项下的配置列表)。不支持–bootstrap-server选项。–describeList details for the given topics.列出给定主题的详细信息。–exclude-internalExclude internal topics when running list or describe command. The internal topics will be listed by default.运行list或describe命令时排除内部主题。默认情况下会列出内部主题。–helpPrint usage information.打印使用信息。–if-existsif set when altering or deleting or describing topics, the action will only execute if the topic exists.如果在更改、删除或描述主题时设置,则仅当主题存在时才会执行该操作。–if-not-existsIf set when creating topics, the action will only execute if the topic does not already exist.如果在创建主题时设置,则仅当主题不存在时才会执行该操作。–listList all available topics.列出所有可用的主题。–partitions <Integer: # of partitions>The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.正在创建或更改的主题的分区数(警告:如果为具有键的主题增加分区,则分区逻辑或消息顺序将受到影响)。如果未提供用于创建,则默认为集群默认值。–replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , …>A list of manual partition-to-broker assignments for the topic being created or altered.为正在创建或更改的主题分配给代理程序的手动分区列表。–replication-factor <Integer: replication factor>The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.正在创建的主题中每个分区的副本。如果未提供,则默认为集群默认值。–topic <String: topic>The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the ‘’ prefix to escape regular expression symbols; e.g. “test.topic”.要创建、更改、描述或删除的主题。它还接受一个正则表达式,但–create选项除外。将主题名称置于双引号中,并使用“\”前缀转义正则表达式符号;例如“test.topic”。–topic-id <String: topic-id>The topic-id to describe.This is used only with --bootstrap-server option for describing topics.要描述的主题id。这仅与用于描述主题的–bootstrap-server选项一起使用。–topics-with-overridesIf set when describing topics, only show topics that have overridden configs.如果在描述主题时设置,则仅显示已重写配置的主题。–unavailable-partitionsIf set when describing topics, only show partitions whose leader is not available.如果在描述主题时设置,则仅显示其leader不可用的分区。–under-min-isr-partitionsIf set when describing topics, only show partitions whose isr count is less than the configured minimum.如果在描述主题时设置,则仅显示isr计数小于配置的最小值的分区。–under-replicated-partitionsIf set when describing topics, only show under replicated partitions.如果在描述主题时设置,则仅显示已复制分区–versionDisplay Kafka version.显示Kafka版本。

1.1 查看所有topic

kafka-topics.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --list

1.2 创建topic

kafka-topics.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --create--partitions1 --replication-factor 3--topic first

–topic 定义topic名

–replication-factor 定义副本数

–partitions 定义分区数

1.3 描述topic详情

kafka-topics.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --describe--topic first

1.4 修改topic分区数

kafka-topics.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --alter--topic first --partitions3

1.5 删除topic

kafka-topics.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --delete--topic first

2 kafka-console-producer.sh

This tool helps to read data from standard input and publish it to Kafka.

该工具从标准输入中读取数据并将其发布到Kafka。
Option(选项)Description(描述)翻译–batch-size <Integer: size>Number of messages to send in a single batch if they are not being sent synchronously. please note that this option will be replaced if max-partition-memory-bytes is also set (default: 16384)如果不是同步发送,则在单个批处理中发送的消息数。请注意,如果还设置了最大分区内存字节数(max-partition-memory-bytes)(默认值:16384),则此选项将被替换–bootstrap-server <String: server to connect to>REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.必需,除非指定了–broker-list(已弃用)。要连接到的服务器。形式为HOST1:PORT1,HOST2:PORT2的broker列表字符串。–broker-list <String: broker-list>DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.已弃用,请改用–bootstrap-server;如果指定了–bootstrap-server,则忽略。形式为HOST1:PORT1,HOST2:PORT2的broker列表字符串。–compression-codec [String:compression-codec]The compression codec: either ‘none’, ‘gzip’, ‘snappy’, ‘lz4’, or ‘zstd’.If specified without value, then it defaults to ‘gzip’压缩编解码器:“none”、“gzip”、“snappy”、“lz4”或“zstd”。如果未指定值,则默认为“gzip”–helpPrint usage information.打印使用信息。–line-reader <String: reader_class>The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)用于从中的标准读取行的类的类名。默认情况下,每一行都作为一条单独的消息读取。(默认值:kafka.tools.ConsoleProducer$LineMessageReader)–max-block-ms <Long: max block on send>The max time that the producer will block for during a send request.(default: 60000)生产者在发送请求期间阻止的最长时间。(默认值:60000)–max-memory-bytes <Long: total memory in bytes>The total memory used by the producer to buffer records waiting to be sent to the server. This is the option to control

buffer.memory

in producer configs. (default: 33554432)生产者用于缓冲等待发送到服务器的记录的总内存。这是在生产者配置中控制“buffer.memory”的选项。(默认值:33554432)–max-partition-memory-bytes <Integer: memory in bytes per partition>The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. This is the option to control

batch.size

in producer configs. (default: 16384)为一个分区分配的缓冲区大小。当接收到小于此大小的记录时,生产者将尝试乐观地将它们组合在一起,直到达到此大小。这是在生产者配置中控制

batch.size

的选项。(默认值:16384)–message-send-max-retries Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. This is the option to control

retries

in producer configs. (default: 3)Brokers可能由于多种原因无法接收消息,而暂时不可用只是其中之一。此属性指定生产者放弃并丢弃此消息之前的重试次数。这是在生产者配置中控制·retries·的选项。(默认值:3)–metadata-expiry-ms <Long: metadata expiration interval>The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any leadership changes. This is the option to control

metadata.max.age.ms

in producer configs. (default:300000)以毫秒为单位的一段时间,在此之后,即使我们没有看到任何领导层变动,我们也会强制刷新元数据。这是在生产者配置中控制

metadata.max.age.ms

的选项。(默认值:300000)–producer-property <String: producer_prop>A mechanism to pass user-defined properties in the form key=value to the producer.将key=value形式的用户定义属性传递给生产者的机制。–producer.config <String: config file>Producer config properties file. Note that [producer-property] takes precedence over this config.生产者配置属性文件。请注意,[producer-property]优先于此配置。–property <String: prop>A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.将key=value形式的用户定义属性传递给消息读取器的机制。这允许对用户定义的消息读取器进行自定义配置。–reader-config <String: config file>Config properties file for the message reader. Note that [property] takes precedence over this config.配置消息读取器的属性文件。请注意,[property]优先于此配置。–request-required-acks <String: request required acks>The required

acks

of the producer requests (default: -1)生产者请求所需的“acks”(默认值:-1)–request-timeout-ms <Integer: request timeout ms>The ack timeout of the producer requests. Value must be non-negative and non-zero. (default: 1500)生产者请求的ack超时。值必须为非负且非零。(默认值:1500)–retry-backoff-ms Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. This is the option to control

retry.backoff.ms

in producer configs. (default: 100)每次重试之前,生产者都会刷新相关主题的元数据。由于leader选举需要一些时间,因此此属性指定生产者在刷新元数据之前等待的时间。这是在生产者配置中控制“retry.backoff.ms”的选项。(默认值:100)–socket-buffer-size <Integer: size>The size of the tcp RECV size. This is the option to control

send.buffer.bytes

in producer configs. (default: 102400)tcp RECV size的大小。这是在生产者配置中控制“send.buffer.bytes”的选项。(默认值:102400)–syncIf set message send requests to the brokers are synchronously, one at a time as they arrive.如果设置消息发送请求到brokers是同步的,则在它们到达时一次一个。–timeout <Long: timeout_ms>If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. This is the option to control

linger.ms

in producer configs. (default: 1000)如果设置了,并且生产者以异步模式运行,这将为消息排队等待足够的批处理大小提供最长时间。该值以毫秒为单位。这是在生产者配置中控制

linger.ms

的选项`。(默认值:1000)–topic <String: topic>REQUIRED: The topic id to produce messages to.REQUIRED:要向其生成消息的主题id。–versionDisplay Kafka version.显示Kafka版本。

2.1 发送消息

kafka-console-producer.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --topic first

3 kafka-console-consumer.sh

This tool helps to read data from Kafka topics and outputs it to standard output.

该工具读取Kafka主题中的数据,并将其输出到标准输出。
Option(选项)Description(描述)翻译–bootstrap-server <String: server to connect to>REQUIRED: The server(s) to connect to.必需:要连接到的服务器。–consumer-property <String: consumer_prop>A mechanism to pass user-defined properties in the form key=value to the consumer.将key=value形式的用户定义属性传递给消费者的机制。–consumer.config <String: config file>Consumer config properties file. Note that [consumer-property] takes precedence over this config.消费者配置属性文件。请注意,[consumer-property]优先于此配置。–enable-systest-eventsLog lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)除了记录所消费的消息之外,还记录消费者的生命周期事件。(这是针对系统测试的。)–formatter <String: class>The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)用于格式化要显示的kafka消息的类的名称。(默认值:kafka.tools.DefaultMessageFormatter)–formatter-config <String: config file>Config properties file to initialize the message formatter. Note that [property] takes precedence over this config.配置属性文件以初始化消息格式化程序。请注意,[property]优先于此配置。–from-beginningIf the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.如果消费者还没有建立的偏移量,从日志中出现的最早消息开始,而不是从最新消息开始。–group <String: consumer group id>The consumer group id of the consumer.消费者的消费者组id。–helpPrint usage information.打印使用信息。–include <String: Java regex (String)>Regular expression specifying list of topics to include for consumption.指定要包含以供使用的主题列表的正则表达式。–isolation-level Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted)设置为read_committed,以便筛选出未提交的事务性消息。设置为read_uncommitted可读取所有消息。(默认值:read_uncommitted)–key-deserializer <String: deserializer for key>–max-messages <Integer: num_messages>The maximum number of messages to consume before exiting. If not set, consumption is continual.退出前要消费的最大消息数。如果未设置,则消费是连续的。–offset <String: consume offset>The offset to consume from (a non-negative number), or ‘earliest’ which means from beginning, or ‘latest’ which means from end (default: latest)从(非负数)开始消费的偏移量,或“earliest”表示从开始,或“latest”表示从结束(默认值:最新)–partition <Integer: partition>The partition to consume from. Consumption starts from the end of the partition unless ‘–offset’ is specified.要从中消费的分区。除非指定了“–offset”,否则消费从分区的末尾开始。–property <String: prop>The properties to initialize the message formatter.用于初始化消息格式化程序的配置文件。–skip-message-on-errorIf there is an error when processing a message, skip it instead of halt.如果在处理消息时出现错误,跳过它而不是停止。–timeout-ms <Integer: timeout_ms>If specified, exit if no message is available for consumption for the specified interval.如果指定了,如果在指定的时间间隔内没有消息可供消费,则退出。–topic <String: topic>The topic to consume on.要消费的主题。–value-deserializer <String: deserializer for values>–versionDisplay Kafka version.显示Kafka版本。–whitelist <String: Java regex (String)>DEPRECATED, use --include instead; ignored if --include specified. Regular expression specifying list of topics to include for consumption.弃用,请改用–include;如果指定了–include,则忽略。指定要包含以供使用的主题列表的正则表达式。

3.1 消费主题中的数据

kafka-console-consumer.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --topic first

3.2 从头开始消费数据

kafka-console-consumer.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --from-beginning --topic first

3.3 指定消费者组消费

kafka-console-consumer.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --from-beginning --topic first --group first_group

3.4 打印时间戳

kafka-console-consumer.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --from-beginning --topic first --propertyprint.timestamp=true

4 kafka-consumer-groups.sh

Option(选项)Description(描述)翻译–all-groupsApply to all consumer groups.应用于所有消费者组。–all-topicsConsider all topics assigned to a group in the

reset-offsets

process.在

reset-offsets

过程中,应用于一个组的所有主题。–bootstrap-server <String: server to connect to>REQUIRED: The server(s) to connect to.必需:要连接到的服务器。–by-duration <String: duration>Reset offsets to offset by duration from current timestamp. Format: ‘PnDTnHnMnS’将偏移量重置为当前时间戳的持续时间偏移量。格式:“PnDTnHnMnS”–command-config <String: command config property file>Property file containing configs to be passed to Admin Client and Consumer.包含要传递给管理客户端和消费者的配置的属性文件。–deletePass in groups to delete topic partition offsets and ownership information over the entire consumer group. For instance --group g1 --group g2传入消费者组以删除整个消费者组的主题分区偏移和所有权信息。例如 --group g1 --group g2–delete-offsetsDelete offsets of consumer group. Supports one consumer group at the time, and multiple topics.删除消费者组的偏移量。一次支持一个消费者组和多个主题。–describeDescribe consumer group and list offset lag (number of messages not yet processed) related to given group.描述消费者组并列出与给定组相关的偏移滞后(尚未处理的消息数)。–dry-runOnly show results without executing changes on Consumer Groups. Supported operations: reset-offsets.仅显示结果,而不对消费者组执行更改。支持的操作:reset-offsets。–executeExecute operation. Supported operations: reset-offsets.执行操作。支持的操作:reset-offsets。–exportExport operation execution to a CSV file. Supported operations: reset-offsets.将操作执行导出到CSV文件。支持的操作:reset-offsets。–from-file <String: path to CSV file>Reset offsets to values defined in CSV file.将偏移重置为CSV文件中定义的值。–group <String: consumer group>The consumer group we wish to act on.我们希望采取行动的消费者组。–helpPrint usage information.打印使用信息。–listList all consumer groups.列出所有消费者组。–membersDescribe members of the group. This option may be used with ‘–describe’ and ‘–bootstrap-server’ options only.
Example: --bootstrap-server localhost: 9092 --describe --group group1 --members描述小组成员。此选项只能与“–description”和“–bootstrap server”选项一起使用。
示例:–bootstrap-server localhost:9092 --describe --group group1–members–offsetsDescribe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of and may be used with ‘–describe’ and ‘–bootstrap-server’ options only.
Example: --bootstrap-server localhost: 9092 --describe --group group1 --offsets描述组并列出组中的所有主题分区及其偏移滞后。这是的默认子操作,只能与“–description”和“–bootstrap server”选项一起使用。
示例:–bootstrap-server localhost: 9092 --describe --group group1 --offsets–reset-offsetsReset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive.
Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. Additionally, the --export option is used to export the results to a CSV format.
You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current, --to-offset.
To define the scope use --all-topics or --topic. One scope must be specified unless you use ‘–from-file’.重置消费者组的偏移量。一次支持一个消费者组,并且实例应处于非活动状态。
有两个执行选项:–dry-run(默认)计划要重置的偏移量,–execute更新偏移量。此外,–export选项用于将结果导出为CSV格式。
您必须选择以下重置规范之一:–to-datetime, --by-duration, --to-earliest, --to-latest, --shift-by, --from-file, --to-current, --to-offset。
要定义范围,请使用—all-topics或–topic。除非使用“–from-file”,否则必须指定一个作用域。–shift-by <Long: number-of-offsets>Reset offsets shifting current offset by ‘n’, where ‘n’ can be positive or negative.重置偏移量将当前偏移量偏移“n”,其中“n”可以是正或负。–state [String]When specified with ‘–describe’, includes the state of the group.
Example: --bootstrap-server localhost:9092 --describe --group group1 --state
When specified with ‘–list’, it displays the state of all groups. It can also be used to list groups with specific states.
Example: --bootstrap-server localhost:9092 --list --state stable,empty
This option may be used with ‘–describe’, ‘–list’ and ‘–bootstrap-server’ options only.当使用“–describe”指定时,包括组的状态。
示例:–bootstrap-server localhost:9092 --describe --group group1 --state
当使用“–list”指定时,它将显示所有组的状态。它还可以用于列出具有特定状态的组。
示例: --bootstrap-server localhost:9092 --list --state stable,empty
此选项只能与“–description”、“–list”和“–bootstrap server”选项一起使用。–timeout <Long: timeout (ms)>The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000)可以为某些用例设置的超时。例如,在描述组时,可以使用它来指定在组稳定之前(当组刚刚创建或正在进行一些更改时)等待的最大时间(以毫秒为单位)。(默认值:5000)–to-currentReset offsets to current offset.将偏移重置为当前偏移。–to-datetime <String: datetime>Reset offsets to offset from datetime. Format: ‘YYYY-MM-DDTHH:mm:SS.sss’将偏移量重置为来自日期时间的偏移量。格式:‘YYYY-MM-DDTHH:mm:SS.sss’–to-earliestReset offsets to earliest offset.将偏移重置为最早偏移。–to-latestReset offsets to latest offset.将偏移重置为最新偏移。–to-offset <Long: offset>Reset offsets to a specific offset.将偏移重置为特定偏移。–topic <String: topic>The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. In

reset-offsets

case, partitions can be specified using this format:

topic1:0,1,2

, where 0,1,2 are the partition to be included in the process. Reset-offsets also supports multiple topic inputs.应删除其消费者组信息的主题或应将其包含在重置偏移过程中的主题。在

reset-offsets

的情况下,可以使用以下格式指定分区:topic1:0,1,2`,其中0,1,2是要包含在进程中的分区。重置偏移量还支持多个主题输入。–verboseProvide additional information, if any, when describing the group. This option may be used with ‘–offsets’/‘–members’/‘–state’ and ‘–bootstrap-server’ options only.
Example: --bootstrap-server localhost:9092 --describe --group group1 --members --verbose在描述消费者组时,如有其他信息,则提供。此选项只能与“–offsets”/“–members”/“–state”和“–bootstrap-server”选项一起使用。
示例:–bootstrap-server localhost:9092 --describe --group group1 --members --verbose–versionDisplay Kafka version.显示Kafka版本。

4.1 查看所有消费者组

kafka-consumer-groups.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --list

4.2 描述指定消费者组

kafka-consumer-groups.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --describe--group first_group

4.3 重置消费偏移

kafka-consumer-groups.sh --bootstrap-server centos701:9092,centos702:9092,centos704:9092 --group fist_group --topic first --reset-offsets --to-datetime 2023-10-08T00:00:00.000 -execute

5 kafka-reassign-partitions.sh

This tool helps to move topic partitions between replicas.

此工具帮助在副本之间移动主题分区。
Option(选项)Description(描述)翻译–additionalExecute this reassignment in addition to any other ongoing ones. This option can also be used to change the throttle of an ongoing reassignment.除了执行任何其他正在进行的重新分配外,还要执行此重新分配。此选项也可用于更改正在进行的重新分配的限制。–bootstrap-server <String: Server(s) to use for bootstrapping>REQUIRED: the server(s) to use for bootstrapping.必需:用于引导的服务器。–broker-list <String: brokerlist>The list of brokers to which the partitions need to be reassigned in the form “0,1,2”. This is required if --topics-to-move-json-file is used to generate reassignment configuration需要将分区重新分配到的broker列表,格式为“0,1,2”。如果使用–topics-to-move-json-file以生成重新分配配置,则这是必需的–cancelCancel an active reassignment.取消活动的重新分配。–command-config <String: Admin client property file>Property file containing configs to be passed to Admin Client.包含要传递给管理客户端的配置的属性文件。–disable-rack-awareDisable rack aware replica assignment禁用机架感知复制副本分配–executeKick off the reassignment as specified by the --reassignment-json-file option.按照–reassignment-json-file选项的指定启动重新分配。–generateGenerate a candidate partition reassignment configuration. Note that this only generates a candidate assignment, it does not execute it.生成候选分区重新分配配置。请注意,这只会生成一个候选赋值,而不会执行它。–helpPrint usage information.打印使用信息。–listList all active partition reassignments.列出所有活动的分区重新分配。–preserve-throttlesDo not modify broker or topic throttles.不要修改broker或主题限制。–reassignment-json-file <String: manual assignment json file path>The JSON file with the partition reassignment configurationThe format to use is -
{“partitions”:
[{“topic”: “foo”,
“partition”: 1,
“replicas”: [1,2,3],
“log_dirs”: [“dir1”,“dir2”,“dir3”]
}],
“version”:1
}
Note that “log_dirs” is optional. When it is specified, its length must equal the length of the replicas list. The value in this list can be either “any” or the absolution path of the log directory on the broker. If absolute log directory path is specified, the replica will be moved to the specified log directory on the broker.带有分区重新分配配置的JSON文件要使用的格式是-
{“partitions”:
[{“topic”: “foo”,
“partition”: 1,
“replicas”: [1,2,3],
“log_dirs”: [“dir1”,“dir2”,“dir3”]
}],
“version”:1
}
注意,“log-dirs”是可选的。指定时,其长度必须等于"replicas"列表的长度。此列表中的值可以是的“any”或broker日志目录绝对路径。如果指定了日志目录绝对路径,则副本将移动到代理上指定的日志目录。–replica-alter-log-dirs-throttle <Long: replicaAlterLogDirsThrottle>The movement of replicas between log directories on the same broker will be throttled to this value (bytes/sec). This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment along with the --additional flag. The throttle rate should be at least 1 KB/s. (default: -1)同一broker上的日志目录之间的副本移动将被限制为此值(字节/秒)。当重新分配开始时,–execute可以包含此选项,并且可以通过重新提交当前的重新分配以及–additional标志来更改此选项。节流速率应至少为1 KB/s。(默认值:-1)–throttle <Long: throttle>The movement of partitions between brokers will be throttled to this value (bytes/sec). This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment along with the --additional flag. The throttle rate should be at least 1 KB/s. (default: -1)broker之间的分区移动将被限制为此值(字节/秒)。当重新分配开始时,–execute可以包含此选项,并且可以通过重新提交当前的重新分配–additional标志来更改此选项。节流速率应至少为1 KB/s。(默认值:-1)–timeout <Long: timeout>The maximum time in ms to wait for log directory replica assignment to begin. (default: 10000)等待日志目录副本分配开始的最长时间(毫秒)。(默认值:10000)–topics-to-move-json-file <String: topics to reassign json file path>Generate a reassignment configuration to move the partitions of the specified topics to the list of brokers specified by the --broker-list option. The format to use is -
{“topics”:
[{“topic”: “foo”},{“topic”: “foo1”}],
“version”:1
}生成一个重新分配配置,将指定主题的分区移动到–broker-list选项指定的代理列表中。要使用的格式是-
{“topics”:
[{“topic”: “foo”},{“topic”: “foo1”}],
“version”:1
}–verifyVerify if the reassignment completed as specified by the --reassignment-json-file option. If there is a throttle engaged for the replicas specified, and the rebalance has completed, the throttle will be removed验证重新分配是否按照–reassignment-json-file选项的指定完成。如果为指定的复制副本启用了节流阀,并且重新平衡已完成,则节流阀将被移除–versionDisplay Kafka version.显示Kafka版本。

5.1 手动调整分区副本

5.1.1 查看topic分区副本存储情况
[root@centos701 ~]# /opt/module/kafka_2.12-3.6.1/bin/kafka-topics.sh --bootstrap-server centos701:9092 --describe --topic first
Topic: first    TopicId: bhIgBX4wQTqn__OP82gpXQ PartitionCount: 3       ReplicationFactor: 3    Configs: 
        Topic: first    Partition: 0    Leader: 702     Replicas: 704,701,702   Isr: 702,701,704
        Topic: first    Partition: 1    Leader: 701     Replicas: 701,702,704   Isr: 702,701,704
        Topic: first    Partition: 2    Leader: 702     Replicas: 702,704,701   Isr: 702,701,704
5.1.2 创建副本存储计划
[root@centos701 ~]# vim /opt/module/kafka_2.12-3.6.1/json/change-replication-factor-first.json
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[701,702,704],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[702,704,701],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[704,701,702],"log_dirs":["any","any","any"]}]}
5.1.3 执行副本存储计划
[root@centos701 ~]# /opt/module/kafka_2.12-3.6.1/bin/kafka-reassign-partitions.sh --bootstrap-server centos701:9092 --reassignment-json-file /opt/module/kafka_2.12-3.6.1/json/change-replication-factor-first.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[704,701,702],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[701,702,704],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[702,704,701],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-0,first-1,first-2
5.1.4 验证副本存储计划
[root@centos701 ~]# /opt/module/kafka_2.12-3.6.1/bin/kafka-reassign-partitions.sh --bootstrap-server centos701:9092 --reassignment-json-file /opt/module/kafka_2.12-3.6.1/json/change-replication-factor-first.json --verify
Status of partition reassignment:
Reassignment of partition first-0 is completed.
Reassignment of partition first-1 is completed.
Reassignment of partition first-2 is completed.

Clearing broker-level throttles on brokers 704,701,702
Clearing topic-level throttles on topic first
5.1.5 查看分区副本存储情况
[root@centos701 ~]# /opt/module/kafka_2.12-3.6.1/bin/kafka-topics.sh --bootstrap-server centos701:9092 --describe --topic first
Topic: first    TopicId: bhIgBX4wQTqn__OP82gpXQ PartitionCount: 3       ReplicationFactor: 3    Configs: 
        Topic: first    Partition: 0    Leader: 702     Replicas: 701,702,704   Isr: 702,701,704
        Topic: first    Partition: 1    Leader: 701     Replicas: 702,704,701   Isr: 702,701,704
        Topic: first    Partition: 2    Leader: 702     Replicas: 704,701,702   Isr: 702,701,704

本文转载自: https://blog.csdn.net/weixin_43724577/article/details/134895328
版权归原作者 Mranth 所有, 如有侵权,请联系我们删除。

“Kafka 命令行操作”的评论:

还没有评论