Collecting Data into Kafka with Flume: A Detailed Walkthrough

1. Create a Kafka topic

kafka-topics.sh --create --zookeeper lxm147:2181 --topic events --partitions 1 --replication-factor 1
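
To confirm the topic was created, the same script can describe it (using the same ZooKeeper address as above):

kafka-topics.sh --describe --zookeeper lxm147:2181 --topic events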

2. Configure Flume

vim /opt/soft/flume190/conf/events/events.conf

# Agent components
events.sources=eventsSource
events.channels=eventsChannel
events.sinks=eventsSink

# Spooling-directory source: watches for dated CSV files
events.sources.eventsSource.type=spooldir
events.sources.eventsSource.spoolDir=/opt/flumelogfile/events
events.sources.eventsSource.deserializer=LINE
events.sources.eventsSource.deserializer.maxLineLength=320000
events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
# Interceptor that drops the CSV header row (lines starting with "event_id")
events.sources.eventsSource.interceptors=head_filter
events.sources.eventsSource.interceptors.head_filter.type=regex_filter
events.sources.eventsSource.interceptors.head_filter.regex=^event_id
events.sources.eventsSource.interceptors.head_filter.excludeEvents=true

# Durable file channel: buffered events survive an agent restart
events.channels.eventsChannel.type=file
events.channels.eventsChannel.checkpointDir=/opt/flumelogfile/checkpoint/events
events.channels.eventsChannel.dataDirs=/opt/flumelogfile/data/events

# Kafka sink (brokerList/topic are the legacy property names;
# Flume 1.9 also accepts kafka.bootstrap.servers and kafka.topic)
events.sinks.eventsSink.type=org.apache.flume.sink.kafka.KafkaSink
events.sinks.eventsSink.batchSize=640
events.sinks.eventsSink.brokerList=lxm147:9092
events.sinks.eventsSink.topic=events

# Wire the source and sink to the channel
events.sources.eventsSource.channels=eventsChannel
events.sinks.eventsSink.channel=eventsChannel
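
One caveat: the spooldir source fails to start if its watch directory does not exist, and pre-creating the channel directories avoids surprises as well. A quick sketch, using the paths from the config above:

mkdir -p /opt/flumelogfile/events /opt/flumelogfile/checkpoint/events /opt/flumelogfile/data/events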

3. Start Flume

Run the agent in the foreground; --name must match the agent name used in the configuration file (events):

[root@lxm147 flume190]# ./bin/flume-ng agent --name events --conf ./conf/ --conf-file ./conf/events/events.conf  -Dflume.root.logger=INFO,console

4. Start a Kafka console consumer

kafka-console-consumer.sh --bootstrap-server lxm147:9092 --topic events
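
This tails new messages only. If the consumer is started after the data has already landed, add the standard --from-beginning flag to replay the topic from offset 0:

kafka-console-consumer.sh --bootstrap-server lxm147:9092 --topic events --from-beginning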

5. Copy the data file into the directory monitored by the Flume source

Note that the copy renames the file to events_2023-04-01.csv so that it matches the source's includePattern.

[root@lxm147 eventdata]# cp ./events.csv /opt/flumelogfile/events/events_2023-04-01.csv
[root@lxm147 eventdata]# wc -l events.csv # count the lines in events.csv
3137973 events.csv

6. Verify that Flume collected the data

The agent log shows the source finishing the file and renaming it with a .COMPLETED suffix, followed by the file channel writing its checkpoint:

2023-04-21 12:15:34,040 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:384)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2023-04-21 12:15:34,041 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:497)] Preparing to move file /opt/flumelogfile/events/events_2023-04-01.csv to /opt/flumelogfile/events/events_2023-04-01.csv.COMPLETED
2023-04-21 12:16:03,096 (Log-BackgroundWorker-eventsChannel) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:230)] Start checkpoint for /opt/flumelogfile/checkpoint/events/checkpoint, elements to sync = 32673
2023-04-21 12:16:03,102 (Log-BackgroundWorker-eventsChannel) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:255)] Updating checkpoint metadata: logWriteOrderID: 1682056665561, queueSize: 0, queueHead: 137243
2023-04-21 12:16:03,110 (Log-BackgroundWorker-eventsChannel) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1065)] Updated checkpoint for file: /opt/flumelogfile/data/events/log-1 position: 1447136656 logWriteOrderID: 1682056665561

7. Check the topic offsets after collection

[root@lxm147 config]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic events
events:0:3137972

The end offset is 3137972, one less than the 3137973 lines in events.csv, because the header row was dropped by the head_filter interceptor. Collection succeeded!

8. Recovering from an incomplete collection

For example, if the source data has 2,000 rows but only 1,500 of them reached the Kafka topic, simply re-running Flume would collect duplicates. Instead, delete the topic and redo the collection from scratch.

1. Stop Flume with Ctrl+C

2. Delete the failed topic and recreate it

kafka-topics.sh --delete --zookeeper lxm147:2181 --topic events

kafka-topics.sh --create --zookeeper lxm147:2181 --topic events --partitions 1 --replication-factor 1
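
Topic deletion only takes effect if the broker is running with delete.topic.enable=true (the default since Kafka 1.0.0); otherwise the topic is merely marked for deletion. If the delete appears to hang, check this setting in the broker's server.properties:

# in config/server.properties on the broker
delete.topic.enable=true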

3. Clear the Flume directories used by this agent

All three directories from the configuration need to be emptied: the spool directory still holds the processed .COMPLETED file, and the file channel's checkpoint and data directories still hold state from the failed run.

# remove the already-transferred files from the monitored spool directory
rm -rf /opt/flumelogfile/events/*

# remove the file channel's checkpoint files
rm -rf /opt/flumelogfile/checkpoint/events/*

# remove the file channel's data (log) files
rm -rf /opt/flumelogfile/data/events/*

4. Restart Flume

[root@lxm147 flume190]# ./bin/flume-ng agent --name events --conf ./conf/ --conf-file ./conf/events/events.conf -Dflume.root.logger=INFO,console

5. Copy the file into the monitored directory again

[root@lxm147 eventdata]# cp ./events.csv /opt/flumelogfile/events/events_2023-04-01.csv

6. After collection finishes, check the message count of the events topic

[root@lxm147 config]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic events
events:0:3137972
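
Since the topic here has a single partition, the end offset is the message count directly; with multiple partitions you would sum the per-partition offsets. An illustrative one-liner (the awk pipe is an addition for this walkthrough, not from the original):

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic events | awk -F: '{sum += $3} END {print sum}'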

Done!
