文章目录
数据采集框架Flume
- 在一个完整的离线大数据处理系统中,除了hdfs+mapreduce+hive组成分析系统的核心之外,还需要数据采集、结果数据导出、任务调度等不可或缺的辅助系统,而这些辅助工具在hadoop生态体系中都有便捷的开源框架。
Flume基本介绍
概述
- Flume是一个分布式、可靠、高可用的海量日志采集、聚合和传输的系统。
- Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中
- 一般的采集需求,通过对flume的简单配置即可实现
- Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景
运行机制
- Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成的
- 每一个agent相当于一个数据传递员,内部有三个组件:- Source:采集组件,用于跟数据源对接,以获取数据- Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据- Channel:传输通道组件,用于从 source 将数据传递到sink
Flume采集系统结构图
1. 简单结构
- 单个 Agent 采集数据
2. 复杂结构
- 两个 Agent 之间串联
- 多个 Agent 之间串联
- 多级 channel
Flume实战案例
- 需求:收集网络端口的数据,并将数据打印到linux的控制台上面。 - 在 node02 上,通过
telnet node03 44444
发送数据- 在 node03 上,通过 flume 接收 44444 端口的数据并打印到控制台
采集网络端口数据
1. Flume的安装部署
- Flume的安装非常简单,只需要解压即可。
- 上传安装包到数据源所在节点上,这里我们在第三台机器 node03 来进行安装
$ pwd
/bigdata/soft
$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /bigdata/install/
# 修改配置文件
$ cd /bigdata/install/apache-flume-1.9.0-bin/conf/
$ cp flume-env.sh.template flume-env.sh
$ vim flume-env.sh
exportJAVA_HOME=/usr/apps/jdk1.8.0_241
- 解决 jar 包冲突:apache-flume-1.9.0-bin、hadoop-3.1.4都有guava包,但是版本不一致,会造成冲突
- 将 flume 中低版本的 guava 包替换成 hadoop 中高版本的 guava 包
$ pwd
/bigdata/install/apache-flume-1.9.0-bin
$ rm -rf lib/guava-11.0.2.jar
$ cp /bigdata/install/hadoop-3.1.4/share/hadoop/common/lib/guava-27.0-jre.jar lib/
2. 开发配置文件
- 根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)
- 需求:配置我们的网络收集的配置文件,从某socket端口采集数据,采集到的数据打印到console控制台
- 在flume的conf目录下新建一个配置文件(采集方案)
$ pwd
/bigdata/install/apache-flume-1.9.0-bin
$ vim conf/netcat-logger.conf
- 内容如下:
# 1. 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2. 描述和配置source组件:r1
a1.sources.r1.type = netcat
# 当前节点的ip地址
a1.sources.r1.bind = node03
a1.sources.r1.port =44444# 3. 描述和配置sink组件:k1
a1.sinks.k1.type = logger
# 4. 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
# channel中存储的event的最大个数
a1.channels.c1.capacity =1000# channel每次从source获得的event最多个数或一次发往sink的event最多个数
a1.channels.c1.transactionCapacity =100# 5. 描述和配置source channel sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 对应类型组件的官网文档 - netcat-tcp-source- logger-sink- memory-channel
3. 启动
- 指定采集方案配置文件,在相应的节点上启动flume agent
- 先用一个最简单的例子来测试一下程序环境是否正常
$ pwd
/bigdata/install/apache-flume-1.9.0-bin
$ bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf 指定flume自身的conf目录中的配置文件
-f conf/netcat-logger.con 指定我们所描述的采集方案
-n a1 指定我们这个agent的名字
-Dflume.root.logger=INFO,console 将info级别的日志打印到控制台
4. 使用 telnet 测试
- 在node02机器上面安装telnet客户端,用于模拟数据的发送
sudo yum -y install telnet
telnet node03 44444# 使用telnet模拟数据发送
采集目录到HDFS
1. 需求分析
- 需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去。
- 根据需求,首先定义三大要素 - 数据源组件:即 source,使用 spooldir 监控文件目录- 下沉组件:即 sink,使用 HDFS 文件系统- 通道组件:即 channel,使用 file channel,也可以使用内存 channel
- spooldir特性: - 监视一个目录,只要目录中出现新文件,就会采集文件中的内容- 采集完成的文件,会被agent自动添加一个后缀:COMPLETED- 此source可靠,不会丢失数据,即使 flume 重启或被 kill- 注意:所监视的目录中不允许有同名的文件;且文件被放入spooldir后,就不能修改 - ① 如果文件放入spooldir后,又向文件写入数据,会打印错误及停止- ② 如果有同名的文件出现在spooldir,也会打印错误及停止
2. 开发配置文件
vim conf/spooldir.conf
,内容如下:
# 1. Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2. Describe/configure the source# 注意:不能往监控目中重复丢同名文件
a1.sources.r1.type = spooldir
# 监控的路径
a1.sources.r1.spoolDir = /bigdata/install/flumedatas
# Whether to add a header storing the absolute path filename# 文件绝对路径放到header
a1.sources.r1.fileHeader =true# 3. Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# 采集到的数据写入到此路径
a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%H%M/
# 指定在hdfs上生成的文件名前缀
a1.sinks.k1.hdfs.filePrefix = events-
# timestamp向下取整round down
a1.sinks.k1.hdfs.round =true# 按10分钟,为单位向下取整;如55分,舍成50;38 -> 30
a1.sinks.k1.hdfs.roundValue =10# round的单位
a1.sinks.k1.hdfs.roundUnit = minute
# 每3秒滚动生成一个文件;默认30;(0 = never roll based on time interval)
a1.sinks.k1.hdfs.rollInterval =3# 每x字节,滚动生成一个文件;默认1024;(0: never roll based on file size)
a1.sinks.k1.hdfs.rollSize =20# 每x个event,滚动生成一个文件;默认10; (0 = never roll based on number of events)
a1.sinks.k1.hdfs.rollCount =5# 每x个event,flush到hdfs
a1.sinks.k1.hdfs.batchSize =1# 使用本地时间
a1.sinks.k1.hdfs.useLocalTimeStamp =true# 生成的文件类型,默认是Sequencefile;可选DataStream,则为普通文本;可选CompressedStream压缩数据
a1.sinks.k1.hdfs.fileType = DataStream
# 4. Use a channel which buffers events in memory
a1.channels.c1.type = memory
# channel中存储的event的最大数目
a1.channels.c1.capacity =1000# 每次传输数据,从source最多获得event的数目或向sink发送的event的最大的数目
a1.channels.c1.transactionCapacity =100# 5. Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 对应类型组件的官网文档- spooling directory source- hdfs sink- memory channel
- Channel参数解释:- capacity:默认该通道中最大的可以存储的 event 数量- trasactionCapacity:每次最大可以从 source 中拿到或者送到 sink 中的 event 数量- keep-alive:event 添加到通道中或者移出的允许时间
3. 启动&测试
bin/flume-ng agent -c conf -f conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console
- 将不同的文件上传到下面目录里面去,注意文件不能重名
cd /bigdata/install/flumedatas
# vim a.txt 加入如下内容
ab cd ef
english math
hadoop alibaba
- 然后观察flume的console动静、hdfs webui生成的文件
- 如果将同名文件再次放到
/bigdata/install/flumedatas
就会报错:
cp a.txt /bigdata/install/flumedatas
采集文件到HDFS
1. 需求分析
- 比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs
- 根据需求,首先定义三大要素 - 数据源组件:即 source,使用
exec 'tail -F file'
监控文件内容更新- 下沉组件:即 sink,使用 HDFS 文件系统- 通道组件:即 channel,使用 file channel,也可以使用内存 channel
2. 开发配置文件
vim conf/tail-file.conf
,内容如下:
# 1. Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# 2. Describe/configure tail -F source1
agent1.sources.source1.type =exec
agent1.sources.source1.command =tail -F /bigdata/install/flumedatas/taillogs/access_log
agent1.sources.source1.channels = channel1
# 3. Describe sink
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
# 允许打开的文件数;如果超出5000,老文件会被关闭
agent1.sinks.sink1.hdfs.maxOpenFiles =5000
agent1.sinks.sink1.hdfs.batchSize=100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize =102400
agent1.sinks.sink1.hdfs.rollCount =1000000
agent1.sinks.sink1.hdfs.rollInterval =60
agent1.sinks.sink1.hdfs.round =true
agent1.sinks.sink1.hdfs.roundValue =10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp =true# 4. Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
# 向channel添加一个event或从channel移除一个event的超时时间
agent1.channels.channel1.keep-alive =120
agent1.channels.channel1.capacity =500000
agent1.channels.channel1.transactionCapacity =600# 5. Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
- 对应类型组件的官网文档 - hdfs sink- memory channel
3. 启动&测试
bin/flume-ng agent -c conf -f conf/tail-file.conf -n agent1 -Dflume.root.logger=INFO,console
- 写一个脚本,定义追加一些内容到指定文件:
vim tail-file.sh
#!/bin/bashwhiletruedodate>> /bigdata/install/flumedatas/taillogs/access_log;sleep0.5;done
- 启动脚本
chmod u+x tail-file.sh
./tail-file.sh
实现断点续传
- 不管是上面的 spoolDir 还是 exec Source dir 都有一个缺陷就是没法实现断点续传的功能,为此在flume1.7当中特地新增加一个 source 叫做 tail-dir source,专门用于解决断点续传的问题,tail-dir source可以监控文件或者文件夹,允许我们使用正则表达式的方式来对我们的文件或者文件夹进行监听。
1. 需求分析
- 采集需求,使用tail-dir source监听某个目录下的多个文件,并且实现文件的断点续传功能
2. 开发配置文件
vim conf/tail-dir.conf
,内容如下:
# 1. Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2. Describe/configure the source
a1.sources.r1.type = TAILDIR
# 以json格式,记录读取的每个文件及读取的position
a1.sources.r1.positionFile = /bigdata/install/apache-flume-1.9.0-bin/taildir_position.json
# 每个filegroup代表一系列待tail的文件
a1.sources.r1.filegroups = f1
# 指定filegroup的绝对路径
a1.sources.r1.filegroups.f1 = /bigdata/install/flumedatas/dirfile/.*log.*
# 此项用于控制从一个文件连续读取数据的批次;比如有A、B、C多个文件,如果向A文件写入的频率非常高,导致一直循环的从A中采集获取数据,而B、C的数据不被处理;可将此值调低;每个批次由属性batchSize控制,默认500行
a1.sources.r1.maxBatchCount =1000# 3. Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://node01:8020/taildir/files/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round =true
a1.sinks.k1.hdfs.roundValue =10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval =3
a1.sinks.k1.hdfs.rollSize =5000
a1.sinks.k1.hdfs.rollCount =50000# 每x个event flush到hdfs
a1.sinks.k1.hdfs.batchSize =1000
a1.sinks.k1.hdfs.useLocalTimeStamp =true#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# 4. Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity =1000
a1.channels.c1.transactionCapacity =100# 5.Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 对应类型组件的官网文档 - tail dir
3. 启动&测试
bin/flume-ng agent -c conf -f conf/tail-dir.conf -n a1 -Dflume.root.logger=INFO,console
- node03 执行以下命令创建文件到指定文件夹下
echo"Hello World">> /bigdata/install/flumedatas/dirfile/file.log
echo"How are you">> /bigdata/install/flumedatas/dirfile/file1.log
echo"How old are you">> /bigdata/install/flumedatas/dirfile/file2.log
- 可观察下
taildir_position.json
文件中记录的内容
$ cat /bigdata/install/apache-flume-1.9.0-bin/taildir_position.json
[{"inode":50817895,"pos":12,"file":"/bigdata/install/flumedatas/dirfile/file.log"},{"inode":50817897,"pos":12,"file":"/bigdata/install/flumedatas/dirfile/file1.log"},{"inode":50817898,"pos":16,"file":"/bigdata/install/flumedatas/dirfile/file2.log"}]
两个 agent 级联
1. 需求分析
- 第一个agent负责收集文件当中的数据,通过网络发送到第二个agent当中去;
- 第二个agent负责接收第一个agent发送的数据,并将数据保存到hdfs上面去。
2. node02安装flume
- 将node03机器上面解压后的flume文件夹拷贝到node02机器上面去
scp -r apache-flume-1.9.0-bin/ node02:$PWD
3. 开发配置文件
- node02 上开发 flume 配置文件:
vim conf/tail-avro-logger.conf
,内容如下
# 1. Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2. Describe/configure the source
a1.sources.r1.type =exec
a1.sources.r1.command =tail -F /bigdata/install/flumedatas/taillogs/access_log
a1.sources.r1.channels = c1
# 3. Describe the sink# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = node03
a1.sinks.k1.port =4141# 每一批次发送的event的数目
a1.sinks.k1.batch-size =10# 4. Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity =1000
a1.channels.c1.transactionCapacity =100# 5. Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 对应类型组件的官网文档 - avro sink
- node02 上开发 flume 配置文件:
vim conf/avro-hdfs.conf
,内容如下
# 1. Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2. Describe/configure the source# source中的avro组件是一个接收者服务
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node03
a1.sources.r1.port =4141# 3. Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/avro/hdfs/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round =true
a1.sinks.k1.hdfs.roundValue =10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval =3
a1.sinks.k1.hdfs.rollSize =200
a1.sinks.k1.hdfs.rollCount =5
a1.sinks.k1.hdfs.batchSize =1
a1.sinks.k1.hdfs.useLocalTimeStamp =true# 生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# 4. Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity =1000
a1.channels.c1.transactionCapacity =100# 5. Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 对应类型组件的官网文档 - avro source
4. 启动&测试
- 启动顺序:先启动下游agent,目前数据是从 node02 上的 agent 发往 node03 上的 agent,所以先启动 node03 的 agent
# node03
bin/flume-ng agent -c conf -f conf/avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
# node02
bin/flume-ng agent -c conf -f conf/tail-avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
- 将node03下面的脚本和数据拷贝到node02即可,node03机器上执行以下命令
# node03
$ pwd
/bigdata/install/flumedatas/taillog
$ scp -r tail-file.sh node02:$PWD# node02
./tail-file.sh
- 查看HDFS目录
/avro/hdfs
更多source和sink组件
- Flume支持众多的source和sink类型,详细手册可参考官方文档
Flume进阶
高可用Flume-NG配置案例
- 高可用的Flume NG集群,架构图如下:
1. 角色分配
名称HOST角色Agent1node01Web ServerCollector1node02AgentMstr1Collector2node03AgentMstr2
- 图中所示,Agent1数据分别流入到Collector1和Collector2,Flume NG本身提供了Failover机制,可以自动切换和恢复。在上图中,有3个产生日志服务器分布在不同的机房,要把所有的日志都收集到一个集群中存储。
2. 集群搭建
- 前面我们已经在 node02、node03 安装好了 flume,接下来将在 node01 上也安装上 flume
# node01mkdir -p /bigdata/install/flumedatas/taillogs
# node03cd /bigdata/install
scp -r apache-flume-1.9.0-bin/ node01:$PWD
$ pwd
/bigdata/install/flumedatas/taillogs
$ scp -r tail-file.sh node01:$PWD
- 开发 node01 机器的 agent 配置文件:
vim conf agent.conf
,内容如下
# agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
## set gruop
agent1.sinkgroups = g1
## set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity =1000
agent1.channels.c1.transactionCapacity =100# 配置source
agent1.sources.r1.channels = c1
agent1.sources.r1.type =exec
agent1.sources.r1.command =tail -F /bigdata/install/flumedatas/taillogs/access_log
# interceptor 拦截器;与source结合,对event进行修改或丢弃
agent1.sources.r1.interceptors = i1 i2
# 静态拦截器在所有的event的header中,增加一个kv对,key是下边属性key对应的值,value是属性value对应的值
agent1.sources.r1.interceptors.i1.type = static
# 被创建的header的名字
agent1.sources.r1.interceptors.i1.key = Type
# 静态的值;key与value对应
agent1.sources.r1.interceptors.i1.value = LOGIN
# timestamp拦截器对event的header中增加kv对,key是timestamp,value是对应的时间戳的值
agent1.sources.r1.interceptors.i2.type = timestamp
## set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port =52020## set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port =52020## set sink group
agent1.sinkgroups.g1.sinks = k1 k2
## sink processor处理器;可用于sink的负载均衡或故障转移
agent1.sinkgroups.g1.processor.type = failover
# priority值高的sink,拥有较高的权限;并且必须是唯一不重复的
agent1.sinkgroups.g1.processor.priority.k1 =10
agent1.sinkgroups.g1.processor.priority.k2 =1# maxpenalty 对于故障的节点最大的黑名单时间 (in millis 毫秒)
agent1.sinkgroups.g1.processor.maxpenalty =10000
- 对应类型组件的官网文档 - static interceptor静态拦截器- timestamp拦截器- sink processor处理器
3. 配置flume collection
- node02、node03 机器修改配置文件
vim conf/collector.conf
,内容相同
# set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
## set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity =1000
a1.channels.c1.transactionCapacity =100## set source
a1.sources.r1.type = avro
a1.sources.r1.bind =0.0.0.0
a1.sources.r1.port =52020
a1.sources.r1.channels = c1
# 拦截器
a1.sources.r1.interceptors = i1
#a1.sources.r1.interceptors.i1.type = static#a1.sources.r1.interceptors.i1.key = Collector#a1.sources.r1.interceptors.i1.value = node02# 在header中添加的kv对的key默认是host
a1.sources.r1.interceptors.i1.type =host
a1.sources.r1.interceptors.i1.hostHeader=hostname
## set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://node01:8020/flume/failover/%{hostname}
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
4. 启动&测试
- 顺序启动
# node03
bin/flume-ng agent -n a1 -c conf -f conf/collector.conf -Dflume.root.logger=DEBUG,console
# node02
bin/flume-ng agent -n a1 -c conf -f conf/collector.conf -Dflume.root.logger=DEBUG,console
# node01
bin/flume-ng agent -n agent1 -c conf -f conf/agent.conf -Dflume.root.logger=DEBUG,console
- node01 启动文件产生脚本
$ pwd
/bigdata/install/flumedatas/taillogs
$ ./tail-file.sh
- 然后去hdfs查看生成文件
- 将node02的agent停掉,自动切换到node03上的agent
- 再将node02的agent启动,由于node02的优先级高,自动切换回node02上的agent
- 下面我们来测试下Flume NG集群的高可用(故障转移)。场景如下: - 我们在Agent1节点上传文件,由于我们配置Collector1的权重比Collector2大,所以 Collector1优先采集并上传到存储系统。然后我们kill掉Collector1,此时有Collector2负责日志的采集上传工作,之后,我 们手动恢复Collector1节点的Flume服务,再次在Agent1上传文件,发现Collector1恢复优先级别的采集工作。
版权归原作者 yangwei_sir 所有, 如有侵权,请联系我们删除。