###如若发现错误,或代码敲错,望能评论指正!!!
通过百度网盘分享的文件:Hadoop相关需要的软件
链接:https://pan.baidu.com/s/1XzDvyhP4_LQzAM1auQCSrg?pwd=tph5
提取码:tph5
VMware下安装CentOS
一、先安装一个虚拟机
安装好后要右键,找到用管理员的方式打开
也可以设置成每次打开都是以管理员身份运行
二、安装一个CentOS,这里使用的是7版本的
三、打开VMware,创建新的虚拟机
来到这个界面可以等待60秒,也可以按下tab键下一步。
我这里选择英文,各位可以选择中文。
继续添加
设置密码
到这里就安装好啦
四、ping本地与百度的设置
1.重启服务
service network restart
2.修改配置文件
vi /etc/sysconfig/network-scripts/ifcfg-eth0
如果进入的是空表,用cd进入这里,然后用“ll”查看那个带ifcfg的文件,进入编辑
我这里是ifcfg-ens33
cd /etc/sysconfig/network-scripts
ll
如果你这里是$符号请用这个命令,使用超级用户,变成#
DEVICE是指设备名
HWADDR是指网卡地址
ONBOOT是设置系统启动时是否激活网卡
BOOTPROTO的值可以设置为dhcp、none、bootp、static
dhcp 设置网卡绑定的时候通过DHCP协议的方法来获得地址
none 设置网卡绑定的时候不使用任何协议
bootp 设置网卡绑定的时候使用BOOTP协议
static 设置网卡绑定的时候使用静态协议
将该文件的ONBOOT=no改为ONBOOT=yes,将BOOTPROTO=dhcp改为BOOTPROTO=static
添加IP地址IPADDR、子网掩码NETMASK、网关GATEWAY以及DNS1等信息
这是我未修改的样子(不会如何修改的跟保存的请百度)
点击编辑里面的虚拟网络编辑器
要用管理员身份使用软件才能更改设置
当使用管理员身份后这里会有三个,选NAT模式
这里最后一段要选不一样的,最好是3位数
这里两个要一样
修改完保存退出后重启一下服务
service network restart
查看IP
ifconfig
3.ping一下百度跟本地,看一下连接是否通畅
ping www.baidu.com
ping 192.168.235.233
ctel+c退出ping
4.安装yum源
执行cd /etc/yum.repos.d 进入etc/yum.repos.d目录
cd /etc/yum.repos.d
查看yum.repos.d目录下的文件
CentOS-Base.repo 是网络的
CentOS-Media.repo是本地的
5.执行重命名命令
如果要用网络下载,就改名全部(改名是为了禁用,需要用的话就把名字改回去)
CentOS-Base.repo CentOS-Debuginfo.repo CentOS-fasttrack.repo CentOS-Vault.repo CentOS-Media.repo
mv CentOS-Media.repo CentOS-Media.repo.bak
vi CentOS-Media.repo.bak
将baseurl的值修改为:file:///media/ ,将gpgcheck的值改为0 ,将enabled的值修改为 1
修改前:
修改后
6.挂载
执行以下命令进行挂载
mount /dev/dvd /media #6.8版本
mount /dev/cdrom /media #7版本
如果没有第一条的话,修改第二条
7.更新yum源。
yum clean all
8.用阿里云安装软件
打开阿里云登录
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
wget -O /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
9.安装Java
使用xshell7
使用ctel+alt+f打开共享文件
放到opt目录下
在命令行安装Java
进入opt目录安装
rpm -ivh jdk-7u80-linux-x64.rpm
五.搭建Hadoop完全分布式集群
1.把Hadoop安装包hadoop-xxxxx / 上传到虚拟机master的/opt目录下
然后进入opt目录下输入下面语句,将hadoop安装包解压到虚拟机上
tar -zxf hadoop-xxxx -C /usr/local
Hadoop配置涉及的文件都在/usr/local/hadoop-2.6.4/etc/hadoop/目录下
2.修改core-site.xml文件
vi core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://master:8020</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/var/log/hadoop/tmp</value>
</property>
</configuration>
然后在hadoop目录下创建一个tmp文件夹
mkdir tmp
3.修改hadoop-env.sh
vi hadoop-env.sh
4.修改yarn-env.sh文件
vi yarn-env.sh
5.复制mapred-site.xml.template 并命名为mapred-site.xml并修改
cp mapred-site.xml.template mapred-site.xml
修改mapred-site.xml文件
vi mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- jobhistory properties -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>master:19888</value>
</property>
</configuration>
6.修改yarn-site.xml文件
vi yarn-site.xml
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>master</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>${yarn.resourcemanager.hostname}:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>${yarn.resourcemanager.hostname}:8030</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>${yarn.resourcemanager.hostname}:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.https.address</name>
<value>${yarn.resourcemanager.hostname}:8090</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>${yarn.resourcemanager.hostname}:8031</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>${yarn.resourcemanager.hostname}:8033</value>
</property>
<property>
<name>yarn.nodemanager.local-dirs</name>
<value>/data/hadoop/yarn/local</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>/data/tmp/logs</value>
</property>
<property>
<name>yarn.log.server.url</name>
<value>http://master:19888/jobhistory/logs/</value>
<description>URL for job history server</description>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4096</value>
</property>
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.nodmanager.resource.cpu-vcores</name>
<value>1</value>
</property>
</configuration>
7.修改slaves文件
vi slaves
slave1
slave2
slave3
8.修改hdfs-site.xml文件
vi hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///data/hadoop/hdfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///data/hadoop/hdfs/data</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>master:50090</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
</configuration>
9.返回最初目录,进入/etc,修改hosts文件
在最后面添加以下代码,主要符合自己的ip跟主机名
192.168.235.233 master master.centos.com
192.168.235.234 slave1 slave1.centos.com
192.168.235.235 slave2 slave2.centos.com
192.168.235.236 slave3 slave3.centos.com
在自己的电脑上hosts也添加
·
10.修改完后关机,进行克隆。
右击master,管理,克隆
11.打开虚拟机slave1
(1)执行以下命令,删除70-persistent-net.rules
rm -rf /etc/udev/rules.d/70-persistent-net.rules
(2)执行命令:
ifconfig -a
查看HWADDR,记录HWADDR(每个机器的此值是不一样的)
找不到另一种方法(如果是可视化的)
(3)修改/etc/sysconfig/network-scripts/ifcfg-eth0文件,修改HWADDR、IPADDR以及注释掉UUID开头的一行代码,并修改HWADDR的内容为实际的地址
vi /etc/sysconfig/network-scripts/ifcfg-eth0
如果进入的是空表,用cd进入这里,然后用“ll”查看那个带ifcfg的文件,进入编辑
我这里是ifcfg-ens33
cd /etc/sysconfig/network-scripts
ll
如果你这里是$符号请用这个命令,使用超级用户,变成#
图形化界面也可以在这里修改
修改后重启服务:
service network restart
(4)修改主机名,执行命令
6版本的:
vi /etc/sysconfig/network
修改机器名为 slave1.centos.com
7版本的:
vi /etc/hostname
修改为 slave1.centos.com
修改用户名:
(5)使用reboot命令重启虚拟机。
(6)验证slave1是否配置成功。在master下执行ping slave1 如果ping得通,说明配置成功。
注意要master跟slave1两个虚拟机都打开才能互ping
12.重复(1)~(5)的相关步骤,克隆master到slave2、slave3,并修改slave2、slave3的相关配置。
13.配置SSH免密登录
(1)在master虚拟机上使用ssh-keygen产生公钥(id_rsa.pub)与私钥(id_rsa)两个文件
输入以下命令执行后,接着按三次‘Enter’
ssh-keygen -t rsa
ssh-keygen用来生成RSA类型的密钥以及管理该密钥,参数“ -t ”用于指定要创建的SSH密钥类型为RSA
三次回车
1.保存密钥
2.不对私钥设口令
3.确认不设口令
(2)用ssh-copy-id将公钥复制到远程机器中(4个都要)
ssh-copy-id -i /root/.ssh/id_rsa.pub master
(3)验证SSH是否能够无密钥登录
在master下分别输入ssh slave1、ssh slave2、ssh slave3,exit 退出
14.时间同步
(1)在每个节点输入以下代码安装NTP
yum install -y ntp
安装成功
(2)设置master节点为NTP服务主节点,那么配置如下
使用如下命令来打开/etc/ntp.conf文件,注释掉server开头的就行,并添以下内容。
vim /etc/ntp.conf
restrict 192.168.0.0 mask 255.255.255.0 nomodify notrap
server 127.127.1.0
fudge 127.127.1.0 stratum 10
(3)分别在slave1 slave2 slave3 中配置NTP,同样修改/etc/ntp.conf文件,注释掉server开头的,并添加以下内容
server master
(4)执行命令,永久性关闭防火墙,主从节点都要关闭
6版本:
service iptables stop & chkconfig iptables off
7版本:
查看防火墙状态
systemctl status firewalld.service
绿的running表示防火墙开启
执行开机禁用防火墙自启命令 :
systemctl disable firewalld.service
执行后当前状态下还是启动的,需要重启才永久关闭。
重启后查看
注意:主从节点都要关闭
slave1
关机后重启查看slave1:
重复上面步骤把slave2、slave3的防火墙也永久关闭
关于防火墙的其他命令:
执行关闭命令: systemctl stop firewalld.service (开机启动)
再次执行查看防火墙命令:systemctl status firewalld.service
执行开机禁用防火墙自启命令 : systemctl disable firewalld.service (开机不启动)
启动:systemctl start firewalld.service
防火墙随系统开启启动 : systemctl enable firewalld.service
(5)启动ntp服务
①.在master结点执行命令
service ntpd start & chkconfig ntpd on
查看是否启动成功
systemctl status ntpd
重启后查看
systemctl status ntpd
②.在slave1、slave2、slave3上分别执行下面代码,即可同步时间
ntpdate master
slave1:
slave2:
slave3:
③.在slave1、slave2、slave3上分别执行下面命令,即可永久启动NTP服务。
service ntpd start & chkconfig ntpd on
重启后查看
15.启动关闭集群
做完Hadoop的所有配置后,即可执行格式化NameNode操作,该操作会在NameNode所在机器初始化一些HDFS的相关配置,并且该操作在集群搭建过程中只需执行一次,执行格式化之前可以先配置环境变量。
(1)配置环境变量,在master、slave1、slave2、slave3上修改 文件,添加以下内容,保存退出后执行source /etc/profile来使配置生效
export HADOOP_HOME=/usr/local/hadoop-2.6.4
export PATH=$HADOOP_HOME/bin:$PATH:/usr/java/jdk1.7.0_80/bin
master:
slave1:
slave2:
slave3:
格式化只需执行命令 hdfs namenode -format ,若出现 Storage directory /data/hadoop/hdfs/name has been successfully formatted 提示,则格式化成功。
hdfs namenode -format
格式化完成之后即可启动集群,启动集群只需要在master节点直接进入Hadoop安装目录,分别执行以下命令即可。
(2)启动命令集群
进入Hadoop安装目录
cd $HADOOP_HOME
启动HDFS相关服务
sbin/start-dfs.sh
启动YARN相关服务
sbin/start-yarn.sh
启动日志相关服务
sbin/mr-jobhistory-daemon.sh start historyserver
集群启动之后,在主节点master,子节点slave1、slave2、slave3分别执行 jps 命令,出现以下信息,表示集群启动成功。
slave1
slave2
slave3
(3)关闭集群命令
同理,关闭集群只需要在master节点直接进入Hadoop安装目录,分别执行下面命令即可。
进入Hadoop安装目录
cd $HADOOP_HOME
关闭YARN相关服务
sbin/stop-yarn.sh
关闭HDFS相关服务
sbin/stop-dfs.sh
关闭日志相关服务
sbin/mr-jobhistory-deamon.sh stop historyserver
(4)创建一键启动程序
(hadoop自带的一键启动关闭)
在/usr/local/hadoop-2.6.4/sbin 目录下创建一个脚本
vi myfb.sh
里面填写
#!/bin/bash
case $1 in
"start"){
echo "-----------start my hdfs---------------"
$HADOOP_HOME/sbin/start-dfs.sh
echo "-----------my hdfs started-------------"
echo "------------start my yarn--------------"
$HADOOP_HOME/sbin/start-yarn.sh
echo "------------my yarn started-------------"
echo "---------------start history server------------"
$HADOOP_HOME/sbin/mr-jobhistory-daemon.sh start historyserver
echo "---------------my history server started------------"
};;
"stop"){
echo "--------------stop my dfs-----------------"
$HADOOP_HOME/sbin/stop-dfs.sh
echo "--------------my dfs stopped-----------------"
echo "-------------stop yarn---------------"
$HADOOP_HOME/sbin/stop-yarn.sh
echo "-------------my yarn stopped---------------"
echo "-------------------stop history server------------- "
$HADOOP_HOME/sbin/mr-jobhistory-daemon.sh stop historyserver
echo "-------------------my history server stopped------------- "
};;
esac
给权限(给到权限后会变颜色)
一键启动集群
一键关闭集群
16.监控集群
服务 Web接口 默认端口
NameNode http://namenode_host:port/ 50070
ResourceManager http://resourcemanager_host:port/ 8088
MapReduce JobHistory Server http://jobhistoryserver_host:port/ 19888
为了能够顺利在浏览器打开Hadoop集群相关服务的监控界面,需要朽败本地hosts文件。hosts是一个没有扩展名的系统文件,其作用就是将一些常用的网址域名与其对应的IP地址建立一个关联“数据库”,当用户在浏览器中输入一个网址时,系统会首先自动从hosts文件中寻找对应的IP地址。一旦找到,系统会立即打开对应网页。添加下面内容。
192.168.235.233 master master.centos.com
192.168.235.234 slave1 slave1.centos.com
192.168.235.235 slave2 slave2.centos.com
192.168.235.236 slave3 slave3.centos.com
(1)HDFS监控
在浏览器的地址栏输入“http://master:50070”,按回车键即可看到HDFS的监控界面(需要先启动集群)
http://master:50070
#或者192.168.235.233:50070
①Overview记录了NameNode的启动时间、版本号、编译版本等一些信息。
②Summary是集群信息,提供了当前集群环境的一些有用信息,从图中可知所有DataNode节点的基本存储信息,例如硬盘大小以及有多少被HDFS使用等一些数据信息,同时还标注了当前集群环境中DataNode的信息,对活动状态的DataNode也专门做了记录。
③NameNode Storage提供了NameNode的信息,最后的State标示此节点为活动节点,可正常提供服务。
http://master:50070 HDFS监控1
如上图所示,选择Utilities >> Browse the file system 菜单命令可以查看HDFS上的文件信息
在浏览器中访问“http://master:50070/dfshealth.jsp”得到下图所示的页面,
http://master:50070/dfshealth.jsp
单击“Browse the filesystem”超链接可以打开文件存储目录,如下图所示。
“NameNode Logs”为用户提供NameNode节点的log信息,如下图所示。
(2)YARN监控
在浏览器的地址栏输入“http://master:8088”,即可看到YARN的监控界面,如下图所示。
http://master:8088
(3)日志监控
在浏览器的地址栏输入“http://master:19888”,即可看到日志的监控界面,如下图所示。
http://master:19888
17HDFS文件处理
可以通过Hadoop dfs –help命令来查看HDFS Shell命令的说明。大部分的HDFS Shell和Linux的shell相似。
一般的shell命令格式为:
command 是命令
genericOptions 是一般的参数
commandOptions 是命令参数
在hadoop dfs –ls /input 这条命令中,command是dfs,genericOptions对应-ls等参数,commandOptions对应于/input 这个路径参数。
更多相关命令如下:
1.-appendToFile
(1)将本地文件附加到集群 本地文件一一定要存在 集群上可以没有 集群帮用户创建,在/usr/local/hadoop-2.6.4/etc/hadoop路径下编辑一个文件vi bbb.txt 并写下面内容“this is bbb.txt”,然后使用下面命令上传到集群(前提是已启动了集群)。
hdfs dfs -appendToFile /bbb.txt /bbb.txt
(2)将多个本地文件附加到集群文件
最后一个路径 是集群文件的路径 其他前面的都是本地文件
查看结果
2.-cat
查看刚刚上传到集群的文件
hdfs dfs -cat /bbb.txt
4.-Chmod
修改权限
4.-copyFromLocal
从本地复制一份到集群上
5.-copyFromLocal
从集群复制到本地
6.-get
从集群上获得到本地
7.-cp
复制
8.-count
查看文件数量
9.-df
查看使用空间
10.-du
查看文件长度(显示hdfs对应路径下每个文件夹和文件的大小)
11.-moveFromLocal
从本地移动到集群
12.-rm /-rmdir(删除空文件夹)
删除集群上的文件
13.- tail
显示文件最后1k内容
14.-test -d/-e
查看文件或者目录的反馈(这里能正常执行,说明命令是正确的)
15.-mkdir
创建文件夹
16.-mv
移动文件
17.touchz
创建文件
18.-text
将文本文件或某些格式的非文本文件通过文本格式输出
19.-stat
返回对应路径的状态信息
%b(文件大小),%o(Block大小),%n(文件名),%r(副本个数),%y(最后一次修改日期和时间)
20.-put
将本地文件放到hdfs某个目录
21.-getmerge
把两个文件的内容合并起来(注:合并后的文件位于当前目录,不在hdfs中,是本地文件)
22.-grep
从hdfs上过滤包含某个字符的行内容
六、Hadoop实现MapReduce执行wordcount与使用Eclipse创建MapReduce工程执行wordcount
1.Hadoop实现MapReduce执行wordcount
(1)VMware里面启动集群,在根目录下面创建一个文本,里面填写一些文本
(2)在集群里面创建一个input文件夹
(3)然后把本地的a.txt文本上传到集群上的input文件夹里面
hdfs dfs -put /a.txt /input/a.txt
(4)输入命令进行词频统计
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar wordcount /input /output
(5)然后就会在集群里面产生一个output文件,里面有两个文本
(6)执行命令查看词频统计结果
hdfs dfs -cat /output/part-r-00000
注意:误删了output里面的文本的话的不能恢复的,可以删掉原来的output文件,重新创建一个,重新进行词频统计即可。
(7)如果想要做另一个文本的词频统计,需要把后面的output改个名字,就是从新统计的意思
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar wordcount /input /output1
2.使用Eclipse创建MapReduce工程执行wordcount
(1)添加Hadoop插件
找到eclipse的安装路径,然后将插件移动到这个路径下
(2)增加Map/Reduce功能区
打开eclipse进行以下操作
(3)增加Hadoop集群的连接
单击下图所示界面右下方的蓝色小象图标(其右上方右+号),就会弹出连接Hadoop集群的配置窗口。
在VMware的Hadoop集群里输入以下代码,查看hdfs端口号(先启动集群)
hdfs getconf -confKey fs.default.name
配置namenode节点的ip(自己的虚拟机IP)地址及端口。
相关的Hadoop集群的连接信息有以下各项。
Location name:命名新建的Hadoop连接名称,如Hadoop Cluster。
Map/Reduce Master:填写Hadoop集群的ResourceManager的IP和端口。
DFS Master:填写Hadoop集群的NameNode的IP地址和连接端口。
填写完以上信息后,单击“Finish”按钮。
(4)浏览HDFS上的目录及文件
在配置完Hadoop集群连接后,确认Hadoop集群已经启动,就可以在Eclipse界面浏览HDFS上的目录及文件,如下图所示。还可以通过鼠标来执行文件操作,例如文件的上传和删除等。需要注意的事,每次执行操作后,需要刷新HDFS列表,从而获得文件目录的最新状态。
3.新建MapReduce工程
(1)导入MapReduce运行依赖的相关JAR包
在主菜单上单击“Window”并选择“Preferences”,例如下图所示Preference界面中,选择“Hadoop Map/Reduce”,单击“Browse...”按钮,再选中Hadoop的安装文件夹路径(相应版本的Hadoop安装包需要预先解压再本地电脑上)。最后单击“Apply”按钮并单击确定。
(2)创建MapReduce工程
从菜单栏开始,单击“File”菜单,选择“New”命令,在出现的选项中单击“Project”项,再选择“Map/Reduce Project”选项。
在"MapReduce Project"的创建界面中,填写工程名“MemberCount”,然后单击“Finish”按钮。
在主界面左侧的“Project Explorer”栏,可以看到已经创建好的工程MemberCount,Map Reduce编程所需要的JAR包已经全部自动导入。新工程已创建完成,接下来就可以正式进行MapReduce编程工作了。
创建完成MapReduce工程
4.编写wordcount的代码 ,描述他是怎么工作的。并且上传到集群完成一次完整的Wordcount
(1)创建一个class文件并命名为WordCount
(2)内容代码与解析如下
①应用程序Driver分析
这里的Driver程序主要指的是main函数,在main函数里面进行MapReduce程序的一些初始化设置,并提交任务,等待程序运行完成。总结为MapReduce任务初始化的通用代码。
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
②Mapper模式分析
在MapReduce程序中,最重要的代码实现就是Mapper模块中的map函数以及Reducer模块中的reduce函数。这里先看Mapper,也就是源码中的TokenizerMapper
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
③Reducer模式分析
继续分析Reduce,即源码中的IntSumReducer
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
这是wordcount的完整代码
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
④以下为待会任务需要修改后的完整代码
注意class名称与包名是否一致
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class dailyAccessCount {
public static class MyMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
String array[] = line.split(","); //指定空格为分隔符,组成数组
String keyOutput = array[1]; //提取数组中的访问日期做为Key
context.write(new Text(keyOutput), one); //组成键值对
}
}
public static class MyReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0; //定义累加器,初始值为0
for (IntWritable val : values) {
sum += val.get(); // 将相同键的所有值进行累加
}
result.set(sum);
context.write(key,result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "Daily Access Count");
job.setJarByClass(dailyAccessCount.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
⑤注意:确保自己电脑上的java版本跟Hadoop上的版本一致,我这里的是jdk-7u80-windows-x64(1.7)
写完代码后,把项目代码调整为1.7版本
⑥编译生成JAR包文件,并提交Hadoop集群执行
上传到Hadoop集群服务器节点
这里我放到这个路径 /usr/local/hadoop-2.6.4
需要做统计的文本也上传到Hadoop,并上传到集群(130万行数据的文本)
在Hadoop集群服务器的终端,以hadoop jar命令提交任务
注意输出的文件名每次不能相同
hadoop jar $HADOOP_HOME/wordcount1.jar /user/root/user_login.txt /user/root/AccessCount
可以看到数据量的达到130万行的量级
做10次
⑦Combiner
Combiner是一个运行在Map端的“迷你Reduce”过程,它只处理单台机器生成的数据。声明的Combiner继承的是Reduce,其方法实现原理和Reduce的实现原理基本相同,不同的是,Combiner操作发生在Map端,或者说Combiner运行在每一个运行Map任务的节点上。它会接收特定节点上的Map输出作为输入,对Map输出的数据先做一次合并,再把结果发送到Reducer。需要注意的是,Combiner的加入不影响原逻辑,即Combiner不影响最终运行结果,影响的只是效率。
值得一提的是,并非所有的MapReduce程序都可以加入Combiner,仅当Reduce输入的键值对类型与Reduce输出的键值对类型一样,并且计算逻辑不影响最终计算结果时,才可以在MapReduce程序中加入Combiner。
前面讲过Combiner继承的是Reducer,所以声明Combiner的时候必须继承Reducer,在Combiner类里面重写Reduce方法。
除了声明Combiner类外,还需要在驱动类里面配置Combiner类
job.setCombinerClass(LogCountCombiner.class);
有时候甚至不必特意声明一个Combiner类。当Combiner和Reduce的实现逻辑相同的时候,可以不用声明Combiner类,而在驱动类里面添加代码
job.setCombinerClass(IntSumReducer.class);
完整代码如下:(注意修改包名和类名是否相同)
package test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class dailyAccessCount {
public static class MyMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
String array[] = line.split(","); //指定空格为分隔符,组成数组
String keyOutput = array[1]; //提取数组中的访问日期做为Key
context.write(new Text(keyOutput), one); //组成键值对
}
}
public static class MyReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0; //定义累加器,初始值为0
for (IntWritable val : values) {
sum += val.get(); // 将相同键的所有值进行累加
}
result.set(sum);
context.write(key,result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "Daily Access Count");
job.setJarByClass(dailyAccessCount.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setCombinerClass(MyReducer.class); //Combiner逻辑与Reducer相同时配置Combiner类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
或者
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.examples; //包
import java.io.IOException; //数据的输入与输出
import java.util.StringTokenizer; //对String字符处理
import org.apache.hadoop.conf.Configuration; //基础conf
import org.apache.hadoop.fs.Path; //文件系统
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text; //数据输入输出(IO)
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer; //MapReduce(对数据的拆分,对数据的归类)
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper /*泛型类*/
<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1); //写整型的时候,初值为1
private Text word = new Text(); //文本对象word
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
/*字符串分隔,把单词拆出来
Hello Hadoop Hello world
hello 1 hadoop 1 hello 1 world 1
(用空格替换逗号跟句号)或者重写Tokenizer,把逗号句号替换添加进去
*/
while (itr.hasMoreTokens()) { //做一个循环判断,是否还有Token(分隔符)
word.set(itr.nextToken()); //如果有,就再读一个,从当前分隔符读到下一个分//隔符,实际就是读了一个单词
context.write(word, one); //输出这个单词
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result); //输出结果 hello 2 hadoop 1 world 1
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); //实例化conf(读取命令行的命令,以数组的方式返回路径)
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
} //判断IO的路径,如果只有一个路径,长度是小于2的,你的输出(1)包含两//个(输入和输出IO),防止出现读取的异常。
Job job = new Job(conf, "word count"); //所有的工作都是用Job来完成的,实例化Job
job.setJarByClass(WordCount.class); //打jar包
job.setMapperClass(TokenizerMapper.class); //指定Map类型
job.setCombinerClass(IntSumReducer.class); //指定Combiner
job.setReducerClass(IntSumReducer.class); //指定Reduce的类型
job.setOutputKeyClass(Text.class); //设置最终key的类型
job.setOutputValueClass(IntWritable.class); //设置最终value的类型
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} //任务提交
}
同样需要进行打包
设置接口
上传到集群
继续做在Hadoop集群服务器的终端,以hadoop jar命令提交任务(本次添加了Combiner)
注意输出的文件名每次不能相同
做10次
hadoop jar $HADOOP_HOME/wordcount2.jar /user/root/user_login.txt /user/root/AccessCount
这里可以看到所使用的文本跟上面没加Combiner的文本是同一个,所以数据量是130万行
总结:wordcount加了combiner的代码后,运行效率提高了。
代码总结:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.examples; //包
import java.io.IOException; //数据的输入与输出
import java.util.StringTokenizer; //对String字符处理
import org.apache.hadoop.conf.Configuration; //基础conf
import org.apache.hadoop.fs.Path; //文件系统
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text; //数据输入输出(IO)
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer; //MapReduce(对数据的拆分,对数据的归类)
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
//在MapReduce程序中,最重要的代码实现就是Mapper模块中的map函数以及Reducer模块中的reduce函数。这里先看Mapper,也就是源码中的TokenizerMapper
public static class TokenizerMapper
extends Mapper /*泛型类*/
<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1); //写整型的时候,初值为1
private Text word = new Text(); //文本对象word
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
/*字符串分隔,把单词拆出来
Hello Hadoop Hello world
hello 1 hadoop 1 hello 1 world 1
(用空格替换逗号跟句号)或者重写Tokenizer,把逗号句号替换添加进去
*/
while (itr.hasMoreTokens()) { //做一个循环判断,是否还有Token(分隔符)
word.set(itr.nextToken()); //如果有,就再读一个,从当前分隔符读到下一个分//隔符,实际就是读了一个单词
context.write(word, one); //输出这个单词
}
}
}
//继续分析Reduce,即源码中的IntSumReducer
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result); //输出结果 hello 2 hadoop 1 world 1
}
}
//这里的Driver程序主要指的是main函数,在main函数里面进行MapReduce程序的一些初始化设置,并提交任务,等待程序运行完成。总结为MapReduce任务初始化的通用代码。
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); //实例化conf(读取命令行的命令,以数组的方式返回路径)
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
} //判断IO的路径,如果只有一个路径,长度是小于2的,你的输出(1)包含两//个(输入和输出IO),防止出现读取的异常。
Job job = new Job(conf, "word count"); //所有的工作都是用Job来完成的,实例化Job
job.setJarByClass(WordCount.class); //打jar包
job.setMapperClass(TokenizerMapper.class); //指定Map类型
//有时候甚至不必特意声明一个Combiner类。当Combiner和Reduce的实现逻辑相同的时候,可以不用声明Combiner类,而在驱动类里面添加代码
job.setCombinerClass(IntSumReducer.class); //指定Combiner
job.setReducerClass(IntSumReducer.class); //指定Reduce的类型
job.setOutputKeyClass(Text.class); //设置最终key的类型
job.setOutputValueClass(IntWritable.class); //设置最终value的类型
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} //任务提交
}
/***
* ,%%%%%%%%,
* ,%%/\%%%%/\%%
* ,%%%\c "" J/%%%
* %. %%%%/ o o \%%%
* `%%. %%%% _ |%%%
* `%% `%%%%(__Y__)%%'
* // ;%%%%`\-/%%%'
* (( / `%%%%%%%'
* \\ .' |
* \\ / \ | |
* \\/ ) | |
* \ /_ | |__
* (___________))))))) 攻城湿
*/
版权归原作者 鸷鸟之不群 所有, 如有侵权,请联系我们删除。