0


大数据技术之Hadoop(八)——Flume日志采集系统

http://链接: https://pan.baidu.com/s/19cSqan67QhB_x3vdnXsANQ?pwd=fh35 提取码: fh35http://xn--gzu811i//pan.baidu.com/s/19cSqan67QhB_x3vdnXsANQ?pwd=fh35%20%E6%8F%90%E5%8F%96%E7%A0%81:%20fh35

一、Flume的概述

1、Flume的认识

    Flume原是Cloudera公司提供的一个高可用的、高可靠的、分布式海量日志采集、聚合和传输系统,而后纳人到了Apache旗下,作为一个顶级开源项目。Apache Flume不仅只限日志数据的采集,由于Flume 采集的数据源是可定制的,因此Flume 还可用于传输大量事件数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息以及几乎任可能的数据源。

    当前 Flume 分为两个版本:Flume 0.9x版本,统称Flume-og(original generation)和 Flume 1.x版本,统称Flumeng(nextgeneration)。由于早期的Flume-og存在设计不合理、代码臃肿、不易扩展等问题,因此在Flume纳入到Apache旗下后,开发人员对Clouden Flume 的代码进行了重构,同时对 Flume 功能进行了补充和加强,并重命名为 Apache Flume于是就出现了 Flume-ng 与Flume-og两种截然不同的版本。而在实际开发中,多数使用目前比较流行的Flumeng 版本进行 Flume 开发。

2、Flume的运行机制

    Flume的核心是把数据从数据源(Web Server)通过数据采集器(Source)收集过来,再将收集的数据通过缓冲通道(Channel)汇集到指定的接收器(Sink)。可参考下面官方文档图片。

    Flume 基本架构中有一个 Agent(代理),它是Flume 的核心角色,meAgent是一个JVM进程,它承载着数据从外部源流向下一个目标的3个核心组件Source、Channel、Sink。

(1)Source(数据采集器)

    用于源数据的采集,从一个Web服务器采集源数据,然后将采集到的数据写入到 Channel 中并流向 Sink;

(2)Channel(缓冲通道)

    底层是一个缓冲队列,对Source中的数据进行缓存,将数据高效,准确地写人 Sink,待数据全部到达 Sink后,Flume 就会删除该缓存通道中的数据;

(3)Sink(接收器)

    接收并汇集流向 Sink 所有数据,根据需求,可以直接进行集中式存储(采用HDFS进行存储),也可以继续作为数据源传人其他远程服务器或者 Source 中。

    在整个数据传输的过程中,Flume将流动的数据封装到一个event(事件)中,它是 Flume 内部数据传输的基本单元。一个完整的event 包含 headers和body,其中 headers 包含了一些标识信息,而body中就是Flume 收集到的数据信息。

3、Flume的日志采集系统结构

(1)简单结构

    当需要采集数据的生产源比较单一、简单时,可以直接使用一个 Agent 来进行数据采集并最终存储。

(2)复杂结构

    当需要采集数据的数据源分布在不同的服务器上时,使用一个Agent 进行数据采集不再适用,这时就可以根据业务需要部署多个 Agent 进行数据采集并最终存储。也就是说,对每一个需要收集数据的 Web 服务端都搭建了一个 Agent 进行数据采集,接着再将这多个 Agent 中的数据作为下一个 Agent 的 Source 进行采集并最终集中存储到 HDFS 中。除此之外,在开发过程工作中可能遇到从同一个服务端采集数据,然后通过多路复用流分别传输并存储到不同目的地情况,根据具体需求,将一个 Agent 采集的数据通过不同的 Channel 分别流向了不同的 Sink,然后再进行下一阶段的传输或存储。

二、Flume的基本使用

1、系统要求

    作为 Apache 旗下的一个顶级项目,想要使用 Flume 进行开发,必须满足一定的系统要求,这里以官方说明为准,具体要求如下。

(1)安装Java 1.8 或更高版本 Java 运行环境(针对本次使用的 Flume 1.8版本);

(2)为Source(数据采集器)、Channel(缓冲通道)和Sink(接收器)的配置提供足够的内存空间;

(3)为Channel(缓冲通道)和 Sink(接收器)的配置提供足够的磁盘空间;

(4)保证Agent(代理)对要操作的目录有读写权限。

    上述系统要求中,Java运行环境的版本与将要安装使用的Flume版本是对应的,如果使用Flume 1.6 版本,则要求使用Java 1.6及以上运行环境,由于本章后续将以Flume 1.8.0 为准,所以要求安装 Java 1.8 及以上运行环境。

2、Flume安装

(1)下载Flume

    下载flume到 /export/software/ 目录中

https://dlcdn.apache.org/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz

(2)解压

进入目录/export/software/,执行命令
tar -xzvf apache-flume-1.8.0-bin.tar.gz -C /export/servers/

(3)重命名

进入目录/export/servers/,执行命令
mv apache-flume-1.8.0-bin flume

(4)配置Flume环境

配置 flume-env.sh
cd /export/servers/flume/conf
cp flume-env.sh.template flume-env.sh

vi flume-env.sh #编辑文件,增加如下行
export JAVA_HOME=/export/servers/jdk

配置 /etc/profile
vi /etc/profile #编辑文件,增加如下行
export FLUME_HOME=/export/servers/flume
export PATH=$PATH:$FLUME_HOME/bin

3、Flume的入门使用

(1)配置Flume采集方案

    在/export/servers/flume/conf的目录下配置netcat-logger.conf,相关代码如下
# 示例配置方案: 单节点 Flume 配置

#为agent各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = logger

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a、采集方案的名称可以自定义,但为了方便管理和使用,通常会根据数据源类型和收集的结果类型进行命名。如 netcat-logger.conf 表示采集 netcat 类型数据源并最终作为 logger 日志信息收集。

b、采集方案文件的位置可以自定义存放,在使用的时候会要求指定配置方案的具体位置,为了方便统一管理,通常会将采集方案统一存放。如本案例中,会将所有自定义的采集方案文件保存在/export/servers/flume/conf目录下。

c、采集方案中的sources、channels、sinks是在具体编写时根据业务需求进行配置的,不能随意定义。Flume支持采集的数据类型可以通过查看官网进行详细了解(地址https://flume.apache.org/FlumeUserGuide.html ),同时针对不同的 sources type、channelstype和 sinks type 需要编写不同的配置属性。

    注意:配置采集方案中,在编写Source、Sink 与Channel 关联绑定时特别容易出错,如文件netcat-logger.conf中所示的al.sources.rl.channels = c1 和 al.sinks. kl.channel = cl, sources 的 channels 比 sinks 的 channel 多了一个s。这是因为,在一个 Agent 中,同一个Source 可以有多个Channel,所以配置时使用 channels(channel 的复数形式);而同一个 Sink 只能为一个 Channel 服务,所以配置时必须使用 Channel。

(2)指定采集方案启动Flume

进入 /export/servers/flume 目录,使用指定采集方案启动FLume
flume-ng agent --conf conf/ --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
    执行上述指令后,就会使用前面编写的采集方案 netcat-logger.conf来启动Flume.该 Flume系统会根据采集方案的配置监听当前主机 localhost下44444 端口发送的neteat类型源数据,并将信息收集接收到类型为 logger 的 Sink中。

接下来,对上述指令中的各部分内容进行说明,具体如下所示。

a、flume-ng agent:表示使用flume-ng启动一个agent;

b、-confconf/:-conf选项指定了Flume自带的配置文件路径,可用-c简写格式;

c、-conf-file conf/netcat-logger.conf:-conf-file 选项指定了开发者编写的采集方案,可用-f简写格式,需要注意配置文件所在路径,建议读者使用绝对路径指定采集方案,否则将提示文件不存在的错误;

d、-nameal:表示启动的agent名称为al,该名称al 必须与采集方案中agent的名称保持一致;

e、-Dflume.root.logger=INFO,console:表示将采集处理后的信息通过logger日志的信息输出到控制台进行展示。

(3)Flume数据采集测试

如果出现”-bash: telnet: command not found",请使用以下指令安装telnet工具。
yum -y install telnet

使用telnet 连接到本地主机 localhost端口44444,用来持续发送数据信息作为Flume将要采集的源数据。
telnet localhost 44444

登录后发送信息:
hello
OK(收到信息)

world
OK(收到信息)

三、Flume采集方案配置说明

1、Flume Source

    在编写 Flume 采集方案时,首先必须明确的是采集的数据源类型、出处;接着,根据这些信息与Flume 已提供的支持的 Flume Source 进行匹配,选择对应的数据采集器类型(source. type);然后,再根据选择的数据采集器类型,匹配必要和非必要的数据采集器属性。Flume提供的并支持的 Flume Source 有很多,具体的可前往官网查看https://flume.apache.org/FlumeUserGuide.html#flume-sourceshttps://flume.apache.org/FlumeUserGuide.html#flume-sources        这里列举一些常用的 Flume Source。

(1)Avro Source

    监听 Avro 端口并从外部 Avro 客户端流中接收 event 数据,当与另外一个 Flume Agent 上的 Avro Sink 配对时,它可以创建分层集合拓扑,利用 Avro Source 可以实现多级流动、扇出流、扇入流等效果。

Avro Source常用属性(加粗部分为必须属性)
属性名称默认值说明channels——type——组件类型名必须是 avrobind——要监听的主机名或 IP 地址port——要监听的服务端口threads——要生成的工作线程的最大数目sslfalse将此设置为 true 以启用 SSL 加密,则必须指定 keystore 和 keystore-passwordkeystore——SSL 所必需的通往 Java 密钥存储路径keystore-password——SSL 所必需的 Java 密钥存储的密码

使用 Avro Source 采集器配置一个名称为 a1 的 Agent。
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=avro
a1.sources.r1.channels=c1
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4141

(2)Spooling Directory Source

    Spooling Directory Source 允许对指定磁盘上的文件目录进行监控来提取数据,它将查看文件的指定目录的新增文件,并将文件中的数据读取出来。

Spooling Directory Source常用属性(加粗部分为必须属性)
属性名称默认值说明channels——type——组件类型名必须是spooldirspoolDir——从中读取文件的目录fileSuffix. COMPLETED附加到完全摄取的文件后缀deletePolicynever何时删除已完成的文件:never 或 immediatefileHeaderfalse是否添加存储绝对路径文件名的标头includePattern^. * $正则表达式,指定要包含的文件ignorePattern^ $正则表达式,指定要忽略的文件

使用 Spooling Directory Source 采集器配置一个名称为 a1 的 Agent。
a1.channels=ch-1
a1.sources=src-1
a1.sources.src-1.type=spooldir
a1.sources.src-1.channels=ch-1
a1.sources.src-1.spoolDir=/var/log/apache/flumeSpool
a1.sources.src-1.fileHeader=true

(3)Taildir Source

    Taildir Source 用于观察指定的文件,几乎可以实时监测到添加到每个文件的新行。如果文件正在写入新行,则此采集器将重试采集它们以等待写入完成。

Taildir Source常用属性(加粗部分为必须属性)
属性名称默认值说明channels——type——组件类型名必须是 TAILDIRfilegroups——以空格分隔的文件组列表。每个文件组都指定了要监控的一系列文件**filegroups. <filegroupName>**——文件组的绝对路径。正则表达式(而不是文件系统模式)只能用于文件名idle Timeout120000关闭非活动文件的时间(ms)。如果关闭的文件附加了新行,则此源将自动重新打开它writePosInterval3000写入位置文件上每个文件的最后位置的间隔时间(ms)batchSize100一次读取和发送到通道的最大行数。使用默认值通常效果较好backoffSleepIncrement1000当最后一次尝试未找到任何新数据时,每次重新尝试轮询新数据之间的最大时间延迟fileHeaderfalse是否添加存储绝对路径文件名的标头fileHeaderKeyfile将绝对路径文件名附加到 event header 时使用的 header 关键字

使用 Taildir Source 采集器配置一个名称为 a1 的 Agent。
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=TAILDIR
a1.sources.r1.channels=c1
a1.sources.r1.positionFile=/var/log/flume/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/var/log/test1/example.log
a1.sources.r1.headers.f1.headerKet1=value1
a1.sources.r1.filegroups.f2=/var/log/test2/. * log. *
a1.sources.r1.headers.f2.headersKey1=value2
a1.sources.r1.headers.f2.headersKey2=value2-2
a1.sources.r1.fileHeader=true

(4)HTTP Source

    HTTP Source 可以通过 HTTP POST 和 GET 请求方式接收 event 数据,GET 通常只能用于测试使用。HTTP 请求会被实现了 HTTPSourceHandler 接口的 handler(处理器)可插拔插件转成 Flume events,这个 handler 接收 HttpServletRequest,返回 Flume events列表。一个 HTTP请求处理的所有事件都在一个事务中提交给通道,从而允许在诸如file channel 之类的 channel 上提高效率。如果 handler 抛出异常,source 会返回400;如果 channel 满了或者 source 不能再向 channel 追加 event,source 会返回 503。在一个 POST 请求发送的所有的 events 都被认为是一个批次,会在一个事务中插入 channel。

HTTP Source常用配置属性(加粗部分为必须属性)
属性名称默认值说明channels——type组件类型名必须是 httpport——采集源要绑定的端口bind0.0.0.0要监听绑定的主机名或 IP 地址handlerorg.apache.flume.source.http.JSONHandlerhandler 类的全路径名handler. *——配置 handler 的参数

使用 HTTP Source 采集器配置一个名称为a1 的 Agent。
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.port=5140
a1.sources.r1.channels=c1
a1.sources.r1.handler=org.example.rest.RestHandler
a1.sources.r1.handler.nickname=random props

2、Flume channel

    Channels 通道是event 在Agent 上暂存的存储库,Source 向 Channel 中添加event,Sink 在读取完数据后再删除它。在配置Channels时,需要明确的是将要传输的 sources 数据源类型;接着,根据这些信息并结合开发中的实际需求,选择Flume 已提供支持的 Flume Channels;然后,再根据选择的Channel类型,配置必要和非必要的Channel属性。

    这是官方文档

https://flume.apache.org/FlumeUserGuide.html#flume-channelshttps://flume.apache.org/FlumeUserGuide.html#flume-channels

(1)Memory Channel

    Memory Channel 会将 event 存储在具有可配置最大尺寸的内存队列中,它非常适用于需要更高吞吐量的流量,但是在 Agent发生故障时会丢失部分阶段数据。

Memory Channel常用配置属性(加粗部分为必须属性)
属性名称默认值说明type——组件类型名必须是memorycapacity100存储在 Channel 中的最大 even数transactionCapacity100Channel 将从 Source 接收或向 Sink传递的每一个事务中的最大 event数keep-alive3
添加或删除 event 的超时时间(s)
byteCapacityBufferPercentage20
定义 byteCapacity与Channel中所有event的估计总大小之间的缓冲区百分比,以计算 header中的数据
byteCapacity(见说明)
允许此Channel 中所有event的最大内存字节数总和。该统计仅计算Eventbody,这也是提供byteCapacityBufferPercentage 配置参数的原因。默认计算值,等于JVM可用的最大内存的 80%(即命令行传递的-Xmx 值的 80%)

使用Memory Channel 通道配置一个名称为 a1 的 Agent。
a1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000

(2)File channel

    File channel 是 Flum 的持久通道,它将所有的 event 写入磁盘,因此不会丢失进程或机器关机、崩溃时的数据。File channel 通过在一次事务中提交多个 event 来提高吞吐量,做到了只要事务被提交,那么数据就不会有丢失。

File channel常用配置属性(加粗部分为必须属性)
属性名称默认值说明type——组件类型名必须是filecheckpointDir/.flume/file-channel/checkpoint检测点文件所存储的目录useDualCheckpointsfalse备份检测点如果设置为 true,backupCheckpointDir 必须设置backupCheckpointDir——备份检查点目录。此目录不能与数据目最或检查点目录相同dataDirs/.flume/file-channel/data数据存储所在的目录设置transactionCapacity10000事务容量的最大值设置checkpointInterval30000检测点之间的时间值设置(ms)maxFileSize2146435071一个单一日志的最大值设置(以字节为单位)capacity100000Channel的最大容量

使用 File channel 通道配置一个名称为 a1 的 Agent。
a1.channels=c1
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/mnt/flume/checkpoint
a1.channels.c1.dataDirs=/mnt/flume/data

3、Flume Sinks

    Flume Sources 采集到的数据通过 Channels 就会流向 Sink 中,此时的 Sink类似一个区结的递进中心,它需要根据后续需求进行配置,从而最终选择是将数据直接进行集中式存储(例如,直接存储到 HDFS中),还是继续作为其他 Agent 的 Source 进行传输。在配置Sinks时,需要明确的就是将要传输的数据目的地、结果类型;接着,根据这些实际需求信息,选择Flume已提供支持的 Flume Sinks;然后,再根据选择的 Sinks类型,配置必要和非必要的 Sinks 属性。

    具体可前往官方文档查看

https://flume.apache.org/FlumeUserGuide.html#flume-sinkshttps://flume.apache.org/FlumeUserGuide.html#flume-sinks

(1)HDFS Sink

    HDFS Sink 将 event 写入 Hadoop 分布式文件系统(HDFS),它目前支持创建文本和序列文件,以及两种类型的压缩文件。HDFS Sink 可以基于经过的时间或数据大小或 event数量来周期性地滚动文件(关闭前文件并创建新文件),同时,它还通过属性(如 event发生的时间戳或机器)来对数据进分桶/分区。HDFS 目录路径可能包含将由 HDFS 接收器替换的格式化转义序列,以生用于存储 event 的目录/文件名,使用 HDFS Sink 时需要安装 Hadoop,以便 Flume 可以有用Hadoop jar 与HDFS 集群进行通信。

HDFS Sink常用配置属性(加粗部分为必须属性)
属性名称默认值说明channel——type——组件类型名必须是hdfs
hdfs.path
——HDFS目录路径(如hdfs://namenode/ume/webdata/)hdfs.filePrefixFlumeData为在 hdis 目录中由 Flume 创建的文件指定前缀hdís.roundfalse
是否应将时间戳向下舍人(如果为 true.则影响除%;之外的

所有基于时间的转义序列)
hdfs.roundValue1
舍人到此最高倍数(在使用 hdfs.roundUnit 配置的单位中),小于当前时间
hdfs.roundUnitsecond舍人值的单位(秒、分钟或小时)hdfs. rollInterval30滚动当前文件之前等待的秒数(0==根据时间间隔从不滚动)hdfs.rollSize1024触发滚动的文件大小,以字节为单位(0:永不基于文件大小滚动)hdfs.rollCount10在滚动之前写入文件的事件数(0==从不基于事件数滚动)hdfs.batchSize100在将文件刷新到 HDFS之前写人文件的 event 数hdfs.useLocalTimeStampfalse
替换转义序列时,请使用本地时间(而不是event beader 中的时间戳)

使用 HDFS Sink 配置一个名称为 a1 的 Agent。
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix=events-
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute

(2)Logger Sink

    Logger Sink用于记录 INFO 级别 event,它通常用于调试。Logger Sink 接收器的不同之处是它不需要在“记录原始数据”部分中说明额外配置。

Logger Sink常用配置属性(加粗部分为必须属性)
属性名称默认值说明channel——type——组件类型名必须是 loggermaxBytes ToLog16要记录的 event body 的最大字节数

使用 Logger Sink 配置一个名称为 a1 的 Agent。
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1

(3)Avro Sink

    Avro Sink 形成了 Flume 的分层收集支持的一半,发送到此接收器的 Flume event 将转换为 Avro event 并发送到配置的主机名/端口对上,event 将从配置的 Channel中批量获取配置的批处理大小。

Avro Sink常用配置属性(加粗部分为必须属性)
属性名称默认值说明channel——type——组件类型名必须是 avrohostname——要监听的主机名或IP地址port——要监听的服务端口batch-size100要一起批量发送的 event 数connect-timeout20000允许第一次(握手)请求的时间量(ms)request-timeout20000在第一个之后允许请求的时间量

使用 Avro Sink 配置一个名称为 a1 的 Agent。
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=avro
a1.sinks.k1.channel=c1
a1.sinks.k1.hostname=10.10.10.10
a1.sinks.k1.port=4545

四、Flume的可靠性保证

    前面讲解的Flume 人门使用中,配置的采集方案是通过唯一一个Sink作为接收器接收后续需要的数据,但有时候会出现当前 Sink故障或者数据收集请求量较大的情况,这候单一的 Sink 配置可能就无法保证 Flume 开发的可靠性。为此,Flume 提供了Flume Sink Processors(Flume Sink 处理器)来解决上述问题。

    Sink 处理器允许开发者定义一个 Sink groups(接收器组),将多个 Sink 分组到一个实体中,这样Sink处理器就可以通过组内的多个Sink为服务提供负载均衡功能,或者是在某个Sink出现短暂故障的时候实现从一个 Sink 到另一个 Sink的故障转移。

1、负载均衡

    负载均衡接收器处理器(Load balancing sink processor)提供了在多个 Sink 上进行负载均衡流量的功能,它维护了一个活跃的Sink索引列表,必须在其上分配负载。Load balancing sink processor 支持使用 round_robin (轮询)和 random (随机)选择机制进行流量分配,其默认选择机制为 round_robin,但可以通过配置进行覆盖。还支持继承 AbstractSinkSelector 的自定义类来自定义选择机制。

    在使用时选择器(selector)会根据配置的选择机制挑选下一个可用的Sink并进行调用。对于round_robin和random两种选择机制,如果所选 Sink无法收集 event,则处理器会通过其配置的选择机制选择下一个可用 Sink。这种实现方案不会将失败的Sink列人黑单,而是继续乐观地尝试每个可用的 Sink。如果所有 Sink 都调用失败,则选择器将故障传播到接收器运行器(sink runner)。

    如果启用了 backoff 属性,则 Sink处理器会将失败的Sink列人黑名单。当超时结束时,如果Sink仍然没有响应,则超时会呈指数级增加,以避免在无响应的 Sink 上长时间等待时卡住。在禁用backoff 功能的情况下,在 round_robin 机制下,所有失败的 Sink 将被传递到 Sink 队列中的下一个 Sink后,因此不再均衡。

Load balancing sink processor提供的配置属性(加粗部分为必须属性)。
属性名称默认值说明sinks——以空格分隔的参与 sink 组的 sink 列表processor.typedefault组件类型名必须是 load_balanceprocessor.backofffalse设置失败的 sink 进人黑名单processor.selectorround_robin选择机制。必须是 round_robin、random或是继承自AbstractSinkSelector 的自定义选择机制类全路径名processor.selector.maxTimeOut30000
失败 sink 放置在黑名单的超时时间,失败sink在指
指定时间后仍无法启用,则超时时间呈指数增加
processor.type 属性的默认值为 default,这是因为 Sink 处理器的 processor.type 提供了3 种处理机制:default(默认值)、failover 和 load_balance。其中, default 表示配置单独一个sink,配置和使用非常简单,同时也不强制要求使用 sink group 进行封装;另外的 failover 和 load_balance 就分别代表故障转移和负载均衡情况下的配置属性。

使用 Load balancing sink processor 配置一个名称为 al 的 Agent。
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=kl k2
a1.sinkgroups.g1.processor.type=load_balance 
al.sinkgroups.g1.processor.backoff-true
al.ainkgroups.g1.processor.selector=random

(1)搭建并配置Flume机器

在hadoop01上,将Flume同步到hadoop02、hadoop03上
scp -r /export/servers/flume hadoop02.bgd01:/export/servers/
scp -r /export/servers/flume hadoop03.bgd01:/export/servers/

scp -r /etc/profile hadoop02.bgd01:/etc/profile
scp -r /etc/profile hadoop03.bgd01:/etc/profile

分别在hadoop02、hadoop03上执行如下命令,立即刷新配置
source /etc/profile

(2)配置Flume采集方案

a、exec-avro.conf

    在hadoop01.bgd01上配置第一级采集配置,在/export/servers/flume/conf/目录下编写采集方案exec-avro.conf。
# 配置load balancing sink processor一级采集方案
a1.sources = r1

#用空格分别配置2个sink
a1.sinks = k1 k2
a1.channels = c1

# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/123.log

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 设置sink1,流向Hadoop02,由Hadoop02上的Agent进行采集
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02.bgd01
a1.sinks.k1.port = 52020

# 设置sink2,流向Hadoop03,由Hadoop03上的Agent进行采集
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03.bgd01
a1.sinks.k2.port = 52020

# 配置sink组及处理器策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sinkgroups.g1.processor.timeout = 10000

b、netcat-logger.conf

     分别在hadoop02.bgd01和hadoop03.bgd01上配置第二级采集配置,在/export/servers/flume/conf/目录下编写采集方案netcat-logger.conf。
# 配置load balancing sink processor二级采集方案的一个Sink分支

#为agent各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop02.bgd01
a1.sources.r1.port = 52020

# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = logger

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 配置load balancing sink processor二级采集方案的一个Sink分支

#为agent各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop03.bgd01
a1.sources.r1.port = 52020

# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = logger

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
    两个采集方案内容的唯一区别就是 source.bind 的不同,hadoop02.bgd01 机器的source.bind=hadoop02.bgd01,而hadoop03.bgd01 机器的 source.bind=hadoop03.bgd01,在 r述两个文件中,均设置了一个名为 al的Agent,在该 Agent 内部设置了 source. type= avro、source.bind=hadoop02.bgd01/hadoop03.bgd01 以及 source. port=52020,特意用来对接在 hddoop01.bgd01中前一个Agent收集后到 Sink的数据类型和配置传输的目标;最后,又设置了二采集方案的 sink.type=logger,将二次收集的数据作为日志收集打印。

(2)启动Flume系统

1、分别在hadoop02、hadoop03上,进入 /export/servers/flume 目录,使用netcat-logger采集方案启动FLume
flume-ng agent --conf conf/ --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

2、在hadoop01上,进入 /export/servers/flume 目录,使用exec-avro.conf采集方案启动FLume
flume-ng agent --conf conf/ --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console

hadoop02.bgd01

hadoop03.bgd01

hadoop01.bgd01

(3)Flume系统负载均衡测试

在hadoop01.bgd01的root目录创建logs目录
mkdir -p /root/logs

在hadoop01上,重新打开一个终端,执行如下命令:
while true; do echo "access access ..." >> /root/logs/123.log; sleep 1; done

hadoop02.bgd01

hadoop03.bgd01

2、故障转移

    故障转移接收器处理器(Failover Sink Processor)维护一个具有优先级的sink列表,保证在处理 event 只要有一个可用的 sink 即可。故障转移机制的工作原理是将故障的sink降级到故障池中,在池中为它们分配一个冷却期,在重试之前冷却时间会增加,当 sink 成功发送 event后,它将恢复到活跃池中。sink具有与之相关的优先级,数值越大,优先级越高。如果在发送 event 时 sink 发生故障,则会尝试下一个具有最高优先级的 sink来继续发送event。如果未指定优先级,则根据配置文件中指定 sink 的顺序确定优先级。

Failover Sink Processor配置属性(加粗部分为必须属性)。
属性名称默认值说明sinks——以空格分隔的参与 sink 组的 sink 列表processor.typedefault组件类型名必须是 failover**processor.priority.<sinkName>**——设置 sink 的优先级取值processor.maxpenalty30000失败 sink 的最大退避时间

使用 Failover Sink Processor 配置一个名称为al的Agent。

al.sinkgroups=g1
a1.sinkgroups.g1.sinks=kl k2
al.sinkgroups.g1.processor.type=failover 
al.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.gl.processor.priority.k2-10 al.sinkgroups.g1.processor.maxpenalty=10000

(1)配置Flume采集方案

a、avro-logger-memory.conf

    在hadoop01.bgd01上配置第一级采集配置,在/export/servers/flume/conf/目录下编写采集方案avro-logger-memory.conf。
# 配置load balancing sink processor一级采集方案
a1.sources = r1

#用空格分别配置2个sink
a1.sinks = k1 k2
a1.channels = c1

# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/456.log

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 设置sink1,流向Hadoop02,由Hadoop02上的Agent进行采集
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02.bgd01
a1.sinks.k1.port = 52020

# 设置sink2,流向Hadoop03,由Hadoop03上的Agent进行采集
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03.bgd01
a1.sinks.k2.port = 52020

# 配置sink组及处理器策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.g1.processor.priority.k2=10
a1.sinkgroups.g1.processor.timeout = 10000

b、exec-avro-failover.conf

    分别在hadoop02.bgd01和hadoop03.bgd01上配置第二级采集配置,在/export/servers/flume/conf/目录下编写采集方案exec-avro-failover.conf。
# 配置load balancing sink processor二级采集方案的一个Sink分支

#为agent各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop02.bgd01
a1.sources.r1.port = 52020

# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = logger

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 配置load balancing sink processor二级采集方案的一个Sink分支

#为agent各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop03.bgd01
a1.sources.r1.port = 52020

# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = logger

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)启动Flume系统

1、分别在hadoop02、hadoop03上,进入 /export/servers/flume 目录,使用avro-logger-memory.conf采集方案启动FLume
flume-ng agent --conf conf/ --conf-file conf/avro-logger-memory.conf --name a1 -Dflume.root.logger=INFO,console

2、在hadoop01上,进入 /export/servers/flume 目录,使用exec-avro-failover.conf采集方案启动FLume
flume-ng agent --conf conf/ --conf-file conf/exec-avro-failover.conf --name a1 -Dflume.root.logger=INFO,console

hadoop01.bgd01

hadoop02.bgd01

hadoop03.bgd01

(3)Flume系统故障转移测试

在hadoop01.bgd01上,重新打开一个终端,执行如下命令:
while true; do echo "access access ..." >> /root/logs/456.log; sleep 1; done

hadoop02.bgd01

hadoop03.bgd01

五、Flume拦截器

    FlumeInterceptors(拦截器)主要用于实现对Flume系统数据流中 event 的修改操作。在使用 Flume 拦截器时,只需要参考官方配置属性在采集方案中选择性地配置即可,当涉及配置多个拦截器时,拦截器名称中间需要用空格分隔,并且拦截器的配置顺序就是拦微顺序。这里只简述常用的几种的,具体可前往官方文档查看。

https://flume.apache.org/FlumeUserGuide.html#flume-interceptorshttps://flume.apache.org/FlumeUserGuide.html#flume-interceptors

1、Timestamp interceptor

    TimestampInterceptor(时间戳拦截器)会将流程执行的时间插入到event的header头部。此拦截器插人带有 timestamp 键(或由 header 属性指定键名)的标头,其值为对应时间戳。如果配置中已存在时间戳时,此拦截器可以保留现有的时间戳。

Timestamp Interceptor常用配置属性(加粗部分为必须属性)。
属性名称默认值说明type——组件类型名必须是timestampheadertimestamp用于放置生成的时间戳的标头的名称preserveExistingfalse如果时间戳已存在,是否应保留,true或false

为名称为a1的Agent中配置Timestamp interceptor。
a1.sources=r1
a1.channels=c1
a1.sources.r1.channels=c1
a1.sources.r1.type=seq
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp

2、 Static interceptor

    Static Interceptor(静态拦截器)允许用户将具有舒志值的静态头附加到所有even,当前实现不支持一次指定多个 header头,但是用户可以定又多个 Static Intercrptor 来为每一个拦截器都追加一个 header。

Static Interceptor常用配置属性(加粗部分为必须属性)。
属性名称默认值说明type——组件类型名必须是staticpreserveExistingtrue如果配置的header已存在,是否应保服keykey应创建的 header 的名称valuevalue应创建的header 对应的静态值

为名称是al的Agent 中配置 Static Intercepior。
a1.sources=r1
a1.channels=c1
a1.sources.r1.channels=c1
a1.sources.r1.type=seq
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=static
a1.sources.r1.interceptors.i1.key=datacenter
a1.sources.r1.interceptors.i1.value=BET_JING

3、Search and Replace Interceptor

    Search and Replace Interceptor(查询和替换拦戴器)基于Java正则表达式提供了简用的用于字符串的搜索和替换功能,同时还具有进行回溯/群组捕捉功能。此拦截器的便用与Matcher.replaceAllO方法具有相同的规则。

Sarch and Replace Intereeptor常用配置属性(加租部分为必须属性)
属性名称默认值说明type——组件类型名必须是 search_replanesearchPattern——要查询或替换的模式replaceString——替换的字符串charsetUTF-8event body 的字符集,默认为 UTF-8

为名称为al的Agent 中配置 Search and Replace Interceptor的示例如下。
al.sources=r1 
a1.channels=cl
a1.sources.r1.channels=cl 
a1.sources.r1.type=seg
a1.sources.avroSrc.interceptors=i1
al.sources.avroSrc.interceptors.i1.type=search_replace 
# 影除 event body 中的前导字母数字字符
a1.sources.avroSrc.interceptors.i1.searchPattern=^[A-Za-z0-9_]+ al.sources.avroSrc.interceptors.i1.replacesString=

六、案例——日志采集

1、配置采集方案

(1)exec-avro_logCollection.conf

    分别在hadoop02.bgd01和hadoop03.bgd01上配置同样的采集目录,在/export/servers/flume/conf/目录下编写采集方案exec-avro_logCollection.conf。
# 配置 Agent 组件

# 用3个source采集不同的日志类型数据
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# 描述并配置第一个sources组件(数据类型、采集数据源的应用地址、包括自带的拦截器)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

# 描述并配置第二个sources组件(数据类型、采集数据源的应用地址、包括自带的拦截器)
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /root/logs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

# 描述并配置第三个sources组件(数据类型、采集数据源的应用地址、包括自带的拦截器)
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /root/logs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200000
a1.channels.c1.transactionCapacity = 100000

# 描述并配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop01.bgd01
a1.sinks.k1.port = 41414

# 将Source、Sink 与Channel 进行关联绑定
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1

a1.sinks.k1.channel = c1

(2)avro-hdfs_logCollection.conf

    在hadoop01.bgd01上配置第二级日志采集方案,在/export/servers/flume/conf/目录下编写采集方案exec-hdfs_logCollection.conf。
# 配置load balancing sink processor二级采集方案的一个Sink分支

#配置 Agent 组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources组件(数据类型、采集数据源的应用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop01.bgd01
a1.sources.r1.port = 41414

# 描述并配置拦截器,用于后续%Y%m%d获取时间
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# 描述并配置sinks组件(采集后数据流出的类型)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://hadoop01.bgd01:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

#生成的文本不按条数生成
a1.sinks.k1.hdfs.rollCount = 0

#生成的文本不按时间生成
a1.sinks.k1.hdfs.rollInterval = 0

#生成的文本按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760

#批量写入HDFS的个数
a1.sinks.k1.hdfs.batchSize = 20

#Flume操作HDFS的线程数(包括新建、写入)
a1.sinks.k1.hdfs.threadsPoolSize = 10

#操作HDFS超时时间
a1.sinks.k1.hdfs.callTimeout = 30000

# 将source 和 sink 通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、启动hadoop集群

start-dfs.sh  #启动HDFS
start-yarn.sh #启动YARN

3、启动Flume系统

在hadoop01上启动Flume系统
打开终端。进入 /export/servers/flume 目录,使用avro-hdfs_logCollection.conf采集方案启动FLume。命令如下:
flume-ng agent --conf conf/ --conf-file conf/avro-hdfs_logCollection.conf --name a1 -Dflume.root.logger=INFO,console

分别在hadoop02、hadoop03上,进入 /export/servers/flume 目录,使用exec-avro_logCollection.conf采集方案启动FLume。命令如下:
flume-ng agent --conf conf/ --conf-file conf/exec-avro_logCollection.conf --name a1 -Dflume.root.logger=INFO,console

4、 日志采集系统测试

1、在hadoop02上,创建目录/root/logs;然后打开3个终端,分别执行执行如下命令,用来产生日志数据:
while true; do echo "access access ..." >> /root/logs/access.log; sleep 1; done
while true; do echo "nginx nginx ..." >> /root/logs/nginx.log; sleep 1; done
while true; do echo "web web ..." >> /root/logs/web.log; sleep 1; done

2、在hadoop03上,创建目录/root/logs;然后打开3个终端,分别执行执行如下命令,用来产生日志数据:
while true; do echo "access access ..." >> /root/logs/access.log; sleep 1; done
while true; do echo "nginx nginx ..." >> /root/logs/nginx.log; sleep 1; done
while true; do echo "web web ..." >> /root/logs/web.log; sleep 1; done
    回到在hadoop01.bgd01上启动Flume系统的终端窗口,观察日志采集信息。

    在hadoop01.bgd01上,打开FireFox浏览器,在地址栏输入地址 http://hadoop01.bgd01:50070(集群IP/主机名+端口),进入到Hadoop集群UI,可以看到集群下新添加了一个Source目录。单击进入Source目录,查看内部文件存储结构。


参考书籍

《Hadoop大数据技术原理与应用》

标签: 大数据 hadoop flume

本文转载自: https://blog.csdn.net/weixin_63507910/article/details/128653006
版权归原作者 雨诺风 所有, 如有侵权,请联系我们删除。

“大数据技术之Hadoop(八)——Flume日志采集系统”的评论:

还没有评论