目录
七、Flume
Flume是一个分布式高性能、高可靠的数据传输工具,它可用简单的方式将不同数据源的数据导入某个或多个数据中心,典型应用是将众多生产机器日志数据实时导入HDFS。
(一)Flume简介
1、Flume逻辑结构
Flume核心思想是数据流,即数据从哪来到哪去,中间不需用经过谁。
用户可以将Flume看成是两台机器之间通过网络互相传送数据,Flume定制了大量的数据源(Thrift、Shell)与数据汇(Thrift、HDFS、Hbase),通过使用“管道”,Flume能够确保不会丢失一条数据,提供了数据高可靠性。
2、Flume组成
(1)Source
它负责读取原始数据,目前Flume支持大量类型。用户可以自定义Source,使用时在配置文件里声明即可。
(2)Channel
它负责将从Source端传来的数据存入Channel。Flume的分用、复用和过滤功能即在于此。
(3)Sink
它负责从Channel中取出并发送数据。Sink内部都是使用netty来发送数据的。
(二)Flume入门
1、Flume部署
集群中只有一台机器部署Flume就可以接收数据了,此外下面的例题中还要有一台机器做为数据源,负责向Hadoop集群发送数据,故须在cMaster与iClient上部署Flume。
(1)部署Flume接收端:
[root@cMaster ~]# sudo yum install flume-ng-agent #在cMaster上部署Flume
(2)部署Flume发送端:
[root@iClient ~]# sudo yum install flume-ng-agent #在iClient上部署Flume
2、Flume访问接口
Flume提供了命令行接口和程序接口,无论是命令行还是程序接口,都必须使用Flume配置文档,这也是Flume架构思想之一——配置型工具。
【例6】 按要求完成问题:
① 进入Flume命令行,查看常用命令。
② 要求发送端iClient使用telnet向cMaster发送数据,而接收端cMaster开启44444端口接收数据,并将收到的数据显示于命令行。
③ 要求发送端iClient将本地文件“/home/joe/source.txt”发往接收端cMaster,而接收端cMaster将这些数据存入HDFS。
④ 根据问题③,接收端cMaster开启接收数据的Flume服务,既然此服务能接收iClient发来的数据,它必然也可以接收iHacker机器(黑客)发来的数据,问如何尽量减少端口攻击,并保证数据安全。
解:
对于问题①,直接在iClient上执行如下命令即可:
[root@iClient ~]# flume-ng #查看Flume常用命令
对于问题②,首先需要在cMaster上按要求配置并开启Flume,接着在iClient上使用telnet向cMaster发送数据,具体过程参见如下几步。
在cMaster上以root权限,新建文件“
/etc/flume/conf/flume.conf
”,并填入如下内容:
# 命令此处agent名为a1,并命名此a1的sources为r1,channels为c1,sinks为k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 定义sources相关属性:即此sources在cMaster上开启44444端口接收以netcat协议发来的数据
a1.sources.r1.type = netcat
a1.sources.r1.bind = cMaster
a1.sources.r1.port = 44444
# 定义channels及其相关属性,此处指定此次服务使用memory暂存数据
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 定义此sink为logger类型sink:即指定sink直接将收到的数据输出到控制台
a1.sinks.k1.type = logger
# 将sources关联到channels,channels关联到sinks上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
接着在cMaster上使用此配置以前台方式开启Flume服务:
[root@cMaster ~]# flume-ng agent -c /etc/flume-ng/ -f /etc/flume-ng/conf/flume.conf -n a1
此时,接收端cMaster已经配置好并开启了,接下来需要开启发送端,在iClient上执行:
[root@iClient ~]# telnet cMaster 44444
此时向此命令行里随意输入数据并回车,telnet会将这些数据发往cMaster,再次回到cMaster上执行命令的那个终端,会发现刚才在iClient里输入的数据发送到了cMaster的终端里。如果想退出iClient终端里的telnet,按
Ctrl+]
组合键(即同时按住Ctrl键和]键),回到telnet后输入“quit”命令回车即可,至于退出cMaster上的Flume,直接按
Ctrl+C
组合键。
对于问题③,首先,在cMaster上新建文件“
/etc/flume-ng/conf/flume.conf.hdfs
”,并填入如下内容:
# 命令此处agent名为a1,并命名此a1的sources为r1,channels为c1,sinks为k1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 定义sources类型及其相关属性
# 即此sources为avro类型,且其在cMaster上开启4141端口接收avro协议发来的数据
a1.sources.r1.type = avro
a1.sources.r1.bind = cMaster
a1.sources.r1.port = 4141
# 定义channels类型其实相关属性,此处指定此次服务使用memory暂存数据
a1.channels.c1.type = memory
# 定义此sink为HDFS类型的sink,且此sink将接收的数据以文本方式存入HDFS指定目录
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/joe/flume/cstorArchive
a1.sinks.k1.hdfs.fileType = DataStream
# 将sources关联到channels,channels关联到sinks上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
接着,在iClient上新建文件“
/root/ businessLog
”,并填入如下内容:
ccccccccccccccccccccc
ssssssssssssssssssssssss
tttttttttttttttttttttttttttttttttt
ooooooooooooooooooo
rrrrrrrrrrrrrrrrrrrrrrrrrrrrr
iClient上还要新建文件“
/etc/flume-ng/conf/flume.conf.exce
”,并填入如下内容:
# 命令此处agent名为a1,并命名此a1的sources为r1,channels为c1,sinks为k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 定义sources类型及其相关属性,此sources为exce类型
# 其使用Linux cat命令读取文件/root/businessLog,接着将读取到的内容写入channel
a1.sources.r1.type = exec
a1.sources.r1.command = cat /root/businessLog
# 定义channels及其相关属性,此处指定此次服务使用memory暂存数据
a1.channels.c1.type = memory
# 定义此sink为avro类型sink,即其用avro协议将channel里的数据发往cMaster的4141端口
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = cMaster
a1.sinks.k1.port = 4141
# 将sources关联到channels,channels关联到sinks上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
至此,发送端iClient和接收端的Flume都已配置完成。现在需要做的是在HDFS里新建目录,并分别开启接收端Flume服务和发送端Flume服务,步骤如下。
在cMaster上开启Flume,其中“flume-ng … a1”命令表示使用
flume.conf.hdfs
配置启动Flume,参数a1即是配置文件里第一行定义的那个a1。
[root@cMaster ~]# sudo -u joe hdfs dfs -mkdir flume #HDFS里新建目录/user/joe/flume
[root@cMaster ~]# sudo -u joe flume-ng agent -c /etc/flume-ng/ -f /etc/flume-ng/conf/flume.conf.hdfs -n a1
最后,在iClient上开启发送进程,与上一条命令类似,这里的a1,即
flume.conf.exce
定义的a1:
[root@iClient ~]# flume-ng agent -c /etc/flume-ng/ -f /etc/flume-ng/conf/flume.conf.exce -n a1
此时,用户在iClient端口里打开“cMaster:50070”,依次进入目录“
/user/joe/flume/cstorArchive
”,将会查看到从iClient上传送过来的文件。
问题④属于开放性问题,请参考官方文档,讨论并解决。
八、Mahout
Mahout是基于Hadoop平台的机器学习工具,它提供了大量机器学习算法的MR实现,此外,它还提供了大量针对数据预处理的工具类,通过数据预处理工具类与机器学习算法的结合,能够很方便地实现从模型构建到性能测试等一系列步骤。
(一)Mahout简介
目前Mahout主要包含分类、聚类和协同过滤三种类型算法,需要注意的是Mahout算法处理的数据类型必须是矩阵类型的二进制数据,若数据为文本类型,用户须通过Mahout提供的数据转换工具完成转换,接着提交给相关算法,用户可以把Mahout看成一个Hadoop客户端,只是这个客户端包含了大量的机器学习Jar包。
(二)Mahout入门
1、Mahout部署
作为Hadoop的一个客户端,Mahout只要在集群中或集群外某台客户机上部署即可,实验中选择在iClient上部署Mahout。
[root@iClient ~]# sudo yum install mahout
2、Mahout访问接口
Mahout提供了程序和命令行接口,通过参考Mahout已有的大量机器学习算法,程序员也可实现将某算法并行化。
【例7】 要求以joe用户运行Mahout示例程序naivebayes,实现下载数据,建立学习器,训练学习器,最后使用测试数据针对此学习器进行性能测试。
解:
首先须下载训练数据集和测试数据,接着运行训练MR和测试MR,但是,Mahout里的算法要求输入格式为Value和向量格式的二进制数据,故中间还须加一些步骤,将数据转换成要求格式的数据,下面的脚本naivebayes.sh可以完成这些动作。
#!/bin/sh
#新建本地目录,新建HDFS目录
mkdir -p /tmp/mahout/20news-bydate /tmp/mahout/20news-all && hdfs dfs -mkdir mahout
#下载训练和测试数据集
curl http://people.csail.mit.edu/jrennie/20Newsgroups/20news-bydate.tar.gz \
-o /tmp/mahout/20news-bydate.tar.gz
#将数据集解压、合并,并上传至HDFS
cd /tmp/mahout/20news-bydate && tar xzf /tmp/mahout/20news-bydate.tar.gz && cd
cp -R /tmp/mahout/20news-bydate/*/* /tmp/mahout/20news-all
hdfs dfs -put /tmp/mahout/20news-all mahout/20news-all
#使用工具类seqdirectory 将文本数据转换成二进制数据
mahout seqdirectory -i mahout/20news-all -o mahout/20news-seq -ow
#使用工具类seq2sparse 将二进制数据转换成算法能处理的矩阵类型二进制数据
mahout seq2sparse -i mahout/20news-seq -o mahout/20news-vectors -lnorm -nv -wt tfidf
#将总数据随机分成两部分,第一部分约占总数据80%,用来训练模型
#剩下的约20%作为测试数据,用来测试模型
mahout split -i mahout/20news-vectors/tfidf-vectors --trainingOutput mahout/20news-train-vectors \
--testOutput mahout/20news-test-vectors \
--randomSelectionPct 40 --overwrite --sequenceFiles -xm sequential
#训练Naive Bayes模型
mahout trainnb -i mahout/20news-train-vectors -el -o mahout/model -li mahout/labelindex -ow
#使用训练数据集对模型进行自我测试(可能会产生过拟合)
mahout testnb -i mahout/20news-train-vectors -m mahout/model -l mahout/labelindex \
-ow -o mahout/20news-testing
#使用测试数据对模型进行测试
mahout testnb -i mahout/20news-test-vectors -m mahout/model -l mahout/labelindex \
-ow -o mahout/20news-testing
脚本写得较为简陋,执行时,切记须在iClient上,以joe用户身份执行,且只能执行一次。再次执行时,先将所有数据全部删除,执行方式如下:
[root@iClient ~]# cp naivebayes.sh /home/joe
[root@iClient ~]# chown joe.joe naivebayes.sh
[root@iClient ~]# sudo –u joe chmod +x naivebayes.sh
[root@iClient ~]# sudo –u joe sh naivebayes.sh
脚本执行时,用户可以打开Web界面“cMaster:8088”,查看正在执行的Mahout任务;还可以通过Web界面“cMaster:50070”,定位到“
/user/joe/mahout/
”查看目录变化。
版权归原作者 Francek Chen 所有, 如有侵权,请联系我们删除。