0


Flume采集Kafka并把数据sink到OSS

安装环境

  1. Java环境, 略 (Flume依赖Java)
  2. Flume下载, 略
  3. Scala环境, 略 (Kafka依赖Scala)
  4. Kafak下载, 略
  5. Hadoop下载, 略 (不需要启动, 写OSS依赖)

配置Hadoop

下载JindoSDK(连接OSS依赖), 下载地址Github
解压后配置环境变量

  1. export JINDOSDK_HOME=/usr/lib/jindosdk-x.x.x
  2. export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${JINDOSDK_HOME}/lib/*

修改Hadoop配置,

  1. core-site.xml
  1. <property>
  2. <name>fs.oss.credentials.provider</name>
  3. <value>com.aliyun.jindodata.oss.auth.SimpleCredentialsProvider</value>
  4. </property>
  5. <property>
  6. <name>fs.oss.accessKeyId</name>
  7. <value>xxxx</value>
  8. </property>
  9. <property>
  10. <name>fs.oss.accessKeySecret</name>
  11. <value>xxxx</value>
  12. </property>
  13. <property>
  14. <name>fs.oss.endpoint</name>
  15. <value>xxxxx</value>
  16. </property>
  17. <property>
  18. <name>fs.AbstractFileSystem.oss.impl</name>
  19. <value>com.aliyun.jindodata.oss.JindoOSS</value>
  20. </property>
  21. <property>
  22. <name>fs.oss.impl</name>
  23. <value>com.aliyun.jindodata.oss.JindoOssFileSystem</value>
  24. </property>

配置可参考非EMR集群接入OSS-HDFS服务快速入门

配置Flume

此部分全文最关键, 请仔细看

  1. 基础配置部分, Flume配置
  1. a1.sources = source1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
  5. a1.sources.source1.channels = c1
  6. a1.sources.source1.kafka.bootstrap.servers = xxx
  7. a1.sources.source1.kafka.topics = test
  8. a1.sources.source1.kafka.consumer.group.id = flume-sink-group # 消费者组, 云组件需要先在管理后台创建
  9. a1.sources.source1.kafka.consumer.auto.offset.reset = earliest # 从头消费Kafka里数据
  10. a1.sinks.k1.channel = c1
  11. a1.sinks.k1.type = hdfs
  12. a1.sinks.k1.hdfs.path = oss://xxx/test/%Y%m%d # 自动按天分文件夹
  13. a1.sinks.k1.hdfs.fileType=DataStream
  14. a1.channels.c1.type = memory
  15. a1.channels.c1.capacity = 1000
  16. a1.channels.c1.transactionCapacity = 1000

可参考使用Flume同步EMR Kafka集群的数据至OSS-HDFS服务
2. 进阶配置, 根据自己情况按需配置

  1. a1.sinks.k1.hdfs.rollInterval = 600 # 5分钟切换一个新文件
  2. a1.sinks.k1.hdfs.rollSize = 134217728 # 或者文件大小达到128M则切换新文件
  3. a1.sinks.k1.hdfs.rollCount = 0 # 写入多少条数据切换新文件, 0为不限制

我这里是为了防止sink的文件过于零碎, 但因为使用的memory channel, 缓存时间过长容易丢数据
3. Flume JVM参数
默认启动时-Xmx20m, 过于小了, 加大堆内存可以直接放开

  1. flume-env.sh

  1. JAVA_OPTS

的注释

  1. export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
  1. Channel问题 如果对数据一致性要求较高, 可以把memory channel改用file channel, 请自行研究

XX启动!

几条测试命令

  1. bin/zookeeper-server-start.sh config/zookeeper.properties # 启动zookeeper
  2. bin/kafka-server-start.sh config/server.properties # 启动kafak服务
  3. bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 # 启动flume
  4. bin/kafka-console-producer.sh --topic flume-test --bootstrap-server localhost:9092 # 启动一个生产者写测试数据
标签: flume kafka 大数据

本文转载自: https://blog.csdn.net/u012355401/article/details/134606088
版权归原作者 不住在隔壁的老王 所有, 如有侵权,请联系我们删除。

“Flume采集Kafka并把数据sink到OSS”的评论:

还没有评论