前言
项目效果展示
项目源码免费获得请私信博主,绝对免费!
目录
- Linux基础命令:往期博客Linux课堂篇3_Linux目录结构、快捷键、常用基础命令
- Hadoop3.2.1介绍与环境搭建
- Hive3.1.2介绍与环境搭建
- 数据的爬取与清洗
- 项目搭建
一、Hadoop3.2.1介绍与环境搭建
大数据指的是在一定的时间范围内无法使用常规软件进行存储、计算的数据集合。通俗理解:描述的是人类在信息爆炸时代所产生的海量数据。
广义上来讲,大数据的一个生态圈,包括很多其他的软件(如spark/kafka/hive/hbase/zookeeper/等等),是一个适合大数据的分布式存储和计算平台。
架构发展:单体—>集群—>分布式
- 分布式:将多台服务器集中在一起,每台服务器都实现总体的不同业务,做不同的事。 举例:饭店有3个厨师,做菜分为洗菜、切菜、下锅3个步骤。3个人进行分工,有个人专门洗菜,有个人专门切菜,有个人专门下锅,从而完成做菜这个事情。
- 集群:指将多台服务器集中在一起,每台服务器都实现相同的业务,做相同的事情。 举例:饭店有3个厨师,做菜分为洗菜、切菜、下锅3个步骤。3个人同时洗菜,完成洗菜再同时切菜,完成切菜再同时下锅,从而完成做菜这个事情。
集群不一定是分布式,分布式一定是集群。
1、Hadoop发展史
演化发展:Lucene->Nutch->Hadoop。
- 阶段1:Lucene–Doug Cutting用Java语言编写的开源软件,实现与Google类似的全文搜索功能,它提供了全文检索引擎的架构,包括完整的查询引擎和索引引擎 。在2001年年底成为apache基金会的一个子项目。对于大数量的场景,Lucene面对与Google同样的困难,存储数据困难,检索速度慢。
- 阶段2:可以说Google是hadoop的思想之源(Google在大数据方面的三篇论文)谷歌的三驾马车:GFS —>HDFS。Map-Reduce —>MR。BigTable —>Hbase
- 阶段3:Lucene学习和模仿Google解决这些问题的办法,演变成了微型版Nutch。2003-2004年,Google公开了部分GFS和Mapreduce思想的细节,以此为基础Doug Cutting等人用了2年业余时间实现了DFS和Mapreduce机制,使Nutch性能飙升 。2005 年Hadoop 作为Nutch的一部分正式引入Apache基金会。2006 年 3 月份,Map-Reduce和Nutch Distributed File System (NDFS) 分别被纳入称为 Hadoop 的项目中 。
版本:Hadoop 三大发行版本: Apache、Cloudera 、Hortonworks。Apache版本最原始(最基础)的版本,对于入门学习最好。
组成部分:hadoop是由Hadoop Common、Hadoop Ozone、HDFS、Mapreduce以及Yarn 5个Module组成。
2、HDFS概念
随着数据量越来越大,在一个文件系统下无法存储海量数据,普通硬件支持的操作系统即使扩展磁盘也会遇到瓶颈,迫切需要水平横向扩展来解决数据存储问题,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。HDFS只是分布式文件管理系统中的一种,其它的比如TFS、fastdfs。
Hadoop HDFS(hadoop distribute file system ):一个高可靠、高吞吐量的分布式文件系统。一说到分布式,就要想到需要有多台机器担任不同角色,承担不同职责,完成共同的目标。这里的hdfs的角色分为namenode、datanode、secondarynamenode,完成存储文件的目标。在从hdfs读取文件或者是写文件的时候,客户端必须先找到namenode,然后再找datanode。
(1) Namenode,简称nn,即名称节点,它是集群的老大,一个java进程,启动在某个机器之上,用于存储文件的元数据,元数据就是描述数据(文件)的数据,例如文件名,文件目录结构,文件属性(生成时间、副本数、权限),以及每个文件的块列表,还有块所在的datanode等。
(2) Datanode,简称dn,即数据节点,它是集群的小弟,也是一个java进程,一个集群中,可以在多台机器上启动datanode进程。其作用是在本地文件系统存储文件块数据,以及块数据的校验和。
(3) Secondarynamenode,第二名称节点,简称2nn,用来辅助namenode,也是一个java进程,每隔一段时间获取hdfs元数据的快照。
优点
- (1)高容错 数据自动保存多个副本。它通过增加副本的形式,提高容错性; 某一个副本丢失以后,其它副本有备份,能够自动恢复。
- (2)块存储 当我们向hdfs存储文件的时候,hdfs会将文件物理上切块存储,例如存储一个200M的文件,hdfs将该文件切分为两块:128M、72M,将这两块进行存储,而不是将该文件作为一个整体进行存储。 a. 最佳传输损耗原理:当读取一个文件的时候,所需要的时间分为两部分:寻址时间、传输时间。根据目前的技术水平,机械磁盘的寻址时间普遍在10ms左右,而传输时间取决于要读取文件的大小,读取的文件越大,所需要的传输时间越长,那么,一次读取多少文件效率最高呢?根据经验寻址时间占传输时间1%的时候,是传输一个文件效率最高的。
- (3)适合处理大数据集 数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据。 文件规模:能够处理百万规模以上的文件数量,数量相当之大。
- (4)可靠性 因为具备高容错性,所以集群可以部署在廉价的PC机上,并能够保证数据不易丢失。
缺点
- (1)不适合低延迟的数据访问 访问hdfs数据的速度相对较慢,比如像mysql那样毫秒级的读写数据,hdfs是做不到的
- (2)不适合存储大量小文件 在读取小文件时,寻址时间会超过传输时间,不符合最佳传输损耗原理,效率太低,因此不会采用hdfs存储大量小文件。 存储大量小文件的话,它会占用NameNode更多的内存来存储文件的元数据。这样是不可取的,因为的内存总是有限的。在hdfs里面,每个block的元数据会占用150字节的内存空间。例如同样存储10M的内容,采取副本数为3的备份机制,这10M放在一个文件中,所用到的元数据占用的内存为1503=450字节,如果把这10M放到10个文件中,则占用15010*3=4500字节,很明显,存储相同的内容,第二种方式占用的内存更多。
- (3)不支持并发写入以及随机修改 一个文件只能有一个用户写,不允许多个线程同时写; 仅支持数据append(追加),不支持文件的随机修改。
HDFS架构设计:HDFS和现有的分布式文件系统有很多共同点。但同时,它和其他的分布式文件系统的区别也是很明显的。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用,这得益于它优秀的架构设计。Hdfs在写入数据的时候只能单线程写 读数据的时候可以并发读
1、写数据流程分析
(1)客户端向hdfs上传文件,首先向NameNode请求上传文件,NameNode会做一些验证,比如检查目标文件是否已存在,父目录是否存在。
(2)NameNode经过验证后,向客户端返回是否可以上传。
(3)如果客户端收到可以上传的回复,则会向namenode请求第一个 block上传到哪几个datanode服务器上。
(4)NameNode返回客户端可用的3个datanode的节点地址,分别为datanode1、datanode2、datanode3。
(5)客户端通过FSDataOutputStream模块请求datanode1上传数据,datanode1收到请求会继续调用datanode2,然后datanode2调用datanode3,将这个通信管道建立完成。
(6)datanode1、datanode2、datanode3逐级应答客户端。
(7)客户端开始往datanode1上传第一个block,以packet为单位,datanode1收到一个packet不仅存储到本地磁盘还会将该packet复制一份传给datanode2,datanode2传给datanode3;
当一个block传输完成之后,客户端再次请求NameNode上传第二个block的服务器。(重复执行3-7步)。
2、读数据流程分析
(1)客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
(2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。
(3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以packet为单位来做校验)。
(4)客户端以packet为单位接收,先在本地缓存,然后写入目标文件。
(5)客户端将所有的块下载下来之后,在本地将所有的块拼接成一个文件。
3、MapReduce概念
Hadoop MapReduce:一个分布式的离线并行计算框架。MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。简单理解就是简化我们需要使用分布式机器进行大数据量计算统计任务操作的工具
MapReduce将计算过程分为两个阶段:Map(映射)和Reduce(归约)。当数据量比较小的时候,一台机器能够处理,但是当数据量非常大了,例如10T,这个时候,我们就需要考虑将数据分给多台机器来处理,比如分给10台机器,这个过程我们可以理解为map的过程。当这10台机器处理完毕了,都生成了各自的结果,最后需要将结果汇总,那么汇总的这个过程,可以成之后reduce的过程。
(1) Map阶段并行处理输入数据
(2) Reduce阶段对Map结果进行汇总
MapReduce进程:程序由单机版扩成分布式版时,会引入大量的复杂工作,如运算至少分为两个过程,先并行计算,然后统一汇总,这两个阶段如何启动如何协调,数据找程序还是程序找数据,任务由谁分配怎么分配,如何处理容错,如何监控,出错如何重试…,MapReduce 把大量分布式程序都会涉及的到的内容都封装起来,由三类进程去管理,让开发人员可以将精力集中于业务逻辑。以下便是这三类进程。
- MrAppMaster:负责整个程序的过程调度及状态协调。
- MapTask:负责map阶段的整个数据处理流程。计算方向
- ReduceTask:负责reduce阶段的整个数据处理流程。汇总方向
MapReduce的思想核心是“分而治之”,用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
- Mapper阶段 :该阶段由框架把输入数据以kv对的形式读取进来,我们需要将其处理之后再组装为kv对写出到reduce。 (1)用户自定义的Mapper要继承自己的父类 (2)Mapper的输入数据是KV对的形式(KV的类型可自定义) (3)Mapper中的业务逻辑写在map()方法中 (4)Mapper的输出数据是KV对的形式(KV的类型可自定义) (5)map()方法(MapTask进程)对每一个<K,V>调用一次
- Reducer阶段:该阶段接收来自mapper写出的kv对数据,在这里经过处理后,再组装好kv对写出到结果文件 (1)用户自定义的Reducer要继承自己的父类 (2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV (3)Reducer的业务逻辑写在reduce()方法中 (4)ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法
- Driver阶段:相当于yarn集群的客户端,用于提交我们整个程序到yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象。然后由yarn去执行我们写好的mapper、reduce,分别生成MapTask、ReduceTask。
1、单词计数案例
完整源码见github:https://github.com/sichaolong/spring-demo/tree/main/springboot-mapreduce-demo
2、序列化之统计手机号码案例
序列化概念
- 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
- 反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。
java和hadoop序列化对比
- Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable),特点如下:
- 紧凑 紧凑的格式能让我们充分利用网络带宽,而带宽是数据中心最稀缺的资源。
- 快速进程通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的。
- 可扩展协议为了满足新的需求变化,所以控制客户端和服务器过程中,需要直接引进相应的协议,这些是新协议,原序列化方式能支持新的协议报文
- 互操作能支持不同语言写的客户端和服务端进行交互。
常用数据序列化类型
Java类型 Hadoop Writable类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
map MapWritable
array ArrayWritable
自定义序列化对象(实现接口Writable):Hadoop给我们准备的常用类型已经具备hadoop的序列化标准了,可以进行序列化传输,而我们自定义bean对象要想序列化传输,必须实现序列化接口,需要注意以下几项:
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3)重写序列化方法,(注意序列化的字段的顺序必须与反序列化的顺序一致)
(4)重写反序列化方法
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。
案例:统计手机号年消费,统计每一个手机号全年的总话费(注意,虚拟网包月费属于赠送费,不计入在内)
1、需求统计每一个手机号全年的总话费(注意,虚拟网包月费属于赠送费,不计入在内)。
2、数据准备
输入数据:txt文件
A 13939119984357820201901
R 13539142240368130201902
C 13436755071520182845201901
S 1761259147882081610201902
E 1363674466651052015201903
E 1363674466652083310201902
F 1343677395463062715201911
E 1363674466661083310201912
F 1343677395462161315201906
G 1343920555552072310201903
G 1343920555552372310201907
H 139331399853158828201907
I 133291427405608330201907
T 13535755061230582540201908
K 1781359167862581220201901
L 1373634459562222210201903
M 1383676465593044010201911
N 1363667396433255015201912
O 1363687356351022310201911
P 1353877495232233315201909
Q 1363921559262052510201902
U 1353928276552092910201903
数据说明:
期望输出结果:
3、分析
基本思路:
Map阶段:
a.读取一行数据,切分字段;
b.抽取手机号、套餐基本费、语音通信费、短信彩信费、流量费;
c.以手机号为key,bean对象为value输出,即context.write(手机号,bean)。
Reduce阶段:
a.累加套餐基本费、语音通信费、短信彩信费、流量费得到总花费;
b.实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输;
c. MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key。
4、编写mapreduce程序
(1)编写流量统计的bean对象
package com.bigdata.phonefee;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
// 如果要将自定义类的对象放到keyout,或者valueout的位置上,hadoop会将这些对象进行序列化和反序列化,
// 因此必要让我们自定义的类具备hadoop序列化的要求
// 1 实现Writable接口
public class Phone implements Writable {
private Long baseFee;
private Long voiceFee;
private Long msgFee;
private Long flowFee;
private Long sumFee;
// 2 准备空参构造
public Phone(){}
public Phone(Long baseFee, Long voiceFee, Long msgFee, Long flowFee, Long sumFee){
this.baseFee = baseFee;
this.voiceFee = voiceFee;
this.msgFee = msgFee;
this.flowFee = flowFee;
this.sumFee = sumFee;}
// 设置参数的便利方法
public void setFee(Long baseFee, Long voiceFee, Long msgFee, Long flowFee){
this.baseFee = baseFee;
this.voiceFee = voiceFee;
this.msgFee = msgFee;
this.flowFee = flowFee;
this.sumFee = baseFee + voiceFee + msgFee + flowFee;}
public Long getBaseFee(){return baseFee;}
public void setBaseFee(Long baseFee){
this.baseFee = baseFee;}
public Long getVoiceFee(){return voiceFee;}
public void setVoiceFee(Long voiceFee){
this.voiceFee = voiceFee;}
public Long getMsgFee(){return msgFee;}
public void setMsgFee(Long msgFee){
this.msgFee = msgFee;}
public Long getFlowFee(){return flowFee;}
public void setFlowFee(Long flowFee){
this.flowFee = flowFee;}
public Long getSumFee(){return sumFee;}
public void setSumFee(Long sumFee){
this.sumFee = sumFee;}
// 3 准备序列化的方法,指定将哪些属性 进行序列化和反序列化
// 不是Write方法
public void write(DataOutput out) throws IOException {
out.writeLong(baseFee);
out.writeLong(voiceFee);
out.writeLong(msgFee);
out.writeLong(flowFee);
out.writeLong(sumFee);}
// 4 准备反序列化的方法,注意:序列化和反序列化的属性的顺序要一致
public void readFields(DataInput in) throws IOException {
this.baseFee = in.readLong();
this.voiceFee = in.readLong();
this.msgFee = in.readLong();
this.flowFee = in.readLong();
this.sumFee = in.readLong();}
// 5 toString
@Override
public String toString(){return baseFee+"\t"+voiceFee+"\t"+msgFee+"\t"+flowFee+"\t"+sumFee;}}
(2)编写mapper
package com.bigdata.phonefee;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
// 因为我们要在reduce对某个手机号的多条消费数据进行累加
//因此,我们就想让同个手机号的多条数据进入到同一个分组,进而调用一次reduce方法
// 进而才有机会对同个手机号的多条数据进行累加,手机号用Text表示
// 因为每条数据有多个消费情况,例如基本费,语音通信费,短信费,用IntWritable这样的数据类型,不足以封装
// 因此,创建一个Phone的实体类,用该类封装这些数据
public class PhonefeeMapper extends Mapper<LongWritable, Text, Text, Phone>{
Text k = new Text();
Phone v= new Phone();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 将每行数据转成 String , A 13939119984357820201901
String line = value.toString();
// 2 按照tab分割 [A,13939119984,3,5,7,8,20,201901]
String[]split= line.split("\t");
// 3 挑出手机号,各项话费
String phoneNum = split[1];
String baseFee = split[2];
String voiceFee = split[3];
String msgFee = split[4];
String flowFee = split[5];
// 4 封装kv
k.set(phoneNum);
//v.setBaseFee(Long.parseLong(baseFee));
//v.setVoiceFee(Long.parseLong(voiceFee));
v.setFee(Long.parseLong(baseFee),Long.parseLong(voiceFee),Long.parseLong(msgFee),Long.parseLong(flowFee));
// 5 将kv写出
context.write(k,v);}}
(3)编写reducer
package com.bigdata.phonefee;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
// 让手机号担任keyout
// 让手机号全年的话费情况当valueout
// <手机号,总话费Phone>
public class PhonefeeReduce extends Reducer<Text, Phone,Text,Phone>{
Phone v= new Phone();
@Override // 分组调用
protected void reduce(Text key, Iterable<Phone> values, Context context) throws IOException, InterruptedException {
//<13636744666, 51052018>
//<13636744666, 52083318>
//<13636744666, 61083318>
// 累加各项费用
long sumBaseFee =0;
long sumVoiceFee =0;
long sumMsgFee =0;
long sumFlowFee =0;for(Phone value : values){
Long baseFee = value.getBaseFee();
Long voiceFee = value.getVoiceFee();
Long msgFee = value.getMsgFee();
Long flowFee = value.getFlowFee();
sumBaseFee = sumBaseFee + baseFee;
sumVoiceFee = sumVoiceFee + voiceFee;
sumMsgFee = sumMsgFee + msgFee;
sumFlowFee = sumFlowFee + flowFee;}
v.setFee(sumBaseFee,sumVoiceFee,sumMsgFee,sumFlowFee);
// 将kv写出
context.write(key,v);}}
(4)编写驱动
package com.bigdata.phonefee;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PhonefeeDriver {
public static void main(String[] args) throws Exception {
// 1 创建一个配置对象
Configuration conf = new Configuration();
// 2 通过配置对象创建一个job
Job job = Job.getInstance(conf);
// 3 设置job的mr的路径(jar包的位置)
job.setJarByClass(PhonefeeDriver.class);
// 4 设置job的mapper类 reduce类
job.setMapperClass(PhonefeeMapper.class);
job.setReducerClass(PhonefeeReduce.class);
// 5 设置job的mapper类的keyout,valueout
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Phone.class);
// 6 设置job的最终输出的keyout,valueout
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Phone.class);
// 7 设置job的输入数据的路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 8 设置job的输出数据的路径 得保证,输出目录不能事先存在,否则报错,
Path outPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)){
fs.delete(outPath,true);}
FileOutputFormat.setOutputPath(job,outPath);
// 9 提交job到yarn集群
boolean b = job.waitForCompletion(true);
System.out.println("是否运行成功:"+b);}
4、Yarn概念
Hadoop Yarn:作业调度与集群资源管理的平台。Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
Yarn也是分布式的平台,也是分角色的,用于接收用户提交的mapreduce作业,并将这些作业分配给集群内的机器执行。该架构由resourcemanager、nodemanager组成,resourcemanager是集群的老大,掌管整个集群的资源,是指挥者,不负责具体干活,而nodemanager是集群的小弟,掌管某个机器的资源,是具体干活的。
架构:YARN主要由ResourceManager、NodeManager、ApplicationMaster(AM)和Container等组件构成,如图所示:
执行流程:用户先把mapreduce作业提交给resourcemanager,然后resourcemanager会找到某个负载较低的nodemanager,在其机器上生成一个applicationMaster进程,将该mapreduce作业交给applicationMaster进程,由该进程全程调度指挥执行。
(1) ResourceManager(rm):是一个java进程,是整个集群的老大,启动在某台机器上。处理客户端请求、启动/监控ApplicationMaster、监控NodeManager、资源分配与调度。
(2) NodeManager(nm):是一个java进程,启动在某台机器上,是某台机器的老大。单个节点上的资源管理、处理来自ResourceManager的命令、处理来自ApplicationMaster的命令。
(3) ApplicationMaster:是一个java进程,启动在某台机器上,是某个mapreduce任务的老大。数据切分、为应用程序申请资源,并分配给内部任务、任务监控与容错。
(4) Container:对任务运行环境的抽象,用于运行mapreduce任务。封装了CPU、内存等多维资源以及环境变量、启动命令等任务运行相关的信息。
工作机制
(1)Mr程序提交到客户端所在的节点。
(2)Yarnrunner向Resourcemanager申请一个Application。
(3)rm将该应用程序的资源路径返回给yarnrunner。
(4)该程序将运行所需资源提交到HDFS上。
(5)程序资源提交完毕后,申请运行mrAppMaster。
(6)RM将用户的请求初始化成一个task。
(7)其中一个NodeManager领取到task任务。
(8)该NodeManager创建容器Container,并产生MRAppmaster。
(9)Container从HDFS上拷贝资源到本地。
(10)MRAppmaster向RM 申请运行maptask资源。
(11)RM将运行maptask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动maptask,maptask对数据分区排序。
(13)MrAppMaster等待所有maptask运行完毕后,向RM申请容器,运行reduce task。
(14)reduce task向maptask获取相应分区的数据。
(15)程序运行完毕后,MR会向RM申请注销自己。
注:MRAppMaster是MapReduce的ApplicationMaster实现,它能使MapReduce程序在Yarn上执行。
任务调度器:当多个任务来临,Yarn怎么分配每个任务的处理?Yarn提供了多种调度器和可配置的策略供我们选择。目前,Hadoop作业调度器主要有三种:
- 先进先出调度器(FIFO):FIFO Scheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。
- 容量调度器(Capacity Scheduler):在FIFO 调度器中,小任务会被先提交的大任务阻塞。该调度器解决这个问题,分为多个任务队列
- 动态公平调度器(Fair Scheduler):在FIFO 调度器中,小任务会被大任务阻塞。而对于Capacity调度器,有一个专门的队列用来运行小任务,但是为小任务专门设置一个队列会预先占用一定的集群资源,这就导致大任务的执行时间会落后于使用FIFO调度器时的时间。在Fair调度器中,我们不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。
Hadoop3.2.1默认的资源调度器是Capacity Scheduler,可以在yarn-default.xml文件中查看。
Yarn HA集群:ResourceManager(RM)负责管理群集中的资源和调度应用程序(如MR、Spark等)。在Hadoop 2.4之前,YARN群集中的ResourceManager存在SPOF(Single Point of Failure,单点故障)。为了解决ResourceManager的单点问题,YARN设计了一套Active/Standby模式的ResourceManager HA(High Availability,高可用)架构。在运行期间有多个ResourceManager同时存在来增加冗余进而消除这个单点故障,并且只能有一个ResourceManager处于Active状态,其他的则处于Standby状态,当Active节点无法正常工作,其余Standby状态的几点则会通过竞争选举产生新的Active节点。也就是配合Zookeeper完成主从模式+自动故障转移
配合Zookeeper完成自动故障转移原理(手动的直接执行yarn命令即可):YARN这个Active/Standby模式的RM HA架构在运行期间,会有多个RM同时存在,但只能有一个RM处于Active状态,其他的RM则处于Standby状态,当Active节点无法正常提供服务,其余Standby状态的RM则会通过竞争选举产生新的Active节点。以基于ZooKeeper这个自动故障切换为例,切换的步骤如下:
主备切换,RM使用基于ZooKeeper实现的ActiveStandbyElector组件来确定RM的状态是Active或Standby。
创建锁节点,在ZooKeeper上会创建一个叫做ActiveStandbyElectorLock的锁节点,所有的RM在启动的时候,都会去竞争写这个临时的Lock节点,而ZooKeeper能保证只有一个RM创建成功。创建成功的RM就切换为Active状态,并将信息同步存入到ActiveBreadCrumb这个永久节点,那些没有成功的RM则切换为Standby状态。
注册Watcher监听,所有Standby状态的RM都会向/yarn-leader-election/cluster1/ActiveStandbyElectorLock节点注册一个节点变更的Watcher监听,利用临时节点的特性,能够快速感知到Active状态的RM的运行情况。
准备切换,当Active状态的RM出现故障(如宕机或网络中断),其在ZooKeeper上创建的Lock节点随之被删除,这时其它各个Standby状态的RM都会受到ZooKeeper服务端的Watcher事件通知,然后开始竞争写Lock子节点,创建成功的变为Active状态,其他的则是Standby状态。
Fencing(隔离),在分布式环境中,机器经常出现假死的情况(常见的是GC耗时过长、网络中断或CPU负载过高)而导致无法正常对外进行及时响应。如果有一个处于Active状态的RM出现假死,其他的RM刚选举出来新的Active状态的RM,这时假死的RM又恢复正常,还认为自己是Active状态,这就是分布式系统的脑裂现象,即存在多个处于Active状态的RM,可以使用隔离机制来解决此类问题。
YARN的Fencing机制是借助ZooKeeper数据节点的ACL权限控制来实现不同RM之间的隔离。这个地方改进的一点是,创建的根ZNode必须携带ZooKeeper的ACL信息,目的是为了独占该节点,以防止其他RM对该ZNode进行更新。借助这个机制假死之后的RM会试图去更新ZooKeeper的相关信息,但发现没有权限去更新节点数据,就把自己切换为Standby状态。
5、Common、Ozone概念
Hadoop Common:支持其他Hadoop模块的通用实用程序。提供其他模块所用的一系列工具类以及封装rpc通信框架。
Ozone:新的对象存储系统,可用于小文件和大文件存储,设计的目的是为了填充社区在对象存储方面的不足,同时能够提供百亿甚至千亿级文件规模的存储。OZone与HDFS有着很深的关系,在设计上也对HDFS存在的不足做了很多改进,使用HDFS的生态系统可以无缝切换到OZone。Ozone同时支持hadoop102.x和hadoop103.x集群,能够和运行其上的Hive,Spark 等应用无缝集成。但是至今并未部署在生产环境中。
6、Hadoop3.2.1分布式环境搭建
部署模式,使用Vmware虚拟机安装三台ContOS7.6的服务器,搭建伪集群模式。
- 单机 (Standalone):默认情况下,Hadoop被配置成以非分布式模式运行的一个独立Java进程。这对调试非常有帮助。
- 伪分布式(Pesudo Distributed):Hadoop可以在单节点上以所谓的伪分布式模式运行,此时每一个Hadoop守护进程都作为一个独立的Java进程运行。
- 集群 (Cluster):会将N个节点组成Hadoop集群,主节点和从节点分开部署在不同的机器上
- HA高可用 (High Available):主要用于生产环境部署,即高可用(7*24小时不中断服务)。解决hadoop的单点故障
Hadoop搭建与整合springboot步骤见博客:大数据疫情可视化平台2_虚拟机搭建Hadoop3.2.1环境、springboot整合hdfs
springboot整合MapReduce的举栗:https://github.com/sichaolong/spring-demo/tree/main/springboot-mapreduce-demo
二、Hive介绍与环境搭建
1、Hive概念
Hive是基于Hadoop的一个数据仓库管理工具,可以将结构化的数据文件映射为一张表,并提供类SQL查询功能。可以理解为一个将SQL转换为MapReduce的任务的工具,甚至更进一步可以说hive就是一个MapReduce的客户端。
- 采用类SQL语法去操作数据,提供快速开发的能力。
- 避免了去写MapReduce,减少开发人员的学习成本。
- 功能扩展很方便。
2、Hive架构
Hive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。
基于Hadoop使用HDFS进行存储,使用MapReduce进行计算。
3、Hive3.1.2环境搭建
见博客:大数据疫情可视化平台3_虚拟机搭建Hive3.1.2环境(JDK11需要降为JDK8)
三、数据的爬取与清洗
第一步是疫情数据爬取,新建Maven子模块epidemic-situation-data
第二步是对爬取的疫情数据清洗,新建Maven子模块epidemic-situation-data-etl
第三步就是将清洗的数据上传到Hive
修改文件
part-r-00000
的名称为
gn
使用远程工具将清洗后的数据上传到 服务器的
/home/xiaosi/data/hive-3.1.2/etl
位置,如果mobax上传不成功可以使用xftp
# 1、启动hadoop
start-all.sh
打开可视化界面:http://hadoop101:9870/explorer.html#/# 2、启动hive
(1)先执行 hive --service metastore启动metastore服务(该窗口会一直阻塞)
(2)新开窗口启动hive服务:hive --service hiveserver2
(3)建表
create table gn(cname string,cdate string, diagnose int,cureNum int, deathNum int,newAdd int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';(4)加载数据
load data local inpath '/home/xiaosi/data/hive-3.1.2/etl/gn' into table default.gn;(5) 查看数据
select * from gn;
四、项目搭建
新建Maven子模块epidemic-situation-data-china-hive,整合echarts图表。
然后三台虚拟机,启动Hadoop、Hive以及测试
# 启动hdfs和yarn
start-all.sh
# 测试hdfs是否启动成功:http://hadoop101:9870/dfshealth.html#tab-overview# 测试yarn是否启动成功:http://hadoop102:8088/cluster# 启动hive
hive --service metastore
hive --service hiveserver2
# 测试hive是否启动成功以及是否可以java客户端连接(hadoop101:10000是在hive的core-site.xml文件配置的)
beeline -u jdbc:hive2://hadoop101:10000/defaul
启动项目,访问localhost:10086测试
版权归原作者 scl、 所有, 如有侵权,请联系我们删除。