0


Kafka详解(中)——Kafka客户端操作

3-1 shell列举

​ kafka安装目录下的bin目录包含了很多运维可操作的shell脚本,列举如下:
脚本名称用途描述connect-distributed.sh连接kafka集群模式connect-standalone.sh连接kafka单机模式kafka-acls.sh设置Kafka权限kafka-broker-api-versions.sh检索代理版本信息kafka-configs.sh配置管理脚本kafka-console-consumer.shkafka消费者控制台kafka-console-producer.shkafka生产者控制台kafka-consumer-groups.shkafka消费者组相关信息kafka-consumer-perf-test.shkafka消费者性能测试脚本kafka-delegation-tokens.sh管理Delegation Tokenkafka-delete-records.sh将给定分区的日志向下删除到指定的偏移量kafka-dump-log.sh用来查看Topic的的文件内容kafka-log-dirs.sh查询各个Broker上的各个日志路径的磁盘占用情况kafka-mirror-maker.sh在Kafka集群间实现数据镜像kafka-preferred-replica-election.sh触发preferred replica选举kafka-producer-perf-test.shkafka生产者性能测试脚本kafka-reassign-partitions.sh分区重分配脚本kafka-replica-verification.sh复制进度验证脚本kafka-run-class.sh执行任何带main方法的Kafka类kafka-server-start.sh启动kafka服务kafka-server-stop.sh停止kafka服务kafka-simple-consumer-shell.shdeprecated,推荐使用kafka-console-consumer.shkafka-streams-application-reset.sh给Kafka Streams应用程序重设位移,以便重新消费数据kafka-topics.shtopic管理脚本kafka-verifiable-consumer.sh可检验的kafka消费者kafka-verifiable-producer.sh可检验的kafka生产者trogdor.shKafka的测试框架,用于执行各种基准测试和负载测试zookeeper-server-start.sh启动zookeeper服务zookeeper-server-stop.sh停止zookeeper服务zookeeper-shell.sh连接操作zookeeper的脚本,可以查看kafka在zk上的节点信息
​ 接下来会详细说明常用脚本的使用方法。

3-2 topic管理

​ 对于kafka的topic操作,我们需要用到的是

bin/kafka-topics.sh

这个脚本文件。

  • 创建topic:创建一个名为my-topic的主题(–bootstrap-server后面是连接Kafka的地址)
bin/kafka-topics.sh --bootstrap-server 192.168.8.128:9092 --create --topic my-topic

注意:Topic 名称中一定不要同时出现下划线 (’_’) 和小数点 (’.’)。

​ 在创建topic时选项说明:

--topic test

:定义topic名称

--partitions 3

:指定当前topic的分区数,若不指定则根据配置文件的默认分区数进行创建

--replication-factor 1

: 定义副本数为1,副本数不能超过当前集群broker数,否则会抛出InvalidReplicationFactorException异常
在这里插入图片描述

  • 查看当前服务器所有topic
bin/kafka-topics.sh --bootstrap-server 192.168.8.128:9092 --list

在这里插入图片描述

  • 修改topic配置 1.增加分区数:
bin/kafka-topics.sh --bootstrap-server 192.168.8.128:9092 --alter --topic test  --partitions 40

在这里插入图片描述
2. 增加配置

bin/kafka-topics.sh --alter --bootstrap-server 192.168.8.128:9092 --topic test --config cleanup.policy=compact
  1. 删除配置
bin/kafka-topics.sh --alter --bootstrap-server 192.168.8.128:9092 --topic test --delete-config flush.messages

当如下所示的属性配置到 Topic 上时,将会覆盖 server.properties 上对应的属性。
Topic级别配置属性类型有效值描述cleanup.policylistdelete(默认)
compact过期或达到上限日志的清理策略。
delete:删除
compact:压缩compression.typestringuncompressed
snappy
lz4
gzip
producer(默认)指定给该topic最终的压缩类型delete.retention.mslong86400000(默认)压缩的日志保留的最长时间,也是客户端消费消息的最长时间。
与 log.retention.minutes 的区别在于:一个控制未压缩的数据,一个控制压缩后的数据。file.delete.delay.mslong60000从文件系统中删除前所等待的时间flush.messageslong9223372036854775807在消息刷到磁盘之前,日志分区收集的消息数flush.mslong9223372036854775807消息在刷到磁盘之前,保存在内存中的最长时间,单位是msindex.interval.bytesint4096执行 fetch 操作后,扫描最近的 offset 运行空间的大小。 设置越大,代表扫描速度越快,但是也更耗内存。 (一般情况下不需要设置此参数)message.max.bytesint1000012log中能够容纳消息的最大字节数min.cleanable.dirty.ratiodouble0.5日志清理的频率控制,占该log的百分比。
越大意味着更高效的清理,同时会存在空间浪费问题retention.byteslong-1(默认)topic每个分区的最大文件大小。
一个 topic 的大小限制 = 分区数 * log.retention.bytes。
-1 表示没有大小限制。retention.msint604800000(默认)日志文件保留的分钟数。 数据存储的最大时间超过这个时间会根据 log.cleanup.policy 设置的策略处理数据segment.bytesint1073741824(默认)每个 segment 的大小 (默认为1G)segment.index.bytesint10485760(默认)对于segment日志的索引文件大小限制(默认为10M)

注意

  1. partition数量只能增加,不能减少[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JcoyuUM8-1651065487273)(Kafka.assets/image-20211022172107197.png)]
  1. 此脚本不能用来修改副本个数。(使用 kafka-reassign-partitions.sh 脚本修改副本数)- 首先根据需要创建topic文件配置partitions-topic.json,配置内容如下:{"partitions":[{"topic":"test", "partition":0, "replicas":[1,2]}, {"topic":"test", "partition":1, "replicas":[1,3]}, {"topic":"test", "partition":2, "replicas":[2,3]}], "version":1}- 执行副本搬迁bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.8.128:9092 --reassignment-json-file partitions-topic.json --execute - 查看迁移情况:bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.8.128:9092 --reassignment-json-file partitions-topic.json --verify
  • 删除topic
bin/kafka-topics.sh --bootstrap-server 192.168.8.128:9092 --delete --topic my_topic_name
  • 查看topic详情
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server 196.168.8.128:9092

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b4g2x7PV-1651065487274)(Kafka.assets/image-20211011170325355.png)]

​ 输出信息中,第一行包含该topic的配置信息(名称,id,分区数,副本数和配置),后面每一行说明一个分区的信息。

​ 其中,Leader,Replicas和Isr后面的数字都是 broker 的 id,Leader 表示该节点负责该分区的所有读写操作,Replicas 表示备份的节点,Isr表示当前 kakfa 集群中可用的 breaker.id 列表。因为目前是单节点,所以所有信息均为0。

3-3 生产者产生数据

​ 消息是kafka中最基本的数据单元,在kafka中,一条消息由key、value两部分构成,在发送一条消息时,我们可以指定这个key,producer的分发机制会根据key来判断当前这条消息应该发送并存储到哪个partition中。默认情况下,kafka采用的是hash取模的分区算法,如果Key为null,则会随机分配一个分区。

​ 生产消息需要使用

bin/kafka-console-producer.sh

脚本。直接输入消息值(value)即可,输入的每一行表示一条消息,都会导致将单独的事件写入主题。每次回车表示触发“发送”操作,回车后可直接使用“Ctrl + c”退出生产者控制台。

bin/kafka-console-producer.sh --topic test --bootstrap-server 192.168.8.128:9092

在这里插入图片描述
如需指定消息的key值可以通过

--property parse.key=true

配置。输入消息时,默认消息的key和value之间使用Table键进行分隔(请勿使用转义字符\t)。

bin/kafka-console-producer.sh --bootstrap-server 192.168.8.128:9092 --topic test --property parse.key=true

在这里插入图片描述
输入如上信息表示所生产的消息“Key1”和"Key2"为消息键,“Value1”和“Value2”为消息值。

​ 下表列举了3.0版本支持的所有参数用法:
参数值类型说明有效值–bootstrap-serverString要连接的服务器
必需(除非指定–broker-list,
但broker-list在新版本中已过时)形如:host1:prot1,host2:prot2–topicString(必需)接收消息的主题名称–broker-listString已过时要连接的服务器形如:host1:prot1,host2:prot2–batch-sizeInteger单个批处理中发送的消息数200(默认值)–compression-codecString压缩编解码器none、gzip(默认值) snappy、lz4、zstd–line-readerString默认情况下,每一行都被读取为单独的消息kfka.tools.
ConsoleProducer$LineMessageReader–max-block-msLong在发送请求期间,
生产者将阻塞的最长时间60000(默认值)–max-memory-bytesLong生产者用来缓冲等待发送到服务器的总内存33554432(默认值)–max-partition-memory-bytesLong为分区分配的缓冲区大小16384(默认值)–message-send-max-retriesInteger最大的重试发送次数3(默认值)–metadata-expiry-msLong强制更新元数据的时间阈值(ms)300000(默认值)–producer-propertyString将自定义属性传递给生产者的机制形如:key=value–producer.configString生产者配置属性文件 ,注意[–producer-property]优先于此配置配置文件完整路径–propertyString自定义消息读取器parse.key=true|false
key.separator=<key.separator> ignore.error=true|false–request-required-acksString生产者请求的确认方式(具体讲解在Producer API)0、1(默认值)、all–request-timeout-msInteger生产者请求的确认超时时间1500(默认值)–retry-backoff-msInteger生产者重试前,刷新元数据的等待时间阈值100(默认值)–socket-buffer-sizeIntegerTCP接收缓冲大小102400(默认值)–timeoutInteger消息排队异步等待处理的时间阈值1000(默认值)–sync同步发送消息–version显示 Kafka 版本 不配合其他参数时,显示为本地Kafka版本–help打印帮助信息

3-4 消费者消费数据

​ 消费消息需要使用

bin/kafka-console-consumer.sh 

脚本。该 shell 脚本的功能通过调用 kafka.tools 包下的

ConsoleConsumer

类,并将提供的命令行参数全部传给该类实现。

​ 打开另一个终端会话并运行控制台使用者客户端以读取刚刚创建的事件:

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server 192.168.8.128:9092

在这里插入图片描述
​ 默认情况消费出来现实的信息是只有消息的Value值,如果要展示消息的Key,时间戳或其他信息需要通过选项–property进行配置。

bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server 192.168.8.128:9092 --property print.partition=true --property print.key=true --property print.timestamp=true --property print.offset=true

在这里插入图片描述
​ 若不适用–from-beginning选项,表示从最新处消费该topic的所有分区的的消息,即仅消费正在写入的消息。加上–from-beginning选项表示从该topic存在的所有消息中从头开始消费。

注意:

consumer默认将offset保存在Kafka一个内置的topic中,该topic名 为

__consumer_offsets

​ 该进程会一直运行,当有新消息进来,这里会直接读取出来消息。当有Leader节点出现错误时,会在剩余的follower中推举出一个leader,而且这些数据还没有丢失,因为follower是leader的备份节点。
参数值类型说明有效值–topicstring被消费的topic–includestring正则表达式,指定要包含以供使用的主题的白名单–partitioninteger指定消费的分区,默认从该分区的末尾开始消费,除非指定了offset–offsetstring执行消费的起始偏移量位置
默认值:latestlatest:从最新处开始消费
earliest :从最早处开始消费
offset:从指定偏移量开始消费–consumer-propertystring将用户定义的属性以key=value的形式传递给使用者–consumer.configstring消费者配置属性文件
[consumer-property]配置优先级高于此配置–formatterstring用于格式化kafka消息以供显示的类的名称 ,kafka.tools下的DefaultMessageFormatter LoggingMessageFormatter NoOpMessageFormatter ChecksumMessageFormatter–propertystring初始化消息格式化程序的属性print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer>–from-beginning从存在的最早消息开始,而不是从最新消息开始–max-messagesinteger设置要消费的最大消息数。如果未设置,则连续消耗–timeout-msinteger如果没有消息可消费,将在指定时间后终止消费者进程–skip-message-on-error如果处理消息时出错,请跳过它而不是暂停–bootstrap-serverstring必需,要连接的服务器–key-deserializerstring消息的key序列化方式–value-deserializerstring消息的value序列化方式–enable-systest-events除记录消费的消息外,还记录消费者的生命周期 (用于系统测试)–isolation-levelstring设置为read_committed以过滤掉未提交的事务性消息
设置为read_uncommitted以读取所有消息read_uncommitted,
read_uncommitted(默认值)–groupstring指定消费者所属组的ID

3-5 配置管理

kafka-configs.sh

配置管理脚本,这个脚本主要分两类用法:describe和alter。

  1. describe相关用法- 查看每个topic的配置bin/kafka-configs.sh --bootstrap-server 192.168.8.128:9092 --describe --entity-type topics- 查看broker的配置bin/kafka-configs.sh --bootstrap-server 192.168.8.128:9092 --describe --entity-type brokers --entity-name 0> 说明:0是broker.id,因为entity-type为brokers,所以entity-name表示broker.id。
  2. alter相关用法- 给指定topic增加配置bin/kafka-configs.sh --bootstrap-server 192.168.8.128:9092 --alter --entity-type topics --entity-name TOPIC-TEST-AFEI --add-config retention.ms=600000- 给指定topic删除配置bin/kafka-configs.sh --bootstrap-server 192.168.8.128:9092 --alter --entity-type topics --entity-name TOPIC-TEST-AFEI --delete-config max.message.bytes

3-6 查看元数据日志

​ 使用过程中,如果遇到问题,可能需要查看元数据日志。在KRaft中,有两个命令行工具需要特别关注下。

kafka-dump-log.sh

kakfa-metadata-shell.log

。(元数据不是真正的数据,规范、定义真实数据的数据)

  1. kafka-dump-log.sh​ Kafka-dump-log.sh是一个之前就有的工具,用来查看Topic的的文件内容。- 查询Log文件bin/kafka-dump-log.sh --files kafka-logs/my-topic-0/00000000000000000000.log[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UZT3SuIx-1651065613020)(Kafka.assets/image-20211028164041786.png)]
  • 查询Log文件具体信息
bin/kafka-dump-log.sh --files kafka-logs/my-topic-2/00000000000000000000.log --print-data-log 

在这里插入图片描述

  • 查询index文件具体信息
bin/kafka-dump-log.sh --files kafka-logs/my-topic-2/00000000000000000000.index

在这里插入图片描述

  • 查询timeindex文件
bin/kafka-dump-log.sh --files kafka-logs/my-topic-2/00000000000000000000.timeindex

在这里插入图片描述
3.0版本中这个工具加了一个参数

–cluster-metadata-decoder

用来,查看元数据日志,如下所示:

 ./bin/kafka-dump-log.sh  --cluster-metadata-decoder --skip-record-metadata --files /opt/kraft/kraft-combined-logs/__cluster_metadata-0/*.log

在这里插入图片描述

./bin/kafka-dump-log.sh  --cluster-metadata-decoder --skip-record-metadata --files /opt/kraft/kraft-combined-logs/__cluster_metadata-0/*.log
Dumping /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: true position: 0 CreateTime: 1614382631640 size: 89 magic: 2 compresscodec: NONE crc: 1438115474 isvalid: true

baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: false position: 89 CreateTime: 1614382632329 size: 137 magic: 2 compresscodec: NONE crc: 1095855865 isvalid: true
 payload: {"type":"REGISTER_BROKER_RECORD","version":0,"data":{"brokerId":1,"incarnationId":"P3UFsWoNR-erL9PK98YLsA","brokerEpoch":0,"endPoints":[{"name":"PLAINTEXT","host":"localhost","port":9092,"securityProtocol":0}],"features":[],"rack":null}}
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: false position: 226 CreateTime: 1614382632453 size: 83 magic: 2 compresscodec: NONE crc: 455187130 isvalid: true
 payload: {"type":"UNFENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":0}}
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: false position: 309 CreateTime: 1614382634484 size: 83 magic: 2 compresscodec: NONE crc: 4055692847 isvalid: true
 payload: {"type":"FENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":0}}
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: true position: 392 CreateTime: 1614382671857 size: 89 magic: 2 compresscodec: NONE crc: 1318571838 isvalid: true

baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 481 CreateTime: 1614382672440 size: 137 magic: 2 compresscodec: NONE crc: 841144615 isvalid: true
 payload: {"type":"REGISTER_BROKER_RECORD","version":0,"data":{"brokerId":1,"incarnationId":"RXRJu7cnScKRZOnWQGs86g","brokerEpoch":4,"endPoints":[{"name":"PLAINTEXT","host":"localhost","port":9092,"securityProtocol":0}],"features":[],"rack":null}}
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 618 CreateTime: 1614382672544 size: 83 magic: 2 compresscodec: NONE crc: 4155905922 isvalid: true
 payload: {"type":"UNFENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":4}}
baseOffset: 7 lastOffset: 8 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 701 CreateTime: 1614382712158 size: 159 magic: 2 compresscodec: NONE crc: 3726758683 isvalid: true
 payload: {"type":"TOPIC_RECORD","version":0,"data":{"name":"foo","topicId":"5zoAlv-xEh9xRANKXt1Lbg"}}
 payload: {"type":"PARTITION_RECORD","version":0,"data":{"partitionId":0,"topicId":"5zoAlv-xEh9xRANKXt1Lbg","replicas":[1],"isr":[1],"removingReplicas":null,"addingReplicas":null,"leader":1,"leaderEpoch":0,"partitionEpoch":0}}

2.kafka-metadata-shell.sh
​ kafka还提供了一个叫做

kafka-metadata-shell.sh

的工具,能够看到topic和partion的分布,这些信息原来是可以通过zk获取的,现在可以使用这个命令行获取。

[root@192 kraft]# ./bin/kafka-metadata-shell.sh  --snapshot /opt/kraft/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log
Loading...
Starting...
[ Kafka Metadata Shell ]>>ls /
brokers  configs  local  metadataQuorum  topicIds  topics
>>ls /topics
__consumer_offsets  test  topic-test-kraft
>>ls /topicIds
wSIr0K8RQr6FRCvVbmW8Kg  xYERkvINT1aYpmrUkRqhnA  zmpBmMGgS66uco28RoAdSQ
>>ls /brokers
1>>ls /configs
topic
>>cat /topics/test/0/data 
{"partitionId":0,
  "topicId":"xYERkvINT1aYpmrUkRqhnA",
  "replicas":[1],
  "isr":[1],
  "removingReplicas":[],
  "addingReplicas":[],
  "leader":1,
  "leaderEpoch":6,
  "partitionEpoch":6}>>exit

在这里插入图片描述

3-7 消费组管理

​ 对消费组的操作用到的是

bin/kafka-consumer-groups.sh

脚本,具体使用方法如下:

  • 列出消费组
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --list

在这里插入图片描述

  • 查看消费组详情
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --describe --group my-group

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cQzWTzDo-1651065801948)(Kafka.assets/image-20211022151331639.png)]

​ 各字段含义如下:
TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtopic名字分区id当前已消费的条数总条数未消费的条数消费id主机ip客户端id

  • 查看消费组中所有活动成员的列表
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --describe --group test --members

在这里插入图片描述

  • 查看分配给每个成员的分区
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --describe --group my-group --members --verbose

在这里插入图片描述
此外,–offsets这是默认的描述选项,与–describe选项输出相同。–state提供有用的组级信息。
在这里插入图片描述

  • 删除一个或多个消费组:使用–delete
> bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --delete --group my-group --group my-other-group

  Deletion of requested consumer groups('my-group', 'my-other-group') was successful.
  • 重置消费者的偏移量选项说明:--to-latest:重置偏移量为最新处--to-earliest:重置偏移量为最早处--to-offset <Long: offset>:重置偏移量为指定值
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

3-8 导入导出

​ kafka connect 是一个可扩展的、可靠的在kafka和其他系统之间流传输的数据工具。简而言之就是他可以通过Connector(连接器)简单、快速的将大集合数据导入和导出kafka。可以接收整个数据库或收集来自所有的应用程序的消息到kafka的topic中,主要用来与其他中间件建立流式通道。


Kafka connect的核心组件:

  • Source:负责将外部数据写入到Kafka的topic
  • Sink:负责从topic读取数据到指定地方
  • Connectors:通过管理task来协调数据流的高级抽象
  • Converters:kafka connect和其他存储系统直接发送或者接受数据之间转换数据,converter会把bytes数据转换成kafka connect内部的格式,也可以把kafka connect内部存储格式的数据转变成bytes,converter对connector来说是解耦的,所以其他的connector都可以重用。

注意:读取后的数据的Schema是固定的,包含的列如下:
ColumnType说明keybinary消息的keyvaluebinary消息的valuetopicstring主题partitionint分区offsetlong偏移量timestamplong时间戳
Kafka connect的两种工作模式:

  • standalone:在standalone模式中,用单一进程负责执行所有连接操作,使用connect-standalone.sh脚本。
  • distributed:distributed模式具有高扩展性,以及提供自动容错机制。可以使用一个group.ip来启动很多worker进程,在有效的worker进程中它们会自动的去协调执行任务,使用connect-distributed.sh脚本启动。

Kafka connect客户端操作:
1.创建一些文本信息
在这里插入图片描述

2.开启两个连接器运行在独立模式
独立模式意味着运行一个单一的,本地的,专用的进程。使用的是bin/connect-standalone.sh这个脚本。

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

​ 执行这个脚本的时候会带三个配置文件(若执行的是connect-distributed.sh脚本,第一个参数应选择相应的connect-distributed.properties):connect-standalone.properties是Kafka Connect处理的配置,connect-standalone.properties配置文件如下:

#kafka服务地址
bootstrap.servers=192.168.8.128:9092
#把数据导入到kafka的某个topic时,topic中数据的key按照某种converter转化,默认是json格式
key.converter=org.apache.kafka.connect.json.JsonConverter
#把数据导入到kafka的某个topic时,topic中数据的value按照某种converter转化,默认是json格式
value.converter=org.apache.kafka.connect.json.JsonConverter
#指定topic中数据的key和value是否包含schema信息,消息由playload和schema组成
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#保存偏移量的路径
offset.storage.file.filename=/tmp/connect.offsets
#保存connector运行中offset到topic的频率
offset.flush.interval.ms=10000

​ connect-distributed.properties配置文件如下:

#kafka服务地址
bootstrap.servers=192.168.8.128:9092
#集群的id,要注意这个id不能和consumer group的id冲突
group.id=connect-cluster
#把数据导入到kafka的某个topic时,topic中数据的key按照某种converter转化,默认是json格式
key.converter=org.apache.kafka.connect.json.JsonConverter
#把数据导入到kafka的某个topic时,topic中数据的value按照某种converter转化,默认是json格式
value.converter=org.apache.kafka.connect.json.JsonConverter
#指定topic中数据的key和value是否包含schema信息
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#用于保存connector运行中offset的topic,当connector宕机时可以继续从某个offset开始运行
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#用于保存connector配置信息的topic(注意:此topic只能由一个partition)
config.storage.topic=connect-configs
config.storage.replication.factor=1
#用于保存connector和task状态的topic
status.storage.topic=connect-status
status.storage.replication.factor=1
#保存connector运行中offset到topic的频率
offset.flush.interval.ms=10000

​ connect-file-source.properties配置文件如下:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
# 指定读取的文件路径
file=/home/file/text.txt
# 指定将数据写入的topic
topic=connect-test

​ connect-file-sink.properties配置文件如下:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
# 指定消息输出文件路径
file=/home/file/test.sink.txt
# 指定从该topic读取数据
topics=connect-test

​ Kafka connect核心组件有source和sink。source负责将外部数据写入到Kafka的topic中,sink负责从Kafka的topic读取数据并写入到指定地方。

​ 执行运行的shell命令,一旦进程开始,导入连接器读取

text.txt

文件内容写入到connect-test主题,导出连接器从主题connect-test读取消息写入到文件

test.sink.txt

,而且可以看到connect-test的topic已经创建了。
在这里插入图片描述
在这里插入图片描述
如果在text.txt追加内容,输出文件test.sink.txt也会从kafka的主题中消费消息。

标签: kafka java 后端

本文转载自: https://blog.csdn.net/weixin_43816711/article/details/124460960
版权归原作者 98seven 所有, 如有侵权,请联系我们删除。

“Kafka详解(中)——Kafka客户端操作”的评论:

还没有评论