

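A simple guide to using Kafka on WSL

Installing and configuring the environment for standalone Kafka

WSL2 is readily available and works well for this purpose. This section walks through installing a standalone (single-node) Kafka in that environment.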

  1. Open a command-line tool.
  2. Start WSL: wsl --user root --cd ~ (this starts WSL as the root user; once inside, the current directory is ~, the user's home directory).
  3. Install Java:
     - Go to https://learn.microsoft.com/zh-cn/java/openjdk/download, choose the Java version that matches your system, and download it.
     - Create the Java installation directory: mkdir -p /opt/sdk/java
     - Move the downloaded JDK archive into that directory: mv /mnt/c/Users/<your-username>/Downloads/microsoft-jdk-21.0.1-linux-aarch64.tar.gz /opt/sdk/java/
     - Change to the Java installation directory: cd /opt/sdk/java/
     - Extract the archive: tar -zxvf microsoft-jdk-21.0.1-linux-aarch64.tar.gz
     - Optionally delete the archive: rm -rf microsoft-jdk-21.0.1-linux-aarch64.tar.gz
     - Configure the environment variables:
       - Enter the extracted JDK directory: cd jdk-21.0.1+12
       - Print the absolute path and copy it: pwd
       - Open /etc/profile with vim.
       - Press G to jump to the end of the file and append the following:
           export JAVA_HOME=/opt/sdk/java/jdk-21.0.1+12
           export PATH=$PATH:$JAVA_HOME/bin
         Note that the jdk-xxx directory name after JAVA_HOME must match the version you downloaded.
       - Reload the profile: source /etc/profile
     - Check that Java is installed: java -version. If it prints version information, the installation succeeded:
         root@Ophelia:/opt/sdk/java/jdk-21.0.1+12# java -version
         openjdk version "21.0.1" 2023-10-17 LTS
         OpenJDK Runtime Environment Microsoft-8526870 (build 21.0.1+12-LTS)
         OpenJDK 64-Bit Server VM Microsoft-8526870 (build 21.0.1+12-LTS, mixed mode)
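  4. Install Kafka:
     - Download the Kafka version you want from the official Kafka website. The latest release is used here: Scala 2.13 - kafka_2.13-3.6.1.tgz (asc, sha512)
     - Create the Kafka installation directory: mkdir -p /opt/software/kafka
     - Move the downloaded Kafka archive into that directory: mv /mnt/c/Users/<your-username>/Downloads/kafka_2.13-3.6.1.tgz /opt/software/kafka/
     - cd /opt/software/kafka/
     - tar -zxvf kafka_2.13-3.6.1.tgz
     - cd kafka_2.13-3.6.1/
     - pwd, then copy the absolute path.
     - vim /etc/profile and append the following:
         # Kafka
         export KAFKA_HOME=/opt/software/kafka/kafka_2.13-3.6.1
         export PATH=$PATH:$KAFKA_HOME/bin
     - source /etc/profile
     - Edit the configuration file: vim /opt/software/kafka/kafka_2.13-3.6.1/config/server.properties
     - Find log.dirs=/tmp/kafka-logs and change it to log.dirs=/opt/software/kafka/kafka-logs, as shown here:
         # log.dirs=/tmp/kafka-logs
         log.dirs=/opt/software/kafka/kafka-logs
     - mkdir -p /opt/software/kafka/kafka-logs
     - The point of this change is to keep topic data from being lost: log.dirs is where Kafka stores topic data, and files under /tmp may be cleaned up by the system.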
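
If these steps are run more than once, /etc/profile can collect duplicate export lines, and editing server.properties by hand is easy to get wrong. The two edits could be scripted roughly as below. This is only a sketch: append_once and set_log_dirs are helper names made up for illustration, and it assumes the stock server.properties contains an uncommented log.dirs= line.

```shell
#!/bin/sh
# Sketch only: append_once and set_log_dirs are hypothetical helpers.

# Append a line to a file unless an identical line is already present.
append_once() {
    file=$1
    line=$2
    grep -qxF -- "$line" "$file" 2>/dev/null || printf '%s\n' "$line" >> "$file"
}

# Rewrite the log.dirs entry of a properties file to point at a new directory.
# Assumes an uncommented "log.dirs=" line exists; other lines are left untouched.
set_log_dirs() {
    props=$1
    dir=$2
    sed "s|^log\.dirs=.*|log.dirs=$dir|" "$props" > "$props.tmp" &&
        mv "$props.tmp" "$props"
}

# Example usage (commented out so that sourcing this file changes nothing):
# append_once /etc/profile 'export KAFKA_HOME=/opt/software/kafka/kafka_2.13-3.6.1'
# append_once /etc/profile 'export PATH=$PATH:$KAFKA_HOME/bin'
# set_log_dirs /opt/software/kafka/kafka_2.13-3.6.1/config/server.properties /opt/software/kafka/kafka-logs
```

Because append_once compares whole lines, running it again with the same arguments leaves the file unchanged.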

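Starting Kafka

Start ZooKeeper before starting Kafka. The Kafka distribution ships with a bundled ZooKeeper, so no separate installation is needed. (Kafka 2.8.0 introduced KRaft mode, which can run without ZooKeeper; this article uses ZooKeeper mode.)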

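  1. Start ZooKeeper: zookeeper-server-start.sh -daemon /opt/software/kafka/kafka_2.13-3.6.1/config/zookeeper.properties
     The command looks a bit long, and it is. It has three parts: [zookeeper-server-start.sh] [-daemon] [path to the configuration file]
     - The -daemon option starts the process as a daemon.
     - The ZooKeeper start script lives in the same bin directory that the Kafka environment variable puts on PATH.
  2. Start Kafka: kafka-server-start.sh -daemon /opt/software/kafka/kafka_2.13-3.6.1/config/server.properties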
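
Since both commands return immediately when started with -daemon, a script that runs them back to back may launch Kafka before ZooKeeper is accepting connections. One way to handle that is to poll the ports, as sketched below. The helpers retry, port_open and start_kafka_stack are hypothetical names, port_open relies on bash's /dev/tcp feature, and the ports assume the stock configuration (2181 for ZooKeeper, 9092 for Kafka).

```shell
#!/usr/bin/env bash
# Sketch only: retry, port_open and start_kafka_stack are illustrative helpers,
# not Kafka tools.

# Run a command up to $1 times (at least once), sleeping RETRY_DELAY seconds
# (default 1) between failed attempts. Succeeds as soon as the command does.
retry() {
    local attempts=$1 i=1
    shift
    while :; do
        "$@" && return 0
        [ "$i" -ge "$attempts" ] && return 1
        i=$((i + 1))
        sleep "${RETRY_DELAY:-1}"
    done
}

# True if a TCP connection to host $1, port $2 succeeds (bash /dev/tcp).
port_open() {
    (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

# Start ZooKeeper, wait for it, then start Kafka and wait for it.
start_kafka_stack() {
    zookeeper-server-start.sh -daemon "$KAFKA_HOME/config/zookeeper.properties"
    # Assumption: clientPort=2181, as in the stock zookeeper.properties.
    retry 30 port_open localhost 2181 || { echo "ZooKeeper did not come up" >&2; return 1; }
    kafka-server-start.sh -daemon "$KAFKA_HOME/config/server.properties"
    retry 30 port_open localhost 9092 || { echo "Kafka did not come up" >&2; return 1; }
}

# Usage: start_kafka_stack
```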

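Stopping Kafka

Stop Kafka first, then ZooKeeper:

root@Ophelia:~# kafka-server-stop.sh
root@Ophelia:~# zookeeper-server-stop.sh

The same order applies when shutting down a Kafka cluster: stop Kafka first, then ZooKeeper. If the order is reversed, Kafka cannot stop normally and can only be terminated with kill.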

kafka-topics.sh: topic operations

First, see what the kafka-topics.sh command can do:

root@Ophelia:~# kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions and
                                           replica assignment. Update the
                                           configuration of an existing topic
                                           via --alter is no longer supported
                                           here (the kafka-configs CLI supports
                                           altering topic configs with a --
                                           bootstrap-server option).
--at-min-isr-partitions                  if set when describing topics, only
                                           show partitions whose isr count is
                                           equal to the configured minimum.
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect
  connect to>                              to.
--command-config <String: command        Property file containing configs to be
  config property file>                    passed to Admin Client. This is used
                                           only with --bootstrap-server option
                                           for describing and altering broker
                                           configs.
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered. The
                                           following is a list of valid
                                           configurations:
                                                cleanup.policy
                                                compression.type
                                                delete.retention.ms
                                                file.delete.delay.ms
                                                flush.messages
                                                flush.ms
                                                follower.replication.throttled.
                                           replicas
                                                index.interval.bytes
                                                leader.replication.throttled.replicas
                                                local.retention.bytes
                                                local.retention.ms
                                                max.compaction.lag.ms
                                                max.message.bytes
                                                message.downconversion.enable
                                                message.format.version
                                                message.timestamp.after.max.ms
                                                message.timestamp.before.max.ms
                                                message.timestamp.difference.max.ms
                                                message.timestamp.type
                                                min.cleanable.dirty.ratio
                                                min.compaction.lag.ms
                                                min.insync.replicas
                                                preallocate
                                                remote.storage.enable
                                                retention.bytes
                                                retention.ms
                                                segment.bytes
                                                segment.index.bytes
                                                segment.jitter.ms
                                                segment.ms
                                                unclean.leader.election.enable
                                         See the Kafka documentation for full
                                           details on the topic configs. It is
                                           supported only in combination with --
                                           create if --bootstrap-server option
                                           is used (the kafka-configs CLI
                                           supports altering topic configs with
                                           a --bootstrap-server option).
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option). Not supported with
                                           the --bootstrap-server option.
--describe                               List details for the given topics.
--exclude-internal                       exclude internal topics when running
                                           list or describe command. The
                                           internal topics will be listed by
                                           default
--help                                   Print usage information.
--if-exists                              if set when altering or deleting or
                                           describing topics, the action will
                                           only execute if the topic exists.
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           If partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected). If not supplied
                                           for create, defaults to the cluster
                                           default.
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being
                                           created. If not supplied, defaults
                                           to the cluster default.
--topic <String: topic>                  The topic to create, alter, describe
                                           or delete. It also accepts a regular
                                           expression, except for --create
                                           option. Put topic name in double
                                           quotes and use the '\' prefix to
                                           escape regular expression symbols; e.
                                           g. "test\.topic".
--topic-id <String: topic-id>            The topic-id to describe.This is used
                                           only with --bootstrap-server option
                                           for describing topics.
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-min-isr-partitions               if set when describing topics, only
                                           show partitions whose isr count is
                                           less than the configured minimum.
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--version                                Display Kafka version.

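The main options:

Option                                              Description
------                                              -----------
--bootstrap-server <String: server to connect to>   Kafka broker to connect to, as hostname:port
--topic <String: topic>                             Name of the topic to operate on
--create                                            Create a topic
--delete                                            Delete a topic
--alter                                             Modify a topic
--list                                              List all topics
--describe                                          Show a detailed description of a topic
--partitions <Integer: # of partitions>             Set the number of partitions
--replication-factor <Integer: replication factor>  Set the number of replicas per partition
--config <String: name=value>                       Override a system default configuration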

Listing topics

kafka-topics.sh --list --bootstrap-server localhost:9092

Or describe a specific topic:

kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic-name> --describe

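A production environment may have several machines. For high availability, so that queries still work if the Kafka instance you connect to goes down, you can specify several instances as a comma-separated list, for example:

kafka-topics.sh --list --bootstrap-server localhost:9092,hostName1:9092,hostName2:9092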
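
When the broker addresses come from a script or a list, the --bootstrap-server value has to be assembled as a single comma-separated string rather than as separate arguments. A small sketch of that assembly follows; join_servers is a hypothetical helper, and hostName1 and hostName2 are just the placeholder hosts from the text.

```shell
#!/bin/sh
# Sketch only: join_servers is a hypothetical helper, not part of Kafka.

# Join host:port arguments into one comma-separated string,
# the form that --bootstrap-server expects.
join_servers() {
    out=""
    for s in "$@"; do
        if [ -z "$out" ]; then
            out=$s
        else
            out="$out,$s"
        fi
    done
    printf '%s\n' "$out"
}

# Example usage:
# kafka-topics.sh --list --bootstrap-server "$(join_servers localhost:9092 hostName1:9092 hostName2:9092)"
```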

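Creating a topic

Before creating one, check how many topics exist:

kafka-topics.sh --list --bootstrap-server localhost:9092

Nothing at all, which is exactly right: if you have not created a topic, there is no topic.
Create a topic named first with a single replica and 3 partitions: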

 kafka-topics.sh --bootstrap-server localhost:9092 --topic first --create --partitions 3 --replication-factor 1

Check the result:

root@Ophelia:~# kafka-topics.sh --list --bootstrap-server localhost:9092
first

Or:

kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
Topic: first    TopicId: Db5XuRoASzi4W2FgQbaW7A PartitionCount: 3       ReplicationFactor: 1    Configs:
        Topic: first    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: first    Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: first    Partition: 2    Leader: 0       Replicas: 0     Isr: 0
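
The --describe output is plain text, so a script that needs the partition count has to pick it out of the summary line. A minimal sketch, assuming the output keeps the "PartitionCount: N" label shown above; partition_count is a hypothetical helper name.

```shell
#!/bin/sh
# Sketch only: partition_count is a hypothetical helper.

# Read `kafka-topics.sh --describe` output on stdin and print the value
# that follows the "PartitionCount:" label on the summary line.
partition_count() {
    awk '{
        for (i = 1; i < NF; i++) {
            if ($i == "PartitionCount:") { print $(i + 1); exit }
        }
    }'
}

# Example usage:
# kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe | partition_count
```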

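Modifying a topic

Change the number of partitions of the first topic from 3 to 4. Note that on the command line the partition count can only be increased, never decreased:

kafka-topics.sh --bootstrap-server localhost:9092 --topic first --alter --partitions 4

Trying to decrease it produces an error: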

root@Ophelia:~# kafka-topics.sh --bootstrap-server localhost:9092 --topic first --alter --partitions 3
Error while executing topic command: Topic currently has 4 partitions, which is higher than the requested 3.
[2023-12-17 17:55:41,130] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 3.
 (kafka.admin.TopicCommand$)
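
In a script, the same rule can be checked up front so the command is never sent with a smaller count. The guard below is only a sketch: can_grow_to is a hypothetical helper, and it also refuses an equal count on the assumption that there is nothing to change in that case.

```shell
#!/bin/sh
# Sketch only: can_grow_to is a hypothetical helper.

# Succeed only when the requested partition count ($2) is larger
# than the current one ($1).
can_grow_to() {
    current=$1
    requested=$2
    [ "$requested" -gt "$current" ]
}

# Example usage, with the current count supplied by hand:
# if can_grow_to 4 6; then
#     kafka-topics.sh --bootstrap-server localhost:9092 --topic first --alter --partitions 6
# else
#     echo "refusing: the partition count can only be increased" >&2
# fi
```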

Deleting a topic

Delete the topic named first:

kafka-topics.sh --bootstrap-server localhost:9092 --topic first --delete

kafka-console-producer.sh: producer operations

root@Ophelia:~# kafka-console-producer.sh
Missing required option(s)[bootstrap-server]
Option                                   Description
------                                   -----------
--batch-size <Integer: size>             Number of messages to send in a single
                                           batch if they are not being sent
                                           synchronously. please note that this
                                           option will be replaced if max-
                                           partition-memory-bytes is also set
                                           (default: 16384)
--bootstrap-server <String: server to    REQUIRED unless --broker-list
  connect to>                              (deprecated) is specified. The server
                                           (s) to connect to. The broker list
                                           string in the form HOST1:PORT1,HOST2:
                                           PORT2.
--broker-list <String: broker-list>      DEPRECATED, use --bootstrap-server
                                           instead; ignored if --bootstrap-
                                           server is specified.  The broker
                                           list string in the form HOST1:PORT1,
                                           HOST2:PORT2.
--compression-codec [String:             The compression codec: either 'none',
  compression-codec]                       'gzip', 'snappy', 'lz4', or 'zstd'.
                                           If specified without value, then it
                                           defaults to 'gzip'
--help                                   Print usage information.
--line-reader <String: reader_class>     The class name of the class to use for
                                           reading lines from standard in. By
                                           default each line is read as a
                                           separate message. (default: kafka.
                                           tools.
                                           ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on       The max time that the producer will
  send>                                    block for during a send request.
                                           (default: 60000)
--max-memory-bytes <Long: total memory   The total memory used by the producer
  in bytes>                                to buffer records waiting to be sent
                                           to the server. This is the option to
                                           control `buffer.memory` in producer
                                           configs. (default: 33554432)
--max-partition-memory-bytes <Integer:   The buffer size allocated for a
  memory in bytes per partition>           partition. When records are received
                                           which are smaller than this size the
                                           producer will attempt to
                                           optimistically group them together
                                           until this size is reached. This is
                                           the option to control `batch.size` in
                                           producer configs. (default: 16384)
--message-send-max-retries <Integer>     Brokers can fail receiving the message
                                           for multiple reasons, and being
                                           unavailable transiently is just one
                                           of them. This property specifies the
                                           number of retries before the
                                           producer give up and drop this
                                           message. This is the option to
                                           control `retries` in producer
                                           configs. (default: 3)
--metadata-expiry-ms <Long: metadata     The period of time in milliseconds
  expiration interval>                     after which we force a refresh of
                                           metadata even if we haven't seen any
                                           leadership changes. This is the
                                           option to control `metadata.max.age.
                                           ms` in producer configs. (default:
                                           300000)
--producer-property <String:             A mechanism to pass user-defined
  producer_prop>                           properties in the form key=value to
                                           the producer.
--producer.config <String: config file>  Producer config properties file. Note
                                           that [producer-property] takes
                                           precedence over this config.
--property <String: prop>                A mechanism to pass user-defined
                                           properties in the form key=value to
                                           the message reader. This allows
                                           custom configuration for a user-
                                           defined message reader.
                                         Default properties include:
                                          parse.key=false
                                          parse.headers=false
                                          ignore.error=false
                                          key.separator=\t
                                          headers.delimiter=\t
                                          headers.separator=,
                                          headers.key.separator=:
                                          null.marker=   When set, any fields
                                           (key, value and headers) equal to
                                           this will be replaced by null
                                         Default parsing pattern when:
                                          parse.headers=true and parse.key=true:
                                           "h1:v1,h2:v2...\tkey\tvalue"
                                          parse.key=true:
                                           "key\tvalue"
                                          parse.headers=true:
                                           "h1:v1,h2:v2...\tvalue"
--reader-config <String: config file>    Config properties file for the message
                                           reader. Note that [property] takes
                                           precedence over this config.
--request-required-acks <String:         The required `acks` of the producer
  request required acks>                   requests (default: -1)
--request-timeout-ms <Integer: request   The ack timeout of the producer
  timeout ms>                              requests. Value must be non-negative
                                           and non-zero. (default: 1500)
--retry-backoff-ms <Long>                Before each retry, the producer
                                           refreshes the metadata of relevant
                                           topics. Since leader election takes
                                           a bit of time, this property
                                           specifies the amount of time that
                                           the producer waits before refreshing
                                           the metadata. This is the option to
                                           control `retry.backoff.ms` in
                                           producer configs. (default: 100)
--socket-buffer-size <Integer: size>     The size of the tcp RECV size. This is
                                           the option to control `send.buffer.
                                           bytes` in producer configs.
                                           (default: 102400)
--sync                                   If set message send requests to the
                                           brokers are synchronously, one at a
                                           time as they arrive.
--timeout <Long: timeout_ms>             If set and the producer is running in
                                           asynchronous mode, this gives the
                                           maximum amount of time a message
                                           will queue awaiting sufficient batch
                                           size. The value is given in ms. This
                                           is the option to control `linger.ms` in
                                           producer configs. (default: 1000)
--topic <String: topic>                  REQUIRED: The topic id to produce
                                           messages to.
--version                                Display Kafka version.

Producing messages

Connect a producer to the broker:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first

Open another terminal:

wsl --user root --cd ~

Start a consumer:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first

The producer sends messages, and the consumer receives them.
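
The console producer reads one message per line from standard input, so messages can be piped in instead of typed. The sketch below sends keyed messages using the parse.key and key.separator properties listed in the producer help output. The helpers gen_messages and send_keyed are hypothetical, and ":" is an arbitrary choice of separator.

```shell
#!/bin/sh
# Sketch only: gen_messages and send_keyed are hypothetical helpers.

# Print N lines of the form "key-i:value-i".
gen_messages() {
    n=$1
    i=1
    while [ "$i" -le "$n" ]; do
        printf 'key-%d:value-%d\n' "$i" "$i"
        i=$((i + 1))
    done
}

# Pipe N keyed messages into the topic named by $1.
# parse.key=true makes the producer split each line at key.separator.
send_keyed() {
    topic=$1
    count=$2
    gen_messages "$count" | kafka-console-producer.sh \
        --bootstrap-server localhost:9092 \
        --topic "$topic" \
        --property parse.key=true \
        --property key.separator=:
}

# Example usage:
# send_keyed first 3
```

To see the keys on the receiving side, the consumer can be started with --property print.key=true, which appears in the consumer help output.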

kafka-console-consumer.sh: consumer operations

root@Ophelia:~# kafka-console-consumer.sh
This tool helps to read data from Kafka topics and outputs it to standard output.
Option                                   Description
------                                   -----------
--bootstrap-server <String: server to    REQUIRED: The server(s) to connect to.
  connect to>
--consumer-property <String:             A mechanism to pass user-defined
  consumer_prop>                           properties in the form key=value to
                                           the consumer.
--consumer.config <String: config file>  Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.
--enable-systest-events                  Log lifecycle events of the consumer
                                           in addition to logging consumed
                                           messages. (This is specific for
                                           system tests.)
--formatter <String: class>              The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--formatter-config <String: config       Config properties file to initialize
  file>                                    the message formatter. Note that
                                           [property] takes precedence over
                                           this config.
--from-beginning                         If the consumer does not already have
                                           an established offset to consume
                                           from, start with the earliest
                                           message present in the log rather
                                           than the latest message.
--group <String: consumer group id>      The consumer group id of the consumer.
--help                                   Print usage information.
--include <String: Java regex (String)>  Regular expression specifying list of
                                           topics to include for consumption.
--isolation-level <String>               Set to read_committed in order to
                                           filter out transactional messages
                                           which are not committed. Set to
                                           read_uncommitted to read all
                                           messages. (default: read_uncommitted)
--key-deserializer <String:
  deserializer for key>
--max-messages <Integer: num_messages>   The maximum number of messages to
                                           consume before exiting. If not set,
                                           consumption is continual.
--offset <String: consume offset>        The offset to consume from (a non-
                                           negative number), or 'earliest'
                                           which means from beginning, or
                                           'latest' which means from end
                                           (default: latest)
--partition <Integer: partition>         The partition to consume from.
                                           Consumption starts from the end of
                                           the partition unless '--offset' is
                                           specified.
--property <String: prop>                The properties to initialize the
                                           message formatter. Default
                                           properties include:
                                          print.timestamp=true|false
                                          print.key=true|false
                                          print.offset=true|false
                                          print.partition=true|false
                                          print.headers=true|false
                                          print.value=true|false
                                          key.separator=<key.separator>
                                          line.separator=<line.separator>
                                          headers.separator=<line.separator>
                                          null.literal=<null.literal>
                                          key.deserializer=<key.deserializer>
                                          value.deserializer=<value.
                                           deserializer>
                                          header.deserializer=<header.
                                           deserializer>
                                         Users can also pass in customized
                                           properties for their formatter; more
                                           specifically, users can pass in
                                           properties keyed with 'key.
                                           deserializer.', 'value.
                                           deserializer.' and 'headers.
                                           deserializer.' prefixes to configure
                                           their deserializers.
--skip-message-on-error                  If there is an error when processing a
                                           message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message is
                                           available for consumption for the
                                           specified interval.
--topic <String: topic>                  The topic to consume on.
--value-deserializer <String:
  deserializer for values>
--version                                Display Kafka version.
--whitelist <String: Java regex          DEPRECATED, use --include instead;
  (String)>                                ignored if --include specified.
                                           Regular expression specifying list
                                           of topics to include for consumption.
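
Incremental consumption (only messages produced after the consumer starts):
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Consume all historical data with --from-beginning:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning

Which one to use depends on the situation: the incremental form suits watching live traffic, while --from-beginning suits replaying what the topic already holds.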

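Handling unevenly distributed partition leaders

A preferred leader election asks Kafka to hand leadership of the given partition back to its preferred replica: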
kafka-leader-election.sh --bootstrap-server localhost:9092 --topic first --election-type preferred --partition 0
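
The command above targets a single partition. For several partitions, kafka-leader-election.sh can instead read a list from a file given with --path-to-json-file. The sketch below generates such a file. The helper election_json is hypothetical, /tmp/election.json is an arbitrary path, and the JSON shape follows the example in the tool's own help text as far as I know, so check it against your Kafka version.

```shell
#!/bin/sh
# Sketch only: election_json is a hypothetical helper.

# Print a JSON document listing the given partitions of one topic,
# in the shape kafka-leader-election.sh reads via --path-to-json-file.
election_json() {
    topic=$1
    shift
    printf '{"partitions":['
    sep=""
    for p in "$@"; do
        printf '%s{"topic":"%s","partition":%d}' "$sep" "$topic" "$p"
        sep=","
    done
    printf ']}\n'
}

# Example usage:
# election_json first 0 1 2 > /tmp/election.json
# kafka-leader-election.sh --bootstrap-server localhost:9092 \
#     --election-type preferred --path-to-json-file /tmp/election.json
```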

Reposted from: https://blog.csdn.net/dawnto/article/details/135045055
Copyright belongs to the original author, metabit.
