I. Create a Kafka topic
kafka-topics.sh --create --zookeeper lxm147:2181 --topic events --partitions 1 --replication-factor 1
II. Configure Flume
vim /opt/soft/flume190/conf/events/events.conf
events.sources=eventsSource
events.channels=eventsChannel
events.sinks=eventsSink
events.sources.eventsSource.type=spooldir
events.sources.eventsSource.spoolDir=/opt/flumelogfile/events
events.sources.eventsSource.deserializer=LINE
events.sources.eventsSource.deserializer.maxLineLength=320000
events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
events.sources.eventsSource.interceptors=head_filter
events.sources.eventsSource.interceptors.head_filter.type=regex_filter
events.sources.eventsSource.interceptors.head_filter.regex=^event_id.*
events.sources.eventsSource.interceptors.head_filter.excludeEvents=true
events.channels.eventsChannel.type=file
events.channels.eventsChannel.checkpointDir=/opt/flumelogfile/checkpoint/events
events.channels.eventsChannel.dataDirs=/opt/flumelogfile/data/events
events.sinks.eventsSink.type=org.apache.flume.sink.kafka.KafkaSink
events.sinks.eventsSink.batchSize=640
events.sinks.eventsSink.brokerList=lxm147:9092
events.sinks.eventsSink.topic=events
events.sources.eventsSource.channels=eventsChannel
events.sinks.eventsSink.channel=eventsChannel
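Before starting the agent, the two regular expressions in events.conf can be checked locally. The sketch below uses grep -E as a stand-in for Flume's Java regex engine, which should be a reasonable approximation for patterns this simple. It assumes that includePattern must match the whole file name and that the regex_filter interceptor only needs to find a match somewhere in the line. The function names and the sample lines are hypothetical.

```shell
#!/bin/sh
# Rough local check of the patterns from events.conf.
# Assumption: grep -E approximates Java regex for these simple patterns.

# Prints "included" if the file name fully matches includePattern, else "ignored".
# The unescaped dot before "csv" is kept exactly as in the config.
check_file_name() {
  if printf '%s\n' "$1" | grep -Eqx 'events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv'; then
    echo included
  else
    echo ignored
  fi
}

# Prints "dropped" if the line starts with event_id (the header targeted
# by head_filter with excludeEvents=true), else "kept".
check_line() {
  if printf '%s\n' "$1" | grep -Eq '^event_id'; then
    echo dropped
  else
    echo kept
  fi
}

check_file_name "events_2023-04-01.csv"   # included
check_file_name "events.csv"              # ignored
check_line "event_id,user_id"             # dropped (hypothetical header)
check_line "684921758,3647864012"         # kept (hypothetical data row)
```

This is why the file is renamed to events_2023-04-01.csv when it is copied into the spool directory: a plain events.csv would not match includePattern.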
III. Start Flume
[root@lxm147 flume190]# ./bin/flume-ng agent --name events --conf ./conf/ --conf-file ./conf/events/events.conf -Dflume.root.logger=INFO,console
IV. Start a Kafka consumer
kafka-console-consumer.sh --bootstrap-server lxm147:9092 --topic events
V. Copy the file into the source directory monitored by Flume
[root@lxm147 eventdata]# cp ./events.csv /opt/flumelogfile/events/events_2023-04-01.csv
[root@lxm147 eventdata]# wc -l events.csv # count the lines in events.csv
3137973 events.csv
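The line count is useful for predicting how many messages should reach Kafka. A minimal sketch, assuming the file contains exactly one header line (the one starting with event_id that the head_filter interceptor drops); the function name is hypothetical:

```shell
#!/bin/sh
# Expected number of Kafka messages = total lines minus header lines.
# Assumption: one header line unless a second argument says otherwise.
expected_messages() {
  total_lines="$1"
  header_lines="${2:-1}"
  echo $(( total_lines - header_lines ))
}

expected_messages 3137973   # prints 3137972
```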
VI. Check whether Flume collected the file successfully
2023-04-21 12:15:34,040 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:384)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2023-04-21 12:15:34,041 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:497)] Preparing to move file /opt/flumelogfile/events/events_2023-04-01.csv to /opt/flumelogfile/events/events_2023-04-01.csv.COMPLETED
2023-04-21 12:16:03,096 (Log-BackgroundWorker-eventsChannel) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:230)] Start checkpoint for /opt/flumelogfile/checkpoint/events/checkpoint, elements to sync = 32673
2023-04-21 12:16:03,102 (Log-BackgroundWorker-eventsChannel) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:255)] Updating checkpoint metadata: logWriteOrderID: 1682056665561, queueSize: 0, queueHead: 137243
2023-04-21 12:16:03,110 (Log-BackgroundWorker-eventsChannel) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1065)] Updated checkpoint for file: /opt/flumelogfile/data/events/log-1 position: 1447136656 logWriteOrderID: 1682056665561
VII. Check the Kafka topic offset after collection
[root@lxm147 config]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic events
events:0:3137972
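Collection succeeded! The end offset 3137972 equals the 3137973 lines of events.csv minus the one header line removed by the head_filter interceptor.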
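This comparison can be scripted. The sketch below sums the last field of the GetOffsetShell output (topic:partition:offset) and compares it with the expected count. It assumes the topic was empty before collection and that no messages have been deleted by retention, since otherwise the end offset does not equal the message count. The function name is hypothetical, and the sample line is the output shown above.

```shell
#!/bin/sh
# Sums the end offsets of all partitions from GetOffsetShell output on stdin.
total_offset() {
  awk -F: '{ sum += $3 } END { printf "%d\n", sum }'
}

# Sample input copied from the GetOffsetShell output; in practice,
# pipe the real command output into total_offset instead.
actual=$(printf 'events:0:3137972\n' | total_offset)
expected=3137972   # lines in events.csv minus the header line

if [ "$actual" -eq "$expected" ]; then
  echo "OK: $actual messages"
else
  echo "MISMATCH: expected $expected, got $actual"
fi
```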
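VIII. Fixing an incorrect collection
For example, suppose the source data has 2000 lines but only 1500 reach the Kafka topic. Simply rerunning Flume would produce duplicate data, so delete the topic and collect again instead.
1. Press Ctrl+C to stop Flume
2. Delete the faulty topic and recreate it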
kafka-topics.sh --delete --zookeeper lxm147:2181 --topic events
kafka-topics.sh --create --zookeeper lxm147:2181 --topic events --partitions 1 --replication-factor 1
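3. Delete the contents of the directories specified in the Flume configuration file
# Delete the already-transferred files in the directory monitored by Flume
rm -rf /opt/flumelogfile/events/*
# Delete the files in the checkpoint directory
rm -rf /opt/flumelogfile/checkpoint/events/*
# Delete the file channel data files
rm -rf /opt/flumelogfile/data/events/*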
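The three cleanup commands follow one pattern, so they can be wrapped in a small helper. This is only a sketch, assuming the directory layout from events.conf (spool directory, checkpoint directory and data directory all under one base directory and named after the agent). The function name and its parameters are hypothetical.

```shell
#!/bin/sh
# Empties the spool, checkpoint and data directories of one Flume agent.
# $1 = base directory, $2 = agent name (both hypothetical parameters).
clean_flume_dirs() {
  base="${1:?base directory required}"
  name="${2:?agent name required}"
  for dir in "$base/$name" "$base/checkpoint/$name" "$base/data/$name"; do
    if [ -d "$dir" ]; then
      # ${dir:?} aborts instead of expanding to "/*" if dir were ever empty
      rm -rf "${dir:?}"/*
    fi
  done
}

# Usage with the paths from events.conf (run only while Flume is stopped):
# clean_flume_dirs /opt/flumelogfile events
```

Like the original commands, the glob does not touch hidden entries inside these directories.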
4. Restart Flume
[root@lxm147 flume190]# ./bin/flume-ng agent --name events --conf ./conf/ --conf-file ./conf/events/events.conf -Dflume.root.logger=INFO,console
5. Copy the file into the directory monitored by Flume again
[root@lxm147 eventdata]# cp ./events.csv /opt/flumelogfile/events/events_2023-04-01.csv
6. After collection finishes, check the message count of the Kafka events topic
[root@lxm147 config]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list lxm147:9092 --topic events
events:0:3137972
Done!
This article is reposted from: https://blog.csdn.net/Helen_1997_1997/article/details/130287591
Copyright belongs to the original author, 雷神乐乐. In case of infringement, please contact us for removal.