Hadoop是一个开源的分布式离线数据处理框架,底层是用Java语言编写的,包含了HDFS、MapReduce、Yarn三大部分。
组件配置文件启动进程备注Hadoop HDFS需修改需启动
NameNode(NN)作为主节点
DataNode(DN)作为从节点
SecondaryNameNode(SNN)主节点辅助分布式文件系统Hadoop YARN需修改需启动
ResourceManager(RM)作为集群资源管理者
NodeManager(NM)作为单机资源管理者
ProxyServer代理服务器提供安全性
JobHistoryServer历史服务器记录历史信息和日志分布式资源调度Hadoop MapReduce需修改无需启动任何进程
MapReduce程序运行在YARN容器内分布式数据计算
Hadoop集群 = HDFS集群 + YARN集群
图中是三台服务器,每个服务器上运行相应的JAVA进程
HDFS集群对应的web UI界面:
http://namenode_host:9870
(namenode_host是namenode运行所在服务器的ip地址)
YARN集群对应的web UI界面:http://resourcemanager_host:8088
(resourcemanager_host是resourcemanager运行所在服务器的ip地址)
一、HDFS
1.1 HDFS简介
- HDFS的全称为Hadoop Distributed File System,是用来解决大数据存储问题的,分布式说明其是横跨多台服务器上的存储系统
- HDFS使用多台服务器存储文件,提供统一的访问接口,使用户像访问一个普通文件系统一样使用分布式文件系统
- HDFS集群搭建完成后有个抽象统一的目录树,可以向其中放入文件,底层实际是分块存储(物理上真的拆分成多个文件,默认128M拆分成一块)在HDFS集群的多个服务器上,具体位置是在hadoop的配置文件中所指定的
- HDFS只支持对文件进行移动,读取、删除、追加操作,不支持任意修改文件。(一次写入,多次读取。只能追加,不能修改。)
1.2 HDFS shell命令行
- 命令行界面(command-line interface,缩写:cli),指用户通过指令进行交互
- Hadoop操作文件系统shell命令行语法:
hadoop fs [generic options]
- 大部分命令与linux相同
hadoop fs -ls file:/// # 操作本地文件系统
hadoop fs -ls hdfs://node1:8020/ # 操作HDFS文件系统,node1:8020是NameNode运行所在的机器和端口号
hadoop fs -ls / #直接根目录,没有指定则默认加载读取环境变量中fs.defaultFS的值,作为要读取的文件系统
上传文件到HDFS指定目录下
hadoop fs -put[-f][-p]<localsrc><dst># 将本地文件传到HDFS文件系统中# -f 覆盖目标文件# -p 保留访问和修改时间,所有权和权限# <localsrc>本地文件系统中的文件(客户端所在机器),<dst>HDFS文件系统的目录
下载HDFS文件
hadoop fs -get[-f][-p]<src><localdst># 将本地文件传到HDFS文件系统中# -f 覆盖目标文件# -p 保留访问和修改时间,所有权和权限# <src>HDFS文件系统中的文件,<localdst>本地文件系统的目录
追加数据到HDFS文件中
hadoop fs -appendToFile<localsrc>...<dst># <localsrc>本地文件系统中的文件,<dst>HDFS文件系统的文件(没有文件则自动创建)# 该命令可以用于小文件合并
1.3 HDFS架构
HDFS包含3个进程NameNode、DataNode、SecondaryNameNode
(都是Java进程,可以在服务器上运行jps查看正在执行的java进程)
HDFS是主从模式(Master - Slaves),基础架构如下:
- NameNode:****维护和管理文件系统元数据,包括HDFS目录树结构,文件和块的存储位置、大小、访问权限等信息。NameNode是访问HDFS的唯一入口
- DataNode: 负责具体的数据块存储,定期向NameNode汇报心跳(默认每10分钟)信息
- SecondNameNode: 是NameNode的辅助节点,但不能替代NameNode。主要是进行元数据文件(FsImage和EditsLog)的合并并推送给NameNode。
- NameNode不持久化存储每个文件中各个块所在的DataNode的位置信息,这些信息在系统启动时从DataNode重建
- NameNode是Hadoop集群中的单点故障
- NameNode所在机器通常配置大内存(RAM),因为元数据都存在内存中,定时进行持久化存到磁盘中。
- DataNode所在机器通常配置大硬盘空间,因为数据存在DataNode中
HDFS集群部署举例:
node1、node2、node3表示三台服务器,形成一个集群:
node1服务器性能比较高,因此在node1上运行三个进程:NameNode、DataNode、SecondaryNameNode
在node2及node3上只运行DataNode进程
1.4 HDFS写数据流程
- HDFS客户端创建对象实例DistributeFileSystem(Java类的对象),该对象中封装了与HDFS文件系统操作的相关方法。
- 调用DistributeFileSystem对象的create()方法,通过RPC请求NameNode创建文件,NameNode执行各种检查判断:目标文件是否存在,客户端是否有权限等。检查通过后返回FSDataOutputStream输出流对象给客户端用于写数据。
- 客户端用FSDataOutputStream开始写数据
- 客户端写入数据时,将数据分成一个个数据包(packet 默认64k),内部组件DataStreamer请求NameNode挑选出适合存储数据副本的一组DataNode地址,默认是3副本存储(即3个DataNode)。DataStreamer将数据包流式传输(每一个packet 64k传输一次)到pipeline的第一个DataNode,第一个DataNode存储数据后传给第二个DataNode,第二个DataNode存储数据后传给第三个DataNode。
- 传输的反方向上,会通过ACK机制校验数据包传输是否成功
- 客户端完成数据写入后,在FSDataOutputStream输出流上调用close()方法关闭。
- DistributeFileSystem告诉NameNode文件写入完成。
二、Yarn
2.1 Yarn简介
Yarn是一个通用****资源管理系统和调度平台,可为上层应用提供统一的资源管理和调度。
通用: 不仅支持MapReduce程序,理论上支持各种计算程序,YARN只负责分配资源,不关心用资源干什么。
资源管理系统: 集群的硬件资源,和程序运行相关,比如内存、cpu等
调度平台: 多个程序同时申请计算资源如何分配,调度的规则
2.2 Yarn架构
Yarn与HDFS一样,也是主从模式,包含以下4个进程
- ResourceManager:****管理整个群集的资源,负责协调调度各个程序所需的资源。(申请资源必须找RM)
- NodeManager:****管理单个服务器的资源,负责调度单个服务器上的资源提供给应用程序使用。
NodeManager通过创建Container容器来分配服务器上的资源。
应用程序运行在NodeManager所创建的容器中。
一个服务器上可以创建多个Container容器,各Container容器之间相互独立,实现了一个服务器上跑多个程序。
Container容器是具体运行 Task(如 MapTask、ReduceTask)的基本单位。
一个NodeManager上会运行多个Container。
- ProxyServer代理服务器: ProxyServer默认继承在ResourceManager中,可以通过配置分离出来单独启动,可以提高YARN在开放网络中的安全性。
- JobHistoryServer历史服务器: 记录历史程序运行信息和日志,开放web ui提供用户通过网页访问日志。
YARN架构图:
ApplicationMaster(App Mstr): 应用程序内的“老大”,负责程序内部各阶段的资源申请,管理整个应用。(当YARN上没有程序运行,则没有这个组件)
一个应用程序对应一个ApplicationMaster。
ApplicationMaster 运行在 Container 中,是应用程序的第一个Container,之后会请求ResourceManager要更多的Container,来运行应用的各个任务(比如 MapTask、ReduceTask)。
YARN 中运行的每个应用程序都有一个自己独立的 ApplicationMaster。(以MapReduce为例,其中的MRAppMaster就是对应的具体实现,管理整个MapReduce程序)
YARN集群部署举例:
node1、node2、node3表示三台服务器,形成一个集群:
node1性能高,因此在node1上运行四个进程:ResourceManager、NodeManager、ProxyServer、JobHistoryServer
在node2及node3上只运行NodeManager进程
2.3 Yarn调度器和调度算法
Hadoop作业调度器主要有三种:FIFO Scheduler(先进先出调度器)、Capacity Scheduler(容量调度器)和Fair Scheduler(公平调度器)
Apache Hadoop 3.1.3默认的资源调度器是CapacityScheduler。
- FIFO Scheduler:单队列,根据提交作业的先后顺序,依次执行
- Capacity Scheduler:多队列,每个队列配置一定的资源,每个队列内部采用FIFO策略
- Fair Scheduler:资源在每个用户之间公平共享,充分利用资源
三、MapReduce
MapReduce是一个分布式计算框架,只能用来做离线计算,不能做实时计算。 MapReduce程序在运行时分为Map阶段和Reduce阶段。
3.1 MapReduce进程
MapReduce程序在运行时有以下三类进程:
- MRAppMaster:负责整个MR程序的过程调度及状态协调
- MapTask: 负责Map阶段的整个数据处理流程
- ReduceTask: 负责Reduce阶段的整个数据处理流程
- 在一个MR程序中MRAppMaster只有一个,MapTask和ReduceTask可以有一个也可以有多个
- 在一个MR程序中可以只有Map阶段
- 在整个MR程序中,数据都是以kv键值对的形式流转的
- 一个MapTask最终只输出一个文件,一个ReduceTask最终也只输出一个文件。
3.2 MapReduce编程规范(了解)
编写MapReduce程序其实就是按照MapReduce框架的要求编写符合自己业务逻辑的Java代码。
用户编写的程序分为三部分:Mapper、Reducer、Driver
- Mapper阶段:
(1)自定义一个继承Mapper的类,重写其中map()方法(这里对应具体业务逻辑)
(2)map()方法的输入是<k,v>键值对,k是偏移量,v对应输入数据中的一行(默认情况下),输出也是<k,v>键值对
(3)map方法对每个<k,v>调用一次(即逐行处理数据)
- Reducer阶段:
(1)自定义一个继承Reducer的类,重写其中reduce()方法(这里对应具体业务逻辑,比如加减乘除运算)
(2)reduce()方法的输入对应map()输出的<k,v>(这里的<k,v>其实是经过shuffle之后的<k,v>,即k相同的<k,v>作为一组输入给reduce方法。例如map()输出(a,1),(a,1),则reduce()输入为(a,[1,1]))
(3)reduce()方法对每一组相同k的<k,v>调用一次
- Driver阶段:
整个MapReduce程序的执行入口,这里写main方法,程序逻辑比较固定,用于指定输入文件路径、输出文件路径、指定自定义的Mapper、Reducer类、提交程序到Yarn集群等
3.3 MapReduce内部工作流程(重点)
MapReduce执行流程(简易版):
如下图,MapReduce包含了MapTask和ReduceTask,红框中是shuffle过程
map()方法之后reduce()方法之前的数据处理过程称为shuffle
Shuffle过程是横跨map和reduce两个阶段的,分别称为Map端的Shuffle和Reduce端的Shuffle
Shuffle中频繁涉及到数据在内存、磁盘之间的多次往复,是导致mapreduce计算慢的原因
MapTask内部执行流程:
Map阶段共2次排序,溢出之前进行快速排序,所有溢出文件合并后进行归并排序
(1) 把所要处理的文件进行逻辑切片(默认每128M一个切片),每一个切片由一个MapTask处理。(默认切片split大小=块block大小)
(2)按行读取切片中的数据,返回<key,value>对,key对应行数,value是本行的文本内容
(3)调用map方法处理数据,每个<key,value>调用一次map方法
(4)对map方法输出的<key,value>对进行分区partition,分区的数量就是reducetask运行的数量
(5)map方法输出的数据写入内存缓冲区,达到比例溢出到磁盘上。溢出spill之前会根据key按照字典序(a~z)对每个溢出文件内部进行快速排序sort(这一步会溢出很多排序好的小文件)
(6)对所有溢出的文件进行最终的merge合并,合并后再次归并排序,形成一个文件(文件中会有多个分区的数据,但一个maptask只输出一个文件)
ReduceTask内部执行流程:
Reduce阶段共1次排序,拉取所有的MapTask对应分区的数据,合并后进行归并排序
(1)ReduceTask会主动从MapTask复制拉取属于对应分区的数据
(2)把拉取来的数据,全部进行合并merge,即把分散的数据合并成一个大的数据,再对合并后的数据进行归并排序
(3)对排序后的key相同<key,value>作为一组调用一次reduce方法。
(4)最后把reduce方法输出的键值对写入到HDFS的文件中。
Shuffle执行流程:
(1)MapTask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中
(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件(溢出前会进行快排)
(3)多个溢出文件会被合并成大的溢出文件,然后进行归并排序
(4)ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据
(5)ReduceTask 会抓取到同一个分区的来自不同 MapTask 的结果文件,ReduceTask 会将这些文件再进行合并(归并排序)
(6)合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 Group,调用用户自定义的 reduce()方法)
3.4 其他知识(了解)
3.4.1 序列化与反序列化
序列化: 把内存中的对象,转换成字节序列,便于存储到磁盘(持久化)和网络传输(内存 --> 磁盘)
反序列化: 把字节序列或磁盘的持久化数据,转换成内存中的对象
Hadoop中的序列化类型与Java中类型的对应关系如下(Java中的这些类型也可以序列化,只是Hadoop框架重写的这些类的序列化更轻量化,效率更高,便于传输)
PS:这些类型不满足需求时,可以自定义Bean对象实现序列化接口(Writable),重写对应的序列化与反序列化方法
3.4.2 切片split
在MapReduce程序执行开始时通过InputFormat(抽象类)的RecordReader方法读取数据。FileInputFormat抽象类继承于InputFormat类。
FileInputFormat切片机制:
- 每个文件单独切片
- 默认情况下,split切片大小=block块大小
- 每次切片,判断剩下的部分是否大于块的1.1倍,不大于1.1倍就划分成一块切片
TextInputFormat与CombineTextInputFormat:
- TextInputFormat、CombineTextInputFormat都是FileInputFormat的实现类。
- 默认使用TextInputFormat进行数据的读取,按行读取每条记录。<k,v>中k是偏移量,LongWritable类型,v是这行的内容,Text类型。
- 由于默认的TextInputFormat按文件切片,不管文件多小,都会是一个单独的切片,也会对应单独的一个MapTask,如果有多个小文件,会产生大量的MapTask,处理效率低,这时可以在Driver中指定使用CombineTextInputFormat,可以将小文件从逻辑上划分到一个切片中,将多个小文件交给一个MapTask处理。
3.4.3 分区partition
如果要按照条件输出到不同文件中时则要用到分区,n个分区则对应n个ReduceTask,可以在Driver类中使用job.setNumReduceTasks(n)来指定reduceTask的数量为n
分区是在Shuffle阶段中向环形缓冲区写入的时候执行
默认分区规则: 根据<key,value>中key的hashcode对ReduceTask的数量取模确定对应分区,key相同的在一个分区内。
自定义分区: 自定义类继承Partitioner方法,重写getPartition方法,可以指定key对应的分区,可以解决数据倾斜(人工指定分区)
3.4.4 Combiner预聚合
Combiner的父类是Reducer,使用Combiner在每一个MapTask中进行预聚合(局部汇总),可以减少ReduceTask所需聚合的数据量,提升计算效率
Combiner在Shuffle阶段中共执行两次:
第一次是对每个环形缓冲区溢出的文件进行预聚合
第二次是在所有溢出的文件合并且归并排序后再进行一次预聚合
最终保证每个MapTask输出的最终文件是聚合好的
Combiner和Reducer的区别:
Combiner是在每一个MapTask所在节点运行
Reducer是在接收到所有Mapper的输出结果后运行
Combiner相当于是把Reducer的聚合逻辑提前到每个Mapper中进行局部的聚合
Combiner的使用不能影响最终业务逻辑,例如求平均值则不能用,求和则可以用。
3.4.5 Reduce Join和Map Join
Reduce Join:
Map端: 来自不同表/文件的<k,v>对,打上标签用于区别不同的表,用连接字段作为key,其余部分和新加的标签作为value,输出后经过shuffle给reduce
Reduce端: 接收到一组k相同<k,v>对,在每一组中可以通过map端所增加标签区分表,编写合并的逻辑进行处理(a表与b表,连接字段相同的数据被分到一组,即一个reduce中)
Map Join:
在 Map 端缓存多张表,提前处理业务逻辑,这样增加 Map 端业务,减少 Reduce 端数据的压力,尽可能的减少数据倾斜。
3.4.6 压缩
压缩优点: 减少磁盘IO、减少磁盘存储空间
压缩缺点: 增加CPU开销(压缩、解压都需要CPU运行)
运算密集型的mapreduce程序,少用压缩
io密集型的mapreduce程序,多用压缩
压缩可以在MapReduce的任意阶段使用。
常见压缩格式:
压缩格式Hadoop自带算法文件扩展名是否可切片换成压缩格式后,原来程序是否需要修改Deflate是,直接使用Deflate.deflate否和文本处理一样,不需要修改Gzip是,直接使用Default.gz否和文本处理一样,不需要修改bzip2是,直接使用bzip2.bz2是和文本处理一样,不需要修改LZO否,需要安装LZO.lzo是需要建索引,还需要指定输入格式Snappy是,直接使用Snappy.snappy否和文本处理一样,不需要修改
压缩性能:
压缩比:Snappy < LZO < Gzip < bzip2
解压速度:Snappy > LZO > Gzip > bzip2
选用哪种压缩方式看三点:1.解压缩速度 2.压缩比 3.是否支持切片 根据不同场景选择不同压缩方式
要求压缩解压速度快,则一般使用snappy和lzo,若同时要求切片,则只能选用lzo
要求存储空间小,则一般使用gzip和bzip2,若同时要求切片,则只能选用bzip2
压缩算法原始文件大小压缩文件大小压缩速度解压速度gzip8.3GB1.8GB17.5MB58MB/sbzip28.3GB1.1GB2.4MB9.5MB/sLZO8.3GB2.9GB49.3MB74.6MB/sSnappy8.3GB4.1GB250MB500MB/s
搭建Hadoop集群时常用的配置文件:
配置文件说明core-site.xml配置Hadoop的基本属性hdfs-site.xml配置HDFS的基本属性yarn-site.xml配置Yarn的基本属性mapred-site.xml配置MapReduce的基本属性hadoop-env.sh配置Hadoop的环境变量,例如JAVA_HOMEworkers保存从节点(slave节点)的信息
版权归原作者 想当运维的程序猿 所有, 如有侵权,请联系我们删除。