

Simple Kafka usage on WSL

Installing and configuring the environment for a standalone Kafka

WSL2 is a very usable environment; the following walks through installing a standalone Kafka in it in detail.

  1. Open a command-line tool.
  2. Start WSL: wsl --user root --cd ~ (start as the root user; after entering WSL the current directory is ~, the user's home directory).
  3. Install Java:
     - Go to https://learn.microsoft.com/zh-cn/java/openjdk/download, pick the appropriate Java version, and download it.
     - Create an installation directory for Java: mkdir -p /opt/sdk/java
     - Move the downloaded JDK archive into it: mv /mnt/c/Users/<your-username>/Downloads/microsoft-jdk-21.0.1-linux-aarch64.tar.gz /opt/sdk/java/
     - Switch to the Java installation directory: cd /opt/sdk/java/
     - Unpack it: tar -zxvf microsoft-jdk-21.0.1-linux-aarch64.tar.gz
     - Optionally delete the archive: rm -rf microsoft-jdk-21.0.1-linux-aarch64.tar.gz
     - Configure the environment variables: enter the unpacked JDK directory with cd jdk-21.0.1+12, run pwd, and copy the absolute path it prints.
     - Open /etc/profile with vim, press G to jump to the end of the file, and append:
       export JAVA_HOME=/opt/sdk/java/jdk-21.0.1+12
       export PATH=$PATH:$JAVA_HOME/bin
       Note that the jdk-xxx part after JAVA_HOME must match the version you actually downloaded.
     - Make the profile take effect: source /etc/profile
     - Check the installation with java -version; if it prints version information, Java is installed:
       root@Ophelia:/opt/sdk/java/jdk-21.0.1+12# java -version
       openjdk version "21.0.1" 2023-10-17 LTS
       OpenJDK Runtime Environment Microsoft-8526870 (build 21.0.1+12-LTS)
       OpenJDK 64-Bit Server VM Microsoft-8526870 (build 21.0.1+12-LTS, mixed mode)
  4. Install Kafka (the resulting /etc/profile additions and the server.properties change are consolidated in the sketch after this list):
     - Download the desired version from the Kafka website; the latest is used here: Scala 2.13 - kafka_2.13-3.6.1.tgz (asc, sha512).
     - Create an installation directory for Kafka: mkdir -p /opt/software/kafka
     - Move the downloaded archive into it: mv /mnt/c/Users/<your-username>/Downloads/kafka_2.13-3.6.1.tgz /opt/software/kafka/
     - cd /opt/software/kafka/
     - tar -zxvf kafka_2.13-3.6.1.tgz
     - cd kafka_2.13-3.6.1/
     - Run pwd and copy the absolute path.
     - vim /etc/profile and append the following:
       # Kafka
       export KAFKA_HOME=/opt/software/kafka/kafka_2.13-3.6.1
       export PATH=$PATH:$KAFKA_HOME/bin
     - source /etc/profile
     - Edit the configuration file: vim /opt/software/kafka/kafka_2.13-3.6.1/config/server.properties
     - Find log.dirs=/tmp/kafka-logs and change it to log.dirs=/opt/software/kafka/kafka-logs, i.e.:
       # log.dirs=/tmp/kafka-logs
       log.dirs=/opt/software/kafka/kafka-logs
     - mkdir -p /opt/software/kafka/kafka-logs
     - The purpose of this change is to keep topic data out of /tmp so it does not get lost.
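For reference, the environment changes made in the two steps above boil down to a few lines. This is a minimal sketch assuming the exact versions and paths used here (jdk-21.0.1+12 and kafka_2.13-3.6.1); adjust the directory names to whatever you actually extracted.

    # appended to /etc/profile
    export JAVA_HOME=/opt/sdk/java/jdk-21.0.1+12
    export KAFKA_HOME=/opt/software/kafka/kafka_2.13-3.6.1
    export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin

    # changed in $KAFKA_HOME/config/server.properties
    log.dirs=/opt/software/kafka/kafka-logs

After source /etc/profile, both java -version and kafka-topics.sh --version should work from any directory.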

Starting Kafka

Start ZooKeeper before starting Kafka. The Kafka distribution ships with a bundled ZooKeeper, so nothing extra needs to be installed (since version 2.8.0 Kafka can also run without ZooKeeper in KRaft mode, but ZooKeeper is used here).

  1. Start ZooKeeper: zookeeper-server-start.sh -daemon /opt/software/kafka/kafka_2.13-3.6.1/config/zookeeper.properties. The command looks a bit long, and indeed it is not short at all. It breaks into three parts: [zookeeper-server-start.sh] [-daemon] [path to the config file]. - The -daemon flag starts it as a daemon process. - The ZooKeeper start script also lives in the directory that the Kafka environment variable puts on the PATH.
  2. Start Kafka: kafka-server-start.sh -daemon /opt/software/kafka/kafka_2.13-3.6.1/config/server.properties (both steps are combined in the start script sketched after this list).
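The two commands can be wrapped in a small start script. This is only a sketch assuming the paths above; the sleep and the jps check are conveniences, not requirements.

    #!/usr/bin/env bash
    # start the standalone ZooKeeper + Kafka stack installed above
    KAFKA_CONF=/opt/software/kafka/kafka_2.13-3.6.1/config

    zookeeper-server-start.sh -daemon "$KAFKA_CONF/zookeeper.properties"
    sleep 5    # give ZooKeeper a moment before the broker tries to connect
    kafka-server-start.sh -daemon "$KAFKA_CONF/server.properties"

    jps        # QuorumPeerMain (ZooKeeper) and Kafka should both be listed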

Shutting down Kafka

Stop Kafka first, then stop ZooKeeper.

  1. root@Ophelia:~# kafka-server-stop.sh
  2. root@Ophelia:~# zookeeper-server-stop.sh

When shutting down a Kafka cluster, likewise stop Kafka first and ZooKeeper second. If the order is reversed, Kafka cannot stop cleanly and you end up having to kill the process.
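A small wrapper that encodes this ordering might look like the following sketch (the stop-kafka.sh name is made up here); it relies only on the two stop scripts and jps.

    #!/usr/bin/env bash
    # stop-kafka.sh: stop the broker first, then ZooKeeper
    kafka-server-stop.sh

    # wait until the broker process (main class "Kafka") has exited
    while jps | grep -qw Kafka; do
        sleep 1
    done

    zookeeper-server-stop.sh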

kafka-topics.sh: topic-related operations

Let's see what the kafka-topics.sh command can do:

  1. root@Ophelia:~# kafka-topics.sh
  2. Create, delete, describe, or change a topic.
  3. Option Description
  4. ------ -----------
  5. --alter Alter the number of partitions and
  6. replica assignment. Update the
  7. configuration of an existing topic
  8. via --alter is no longer supported
  9. here (the kafka-configs CLI supports
  10. altering topic configs with a --
  11. bootstrap-server option).
  12. --at-min-isr-partitions if set when describing topics, only
  13. show partitions whose isr count is
  14. equal to the configured minimum.
  15. --bootstrap-server <String: server to REQUIRED: The Kafka server to connect
  16. connect to> to.
  17. --command-config <String: command Property file containing configs to be
  18. config property file> passed to Admin Client. This is used
  19. only with --bootstrap-server option
  20. for describing and altering broker
  21. configs.
  22. --config <String: name=value> A topic configuration override for the
  23. topic being created or altered. The
  24. following is a list of valid
  25. configurations:
  26. cleanup.policy
  27. compression.type
  28. delete.retention.ms
  29. file.delete.delay.ms
  30. flush.messages
  31. flush.ms
  32. follower.replication.throttled.
  33. replicas
  34. index.interval.bytes
  35. leader.replication.throttled.replicas
  36. local.retention.bytes
  37. local.retention.ms
  38. max.compaction.lag.ms
  39. max.message.bytes
  40. message.downconversion.enable
  41. message.format.version
  42. message.timestamp.after.max.ms
  43. message.timestamp.before.max.ms
  44. message.timestamp.difference.max.ms
  45. message.timestamp.type
  46. min.cleanable.dirty.ratio
  47. min.compaction.lag.ms
  48. min.insync.replicas
  49. preallocate
  50. remote.storage.enable
  51. retention.bytes
  52. retention.ms
  53. segment.bytes
  54. segment.index.bytes
  55. segment.jitter.ms
  56. segment.ms
  57. unclean.leader.election.enable
  58. See the Kafka documentation for full
  59. details on the topic configs. It is
  60. supported only in combination with --
  61. create if --bootstrap-server option
  62. is used (the kafka-configs CLI
  63. supports altering topic configs with
  64. a --bootstrap-server option).
  65. --create Create a new topic.
  66. --delete Delete a topic
  67. --delete-config <String: name> A topic configuration override to be
  68. removed for an existing topic (see
  69. the list of configurations under the
  70. --config option). Not supported with
  71. the --bootstrap-server option.
  72. --describe List details for the given topics.
  73. --exclude-internal exclude internal topics when running
  74. list or describe command. The
  75. internal topics will be listed by
  76. default
  77. --help Print usage information.
  78. --if-exists if set when altering or deleting or
  79. describing topics, the action will
  80. only execute if the topic exists.
  81. --if-not-exists if set when creating topics, the
  82. action will only execute if the
  83. topic does not already exist.
  84. --list List all available topics.
  85. --partitions <Integer: # of partitions> The number of partitions for the topic
  86. being created or altered (WARNING:
  87. If partitions are increased for a
  88. topic that has a key, the partition
  89. logic or ordering of the messages
  90. will be affected). If not supplied
  91. for create, defaults to the cluster
  92. default.
  93. --replica-assignment <String: A list of manual partition-to-broker
  94. broker_id_for_part1_replica1 : assignments for the topic being
  95. broker_id_for_part1_replica2 , created or altered.
  96. broker_id_for_part2_replica1 :
  97. broker_id_for_part2_replica2 , ...>
  98. --replication-factor <Integer: The replication factor for each
  99. replication factor> partition in the topic being
  100. created. If not supplied, defaults
  101. to the cluster default.
  102. --topic <String: topic> The topic to create, alter, describe
  103. or delete. It also accepts a regular
  104. expression, except for --create
  105. option. Put topic name in double
  106. quotes and use the '\' prefix to
  107. escape regular expression symbols; e.
  108. g. "test\.topic".
  109. --topic-id <String: topic-id> The topic-id to describe. This is used
  110. only with --bootstrap-server option
  111. for describing topics.
  112. --topics-with-overrides if set when describing topics, only
  113. show topics that have overridden
  114. configs
  115. --unavailable-partitions if set when describing topics, only
  116. show partitions whose leader is not
  117. available
  118. --under-min-isr-partitions if set when describing topics, only
  119. show partitions whose isr count is
  120. less than the configured minimum.
  121. --under-replicated-partitions if set when describing topics, only
  122. show under replicated partitions
  123. --version Display Kafka version.

The main options:

| Option | Description |
| ------ | ----------- |
| --bootstrap-server <String: server to connect to> | Kafka broker to connect to, as host:port |
| --topic <String: topic> | Name of the topic to operate on |
| --create | Create a topic |
| --delete | Delete a topic |
| --alter | Alter a topic |
| --list | List all topics |
| --describe | Show detailed information about a topic |
| --partitions <Integer: # of partitions> | Set the number of partitions |
| --replication-factor <Integer: replication factor> | Set the replication factor for each partition |
| --config <String: name=value> | Override a default configuration value |

Listing topics

  1. kafka-topics.sh --list --bootstrap-server localhost:9092

Or describe a specific topic:

  1. kafka-topics.sh --bootstrap-server localhost:9092 --topic <topic-name> --describe

In production there are usually multiple machines. For high availability, so that queries still succeed when the Kafka instance you point at goes down, you can specify several instances, e.g.:

  1. kafka-topics.sh --list --bootstrap-server localhost:9092,hostName1:9092,hostName2:9092

Creating a topic

Before creating one, check how many topics already exist:

  1. kafka-topics.sh --list --bootstrap-server localhost:9092

Nothing comes back, which is exactly as it should be: until you create a topic, there are none.
Create a topic named first with 3 partitions and a single replica:

  1. kafka-topics.sh --bootstrap-server localhost:9092 --topic first --create --partitions 3 --replication-factor 1

Check it:

  1. root@Ophelia:~# kafka-topics.sh --list --bootstrap-server localhost:9092
  2. first

Or:

  1. kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
  2. Topic: first TopicId: Db5XuRoASzi4W2FgQbaW7A PartitionCount: 3 ReplicationFactor: 1 Configs:
  3. Topic: first Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  4. Topic: first Partition: 1 Leader: 0 Replicas: 0 Isr: 0
  5. Topic: first Partition: 2 Leader: 0 Replicas: 0 Isr: 0
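The --config flag listed in the help above can override topic-level settings at creation time. A hedged example (the topic name events is made up; retention.ms=604800000 is 7 days):

    kafka-topics.sh --bootstrap-server localhost:9092 --topic events --create \
      --partitions 3 --replication-factor 1 --config retention.ms=604800000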

Modifying a topic

Increase the first topic from 3 partitions to 4. Note that from the command line the partition count can only be increased, never decreased.

  1. kafka-topics.sh --bootstrap-server localhost:9092 --topic first --alter --partitions 4
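To confirm the change, describe the topic again; PartitionCount should now read 4.

    kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe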

Try to decrease it and you will get an error:

  1. root@Ophelia:~# kafka-topics.sh --bootstrap-server localhost:9092 --topic first --alter --partitions 3
  2. Error while executing topic command: Topic currently has 4 partitions, which is higher than the requested 3.
  3. [2023-12-17 17:55:41,130] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 3.
  4. (kafka.admin.TopicCommand$)

Deleting a topic

Delete the topic named first:

  1. kafka-topics.sh --bootstrap-server localhost:9092 --topic first --delete

kafka-console-producer.sh: producer-related operations

  1. root@Ophelia:~# kafka-console-producer.sh
  2. Missing required option(s) [bootstrap-server]
  3. Option Description
  4. ------ -----------
  5. --batch-size <Integer: size> Number of messages to send in a single
  6. batch if they are not being sent
  7. synchronously. please note that this
  8. option will be replaced if max-
  9. partition-memory-bytes is also set (default: 16384)
  10. --bootstrap-server <String: server to REQUIRED unless --broker-list
  11. connect to> (deprecated) is specified. The server
  12. (s) to connect to. The broker list
  13. string in the form HOST1:PORT1,HOST2:
  14. PORT2.
  15. --broker-list <String: broker-list> DEPRECATED, use --bootstrap-server
  16. instead; ignored if --bootstrap-
  17. server is specified. The broker
  18. list string in the form HOST1:PORT1,
  19. HOST2:PORT2.
  20. --compression-codec [String: The compression codec: either 'none',
  21. compression-codec] 'gzip', 'snappy', 'lz4', or 'zstd'.
  22. If specified without value, then it
  23. defaults to 'gzip'
  24. --help Print usage information.
  25. --line-reader <String: reader_class> The class name of the class to use for
  26. reading lines from standard in. By
  27. default each line is read as a
  28. separate message. (default: kafka.
  29. tools.
  30. ConsoleProducer$LineMessageReader)
  31. --max-block-ms <Long: max block on The max time that the producer will
  32. send> block for during a send request.
  33. (default: 60000)
  34. --max-memory-bytes <Long: total memory The total memory used by the producer
  35. in bytes> to buffer records waiting to be sent
  36. to the server. This is the option to
  37. control `buffer.memory` in producer
  38. configs. (default: 33554432)
  39. --max-partition-memory-bytes <Integer: The buffer size allocated for a
  40. memory in bytes per partition> partition. When records are received
  41. which are smaller than this size the
  42. producer will attempt to
  43. optimistically group them together
  44. until this size is reached. This is
  45. the option to control `batch.size` in producer configs. (default: 16384)
  46. --message-send-max-retries <Integer> Brokers can fail receiving the message
  47. for multiple reasons, and being
  48. unavailable transiently is just one
  49. of them. This property specifies the
  50. number of retries before the
  51. producer give up and drop this
  52. message. This is the option to
  53. control `retries` in producer
  54. configs. (default: 3)
  55. --metadata-expiry-ms <Long: metadata The period of time in milliseconds
  56. expiration interval> after which we force a refresh of
  57. metadata even if we haven't seen any
  58. leadership changes. This is the
  59. option to control `metadata.max.age.
  60. ms` in producer configs. (default:
  61. 300000)
  62. --producer-property <String: A mechanism to pass user-defined
  63. producer_prop> properties in the form key=value to
  64. the producer.
  65. --producer.config <String: config file> Producer config properties file. Note
  66. that [producer-property] takes
  67. precedence over this config.
  68. --property <String: prop> A mechanism to pass user-defined
  69. properties in the form key=value to
  70. the message reader. This allows
  71. custom configuration for a user-
  72. defined message reader.
  73. Default properties include:
  74. parse.key=false
  75. parse.headers=false
  76. ignore.error=false
  77. key.separator=\t
  78. headers.delimiter=\t
  79. headers.separator=,
  80. headers.key.separator=:
  81. null.marker= When set, any fields
  82. (key, value and headers) equal to
  83. this will be replaced by null
  84. Default parsing pattern when:
  85. parse.headers=true and parse.key=true:
  86. "h1:v1,h2:v2...\tkey\tvalue"
  87. parse.key=true:
  88. "key\tvalue"
  89. parse.headers=true:
  90. "h1:v1,h2:v2...\tvalue"
  91. --reader-config <String: config file> Config properties file for the message
  92. reader. Note that [property] takes
  93. precedence over this config.
  94. --request-required-acks <String: The required `acks` of the producer
  95. request required acks> requests (default: -1)
  96. --request-timeout-ms <Integer: request The ack timeout of the producer
  97. timeout ms> requests. Value must be non-negative
  98. and non-zero. (default: 1500)
  99. --retry-backoff-ms <Long> Before each retry, the producer
  100. refreshes the metadata of relevant
  101. topics. Since leader election takes
  102. a bit of time, this property
  103. specifies the amount of time that
  104. the producer waits before refreshing
  105. the metadata. This is the option to
  106. control `retry.backoff.ms` in
  107. producer configs. (default: 100)
  108. --socket-buffer-size <Integer: size> The size of the tcp RECV size. This is
  109. the option to control `send.buffer.
  110. bytes` in producer configs.
  111. (default: 102400)
  112. --sync If set message send requests to the
  113. brokers are synchronously, one at a
  114. time as they arrive.
  115. --timeout <Long: timeout_ms> If set and the producer is running in
  116. asynchronous mode, this gives the
  117. maximum amount of time a message
  118. will queue awaiting sufficient batch
  119. size. The value is given in ms. This
  120. is the option to control `linger.ms` in producer configs. (default: 1000)
  121. --topic <String: topic> REQUIRED: The topic id to produce
  122. messages to.
  123. --version Display Kafka version.

Producing messages

Connect a producer to the broker:

  1. kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first

Start another terminal:

  1. wsl --user root --cd ~

Start a consumer:

  1. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first

Messages sent by the producer are received by the consumer.
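The producer can also read keyed messages from stdin via the parse.key and key.separator properties shown in the help above. A sketch (the colon separator and the sample input are arbitrary choices):

    # producer: each input line is now read as key:value, e.g.  user1:hello
    kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first \
      --property parse.key=true --property key.separator=:

    # consumer: print the keys as well
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first \
      --property print.key=true --property key.separator=: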

kafka-console-consumer.sh: consumer-related operations

  1. root@Ophelia:~# kafka-console-consumer.sh
  2. This tool helps to read data from Kafka topics and outputs it to standard output.
  3. Option Description
  4. ------ -----------
  5. --bootstrap-server <String: server to REQUIRED: The server(s) to connect to.
  6. connect to>
  7. --consumer-property <String: A mechanism to pass user-defined
  8. consumer_prop> properties in the form key=value to
  9. the consumer.
  10. --consumer.config <String: config file> Consumer config properties file. Note
  11. that [consumer-property] takes
  12. precedence over this config.
  13. --enable-systest-events Log lifecycle events of the consumer
  14. in addition to logging consumed
  15. messages. (This is specific for
  16. system tests.)
  17. --formatter <String: class> The name of a class to use for
  18. formatting kafka messages for
  19. display. (default: kafka.tools.
  20. DefaultMessageFormatter)
  21. --formatter-config <String: config Config properties file to initialize
  22. file> the message formatter. Note that
  23. [property] takes precedence over
  24. this config.
  25. --from-beginning If the consumer does not already have
  26. an established offset to consume
  27. from, start with the earliest
  28. message present in the log rather
  29. than the latest message.
  30. --group <String: consumer group id> The consumer group id of the consumer.
  31. --help Print usage information.
  32. --include <String: Java regex (String)> Regular expression specifying list of
  33. topics to include for consumption.
  34. --isolation-level <String> Set to read_committed in order to
  35. filter out transactional messages
  36. which are not committed. Set to
  37. read_uncommitted to read all
  38. messages. (default: read_uncommitted)
  39. --key-deserializer <String:
  40. deserializer for key>
  41. --max-messages <Integer: num_messages> The maximum number of messages to
  42. consume before exiting. If not set,
  43. consumption is continual.
  44. --offset <String: consume offset> The offset to consume from (a non-
  45. negative number), or 'earliest' which means from beginning, or
  46. 'latest' which means from end
  47. (default: latest)
  48. --partition <Integer: partition> The partition to consume from.
  49. Consumption starts from the end of
  50. the partition unless '--offset' is
  51. specified.
  52. --property <String: prop> The properties to initialize the
  53. message formatter. Default
  54. properties include:
  55. print.timestamp=true|false
  56. print.key=true|false
  57. print.offset=true|false
  58. print.partition=true|false
  59. print.headers=true|false
  60. print.value=true|false
  61. key.separator=<key.separator>
  62. line.separator=<line.separator>
  63. headers.separator=<line.separator>
  64. null.literal=<null.literal>
  65. key.deserializer=<key.deserializer>
  66. value.deserializer=<value.
  67. deserializer>
  68. header.deserializer=<header.
  69. deserializer>
  70. Users can also pass in customized
  71. properties for their formatter; more
  72. specifically, users can pass in
  73. properties keyed with 'key.
  74. deserializer.', 'value.
  75. deserializer.' and 'headers.
  76. deserializer.' prefixes to configure
  77. their deserializers.
  78. --skip-message-on-error If there is an error when processing a
  79. message, skip it instead of halt.
  80. --timeout-ms <Integer: timeout_ms> If specified, exit if no message is
  81. available for consumption for the
  82. specified interval.
  83. --topic <String: topic> The topic to consume on.
  84. --value-deserializer <String:
  85. deserializer for values>
  86. --version Display Kafka version.
  87. --whitelist <String: Java regex (String)> DEPRECATED, use --include instead; ignored if --include specified.
  88. Regular expression specifying list
  89. of topics to include for consumption.
Receive only new messages (incremental):
  1. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Receive all historical data with --from-beginning:
  1. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning

Which one to use depends on the situation.
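One more hedged sketch: the --group flag from the help above places console consumers into a consumer group, so the partitions of first are divided among the group's members instead of every consumer seeing every message (the group name my-group is made up):

    # run this in two terminals; each instance is assigned a subset of the 3 partitions
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --group my-group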

Dealing with uneven partition leader distribution
  1. kafka-leader-election.sh --bootstrap-server localhost:9092 --topic first --election-type preferred --partition 0
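To run a preferred-leader election across every partition instead of a single one, the same tool accepts --all-topic-partitions; a hedged variant of the command above:

    kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions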
Tags: kafka, middleware

Reprinted from: https://blog.csdn.net/dawnto/article/details/135045055
Copyright belongs to the original author, metabit.
