目录
前言
MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一是分布式计算框,就是mapreduce,二者缺一不可,也就是说,可以通过mapreduce很容易在hadoop平台上进行分布式的计算编程
sftp命令:Windows下登录Hadoop102
xftp root@hadoop102
,
lcd
切换Windows路径,
cd
切换Linux路径,
get
从Linux下载到Windows,
put
从Windows上传到Linux
一、概述
1. MapReduce定义、优缺点
🧮MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop 的数据分析应用”的核心框架。MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上
🧮优点:
- 易于编程。用户只关心业务逻辑。实现框架的接口
- 良好扩展性:可以动态增加服务器,解决计算资源不够问题
- 高容错性。任何一台机器挂掉,可以将任务转移到其他节点
- 适合海量数据计算(TB/PB)几千台服务器共同计算
🧮缺点:
- 不擅长实时计算。像MySQL毫秒级别的不擅长
- 不擅长流式计算。静态数据读取计算
- 不擅长DAG有向无环图计算(上一个计算任务的输出成为下一个计算任务的输入,迭代计算)
2. MapReduce核心思想、进程
🧮MapReduce核心思想图示:
“分而治之”是MapReduce的核心思想,它表示把一个大规模的数据集切分成很多小的单独的数据集,然后放在多个机器上同时处理。MapReduce把整个并行运算过程高度抽象到两个函数上,一个是map另一个是reduce。Map函数就是分而治之中的“分”,reduce函数就是分而治之中的“治”
🧮MapReduce进程:一个完整的MapReduce程序在分布式运行时有三类实例进程
- MrAppMaster:负责整个程序的过程调度及状态协调(负责任务管理)
- MapTask:负责Map阶段的整个数据处理流程
- ReduceTask:负责Reduce阶段的整个数据处理流程
3. MapReduce编程——WordCount
🧮官方WordCount源码:采用IDEA反编译源码,发现WordCount案例有Map类、Reduce类和驱动类。且数据的类型是 Hadoop自身封装的序列化类型
# 打开一个cmd,下载含有WordCount案例的jar包sftp root@hadoop102
lcd F:\cd /opt/module/hadoop/share/hadoop/mapreduce
get hadoop-mapreduce-examples-3.1.3.jar
IDEA轻松反编译jar包👉点此前往,不再赘述…
🧮常用数据序列化类型:
java 类型Hadoop Writable 类型BooleanBooleanWritableByteByteWritableIntIntWritableFloatFloatWritableLongLongWritableDoubleDoubleWritableString****TextMapMapWritableArrayArrayWritableNullNullWritable
🧮MapReduce****编程规范:用户编写的程序分成三个部分: Mapper、Reducer 和 Driver
- Mapper阶段
用户自定义的Mapper要继承自己的父类
Mapper的输入数据是K-V对的形式(K-V的类型可自定义)
Mapper中的业务逻辑写在map()方法中
Mapper的输出数据是K-V对的形式(K-V的类型可自定义)
map()方法(MapTask进程)对每一个<K,V>调用一次
- Reducer阶段
用户自定义的Reducer要继承自己的父类
Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
Reducer的业务逻辑写在reduce()方法中
ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法
- Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象
🧮MapReduce案例实操:
继上次完成maven配置后,IDEA进入maven_workspace,在java文件夹下创建
com.ygy.mapreduce.wordcount
包,创建如下三个java类
🌰WordCountMapper类的代码如下(注意导包):
importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;/**
* public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
*
* KEYIN,map阶段输入key的类型:LongWritable
* VAULEIN,map阶段输入value类型:Text
* KEYOUT,map阶段输出的key类型:Text
* VALUEOUT,map阶段输出的value类型:IntWritable
*/publicclassWordCountMapperextendsMapper<LongWritable,Text,Text,IntWritable>{// 固定格式(outKey,1)privateText outKey =newText();privateIntWritable outValue =newIntWritable(1);// 重写 map方法@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,IntWritable>.Context context)throwsIOException,InterruptedException{// 1.读取一行输入的数据// ygy ygyString line = value.toString();// 2.以空格为分隔符切割数据// ygy// ygyString[] words = line.split(" ");// 3.循环写出for(String word : words){// 封装outKey
outKey.set(word);// 写出
context.write(outKey,outValue);}}}
context.write()
方法是 Apache Hadoop 中的一个重要方法,主要用于输出结果。它是
org.apache.hadoop.mapreduce.Reducer
类的一个成员方法,通常在 Reducer 的 reduce() 方法中调用。它的作用是将计算得到的键/值对写入到输出上下文中,在使用
context.write()
方法之前将键和值序列化为字节数组,因为 Hadoop 中的 MapReduce 框架会将这些字节数组传输到输出端
🌰WordCountReducer类的编写(注意导包):
importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;/**
* public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
*
* KEYIN,reduce阶段输入key的类型:Text
* VAULEIN,reduce阶段输入value类型:LongWritable
* KEYOUT,reduce阶段输出的key类型:Text
* VALUEOUT,reduce阶段输出的value类型:IntWritable
*/publicclassWordCountReducerextendsReducer<Text,IntWritable,Text,IntWritable>{// 固定格式(outKey,1)privateIntWritable outValue =newIntWritable();// 重写 reduce方法@Overrideprotectedvoidreduce(Text key,Iterable<IntWritable> values,Reducer<Text,IntWritable,Text,IntWritable>.Context context)throwsIOException,InterruptedException{// 1.定义一个计数器int sum =0;// ygy,(1,1,1,1,1,1,1,1,1,1,1,1,1,1,1)// 2.循环values,累加求和for(IntWritable value : values){
sum += value.get();}// 3.封装outValue
outValue.set(sum);// 4.写出
context.write(key,outValue);}}
🌰WordCountDriver类的编写(注意导包,最后执行main方法即可):
importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.IOException;publicclassWordCountDriver{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,ClassNotFoundException{// 1.创建Job对象Configuration configuration =newConfiguration();Job job =Job.getInstance(configuration);// 2.设置jar存储位置
job.setJarByClass(WordCountDriver.class);// 3.关联Map和Reduce类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);// 4.设置Mapper阶段输出数据的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);// 5.设置最终数据输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);// 6.设置输入路径和输出路径// input.txt在F:\\input\\inputword路径下// 输入到F:\\output下,不能有outputword这个文件夹FileInputFormat.setInputPaths(job,newPath("F:\\input\\inputword"));FileOutputFormat.setOutputPath(job,newPath("F:\\output\\outputword"));// 7.提交jobboolean result = job.waitForCompletion(true);System.exit(result ?0:1);}}
在Hadoop中,一个Job代表一个计算任务,它是Hadoop分布式计算框架中的基本单位。一个Job可以包含多个MapReduce任务,MapReduce是一种用于大规模数据处理的编程模型。当一个Job被提交给Hadoop集群时,Hadoop会将其分解成多个任务(Tasks)并在集群中的各个节点上执行。这些任务包括Map任务和Reduce任务,它们分别执行Map函数和Reduce函数,并通过网络进行数据交换和传输
🧮提交到HDFS集群上测试:
上面通过Window依赖在Windows环境中运行,但是一般是在Linux环境下运行,故我们需要用maven打包成jar包
# 在pom.xml中加入插件,注意本地和虚拟机的jdk版本要统一
<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
🧮修改输入路径和输出路径,使其接受传参
// 6.设置输入路径和输出路径FileInputFormat.setInputPaths(job,newPath(args[0]));FileOutputFormat.setOutputPath(job,newPath(args[1]));
上传重新生成的jar包(任一皆可,重命名为wc.jar移动到F盘)
# 上传sftp root@hadoop102
lcd F:\cd /root/
put wc.jar
# Linux下执行,Driver要敲Driver类的全类名
hadoop jar wc.jar com.ygy.mapreduce.wordcount2.WordCountDriver /input /output
👉打开Hadoop102web端看文件输出
👉打开Hadoop103web端看任务运行状态
假如报错:has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 52.0
翻译:这个文件编译的JDK版本号应该是52(8),但是实际是61(17)
原因:windows上jdk在17及以上,SpringApplication该类的major版本是61,所以Linux上为jdk1.8用52的版本编译打包就会报错了
方法:
1、升级JDK版本到17及以上
2、降低依赖,不用spring6了
3、这么麻烦,懒得弄了
二、Hadoop序列化
1. 序列化概述
🧮序列化: 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输
🧮反序列化: 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换 成内存中的对象
为什么要用序列化?为什么不用Java的序列化?
- 一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的” 对象,可以将“活的”对象发送到远程计算机
- Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以, Hadoop 自己开发了一套序列化机制(Writable)
🧮Hadoop 序列化特点:
- 紧凑 :高效使用存储空间
- 快速:读写数据的额外开销小
- 互操作:支持多语言的交互
2. 自定义bean对象实现序列化接口(Writable)
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在 Hadoop 框架内部传递一个 bean 对象,那么该对象就需要实现序列化接口
🌰具体实现bean对象序列化步骤:
- 实现Writable接口
- 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
publicFlowBean(){}
- 重写序列化方法
@Overridepublicvoidwrite(DataOutput dataOutput)throwsIOException{
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);}
- 重写反序列化方法
@OverridepublicvoidreadFields(DataInput dataInput)throwsIOException{this.upFlow = dataInput.readLong();this.downFlow = dataInput.readLong();this.sumFlow = dataInput.readLong();}
- 注意反序列化的顺序和序列化的顺序完全一致
- 要想把结果显示在文件中,需要重写toString(),可用
\t
分开,方便后续用- 如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为 MapReduce 框中的 Shuffle 过程要求对 key 必须能排序
🌰统计流量
🌰FlowBean.java:
importorg.apache.hadoop.io.Writable;importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;/*
* 1.定义类实现Writable接口
* 2.重写序列化和反序列化方法
* 3.重写空参构造
* 4.重写toString方法
* */publicclassFlowBeanimplementsWritable{privatelong upFlow;privatelong downFlow;privatelong sumFlow;// 空参构造publicFlowBean(){}publiclonggetUpFlow(){return upFlow;}publicvoidsetUpFlow(long upFlow){this.upFlow = upFlow;}publiclonggetDownFlow(){return downFlow;}publicvoidsetDownFlow(long downFlow){this.downFlow = downFlow;}publiclonggetSumFlow(){return sumFlow;}publicvoidsetSumFlow(){this.sumFlow =this.upFlow +this.downFlow;}@Overridepublicvoidwrite(DataOutput out)throwsIOException{
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);}@OverridepublicvoidreadFields(DataInput in)throwsIOException{this.upFlow=in.readLong();this.downFlow=in.readLong();this.sumFlow=in.readLong();}@OverridepublicStringtoString(){return upFlow +"\t"+ downFlow +"\t"+ sumFlow ;}}
🌰FlowMapper.java:
importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;publicclassFlowMapperextendsMapper<LongWritable,Text,Text,FlowBean>{privateText k =newText();privateFlowBean v =newFlowBean();@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,FlowBean>.Context context)throwsIOException,InterruptedException{// 获取一行// 13480253104 192.196.100.1 www.atguigu.com 2481 24681 200String line = value.toString();// 切割String[] split = line.split("\t");// 获取我们需要的数据:手机号、上行流量和下行流量String phone = split[1];String up = split[split.length -3];String down = split[split.length -2];// 封装k和v
k.set(phone);
v.setUpFlow(Long.parseLong(up));
v.setDownFlow(Long.parseLong(down));
v.setSumFlow();// 写出
context.write(k, v);}}
🌰FlowReducer.java:
importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;publicclassFlowReducerextendsReducer<Text,FlowBean,Text,FlowBean>{privateFlowBean value =newFlowBean();@Overrideprotectedvoidreduce(Text key,Iterable<FlowBean> values,Reducer<Text,FlowBean,Text,FlowBean>.Context context)throwsIOException,InterruptedException{// 遍历集合累加值long totalUpFlow =0;long totalDownFlow =0;for(FlowBean value : values){
totalUpFlow += value.getUpFlow();
totalDownFlow += value.getDownFlow();}// 封装对象
value.setUpFlow(totalUpFlow);
value.setDownFlow(totalDownFlow);
value.setSumFlow();
context.write(key, value);}}
🌰FlowDriver.java:
importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.IOException;publicclassFlowDriver{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,ClassNotFoundException{// 1. 获取job对象Configuration conf =newConfiguration();Job job =Job.getInstance(conf);// 2. 关联Driver类
job.setJarByClass(FlowDriver.class);// 3. 关联Mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);// 4. 设置Map端输出kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);// 5. 设置程序最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);// 6. 设置程序的输入输出路径FileInputFormat.setInputPaths(job,newPath("F:\\hadoopTest\\input\\phone_data.txt"));FileOutputFormat.setOutputPath(job,newPath("F:\\hadoopTest\\output2"));// 7. 提交jobboolean result = job.waitForCompletion(true);System.exit(result ?0:1);}}
三、MapReduce框架原理
1. InputFormat 数据输入
MapTask 的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度
🧮数据块:Block是HDFS物理上把数据分成一块一块,数据块是HDFS存储数据单位
🧮数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask
🧮MapTask并行度决定机制:
🧮job源码解析
🧮FileInputFormat切片源码解析:
- 程序先找到你数据存储的目录
- 开始遍历处理(规划切片)目录下的每一个文件
- 遍历第一个文件ss.txt
- 获取文件大小fs.sizeOf(ss.txt)
- 计算切片大小 computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
- 默认情况下,切片大小=blocksize
- 开始切,形成第1个切片: ss.txt–0:128M 第2个切片ss.txt–128:256M第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
- 将切片信息写到一个切片规划文件中
- 整个切片的核心过程在getSplit()方法中完成
- ImputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
- 提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数
🧮FileInputFormat切片机制:
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于Block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
🧮FileInputFormat切片大小的参数配置
2. TextInputFormat
思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?
🧮FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等
🧮TextInputFormat:是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型
3. CombineTextInputFormat切片机制
框架默认的TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下
🌰需求:将输入的大量小文件合并成一个切片统一处理,准备4个小文件a.txt、b.txt、c.txt、d.txt,期望一个切片处理4个文件
# 将wordcount包下三个java类一起复制到新建包combineTextInputFormat里,并且修改一下输出输入路径
FileInputFormat.setInputPaths(job,newPath("F:\\hadoopTest\\input\\inputword"));FileOutputFormat.setOutputPath(job,newPath("F:\\hadoopTest\\outputCombine1"));
💡实现过程:
- 先不做任何处理,运行该WordCount案例程序,观察可得切片个数为4
number of splits:4
- 在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为3
// 如果不设置工InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);// 虚拟存储切片最大值设置4mCombineTextInputFormat.setMaxInputSplitSize(job,4194304);
- 虚拟存储切片最大值设置为20M,运行程序,并观察运行的切片个数为1
4. MapReduce工作流程
🧮Map阶段图解:
🧮Reduce阶段图解:
排序是MapReduce框架中最重要的操作之一(提高效率)。MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
- 对于MapTask:它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
- 对于ReduceTask:它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
🧮排序分类:
二次排序:在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序
5. Shuffer机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle
🧮Shuffer机制图:
🧮Partition分区
默认分区是根据key的hashCode对ReduceTasks个数取模得到的,用户没法控制哪个key存储到哪个分区
🧮自定义Partitioner步骤:
- 自定义类继承Partitioner,重写getPartition()方法
- 在Job驱动中,设置自定义Partitioner
job.setPartitioncrClass(CustomPartitioner.class);
- 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
🧮Partition分区案例分析:
🌰实现:由于是在序列化案例的结果后将手机号不同划分到不同分区,复制Writable包下所有java类过来并且增加一个ProvincePartitioner分区类:
importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Partitioner;publicclassProvincePartitionerextendsPartitioner<Text,FlowBean>{@OverridepublicintgetPartition(Text text,FlowBean flowBean,int numPartitions){// 1.获取手机号前三位String preNum = text.toString().substring(0,3);// 2.分区int partition =4;if("136".equals(preNum)){
partition =0;}elseif("137".equals(preNum)){
partition =1;}elseif("138".equals(preNum)){
partition =2;}elseif("139".equals(preNum)){
partition =3;}return partition;}}
在FlowDriver.java中加上下面代码后,运行即可:
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
假如我们把分区设置为4个,则会抛出IO异常;设为1个,则会调用默认的分区方法
🧮自定义排序WritableComparable:bean对象做为key传输,必须要有排序功能,需要实现WritableComparable接口重写compareTo方法,就可以实现排序
🌰WritableComparable排序案例实操(全排序)
🌰由于是在序列化案例的结果后对总流量进行倒序排序,复制Writable包下所有java类过来,修改FlowBean.java的接口为
WritableComparable<FlowBean>
,并实现compareTo方法:
@OverridepublicintcompareTo(FlowBean o){// 按照总流量大小,倒序排列returnthis.sumFlow > o.getSumFlow()?-1:1;}
输入数据都边了,处理方式自然不同,修改FlowMapper.java
publicclassFlowMapperextendsMapper<LongWritable,Text,FlowBean,Text>{privateFlowBean outK =newFlowBean();privateText outV =newText();@Overrideprotectedvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{// 1.获取一行String line = value.toString();// 2.切割String[] split = line.split("\t");// 3.封装对象
outV.set(split[0]);
outK.setUpFlow(Long.parseLong(split[1]));
outK.setDownFlow(Long.parseLong(split[2]));
outK.setSumFlow();// 4.写出
context.write(outK, outV);}
修改FlowReducer.java
publicclassFlowReducerextendsReducer<FlowBean,Text,Text,FlowBean>{@Overrideprotectedvoidreduce(FlowBean key,Iterable<Text> values,Context context)throwsIOException,InterruptedException{for(Text value : values){
context.write(value, key);}}}
修改FlowDriver.java,执行即可
// 互换kv类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
若想在总流量相同时按照上行流量倒序排序,只需修改FlowBean.java中修改compareTo方法即可
🌰WritableComparable排序案例实操(区内排序)
🌰实现:复制writableComarable包下所有java类过来,在FlowDriver中加上:
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
🧮Combiner合并
🌰需求:统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能,希望在Map阶段就能看到统计结果
🌰实现:复制wordcount包中所有类到新建包combiner中,并新建WordCountCombiner类:
publicclassWordCountCombinerextendsReducer<Text,IntWritable,Text,IntWritable>{privateIntWritable outV =newIntWritable();@Overrideprotectedvoidreduce(Text key,Iterable<IntWritable> values,Reducer<Text,IntWritable,Text,IntWritable>.Context context)throwsIOException,InterruptedException{int sum =0;for(IntWritable value : values){
sum += value.get();}
outV.set(sum);
context.write(key, outV);}}
在FlowDriver中加入
// 设置Combiner类
job.setCombinerClass(WordCountCombiner.class);// 发现combiner类和WordCountReducer类一样,故可以直接用reducer不用编写Combiner类(常用,直接提前在Map阶段搞好)
job.setCombinerClass(WordCountReducer.class);
6. OutputFormat数据输出
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。
- TextOutputFormat默认输出格式
- 自定义OutputFormat(输出到MySQL、HBase、Elasticsearch等存储框架中)
🌰自定义OutputFormat案例:
🌰实现:创建outputFormat包,创建LogDriver、LogMapper、LogOutputFormat、LogRecordWriter、LogReducer五个Java类,代码如下:
// Mapper 类importorg.apache.hadoop.io.NullWritable;publicclassLogMapperextendsMapper<LongWritable,Text,Text,NullWritable>{@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,NullWritable>.Context context)throwsIOException,InterruptedException{// http://www.baidu.com// http://www.google.cos//(http://www.gaogle.com,NullWritable)// 不做任何处理,直接输出
context.write(value,NullWritable.get());}}// Reducer 类publicclassLogReducerextendsReducer<Text,NullWritable,Text,NullWritable>{@Overrideprotectedvoidreduce(Text key,Iterable<NullWritable> values,Reducer<Text,NullWritable,Text,NullWritable>.Context context)throwsIOException,InterruptedException{// http://www.baidu.com// http://www.baidu.com// 防止有重复的数据,丢数据for(NullWritable value : values){
context.write(key,NullWritable.get());}}}
自定义outputformat类:
publicclassLogOutputFormatextendsFileOutputFormat<Text,NullWritable>{@OverridepublicRecordWriter<Text,NullWritable>getRecordWriter(TaskAttemptContext job)throwsIOException,InterruptedException{LogRecordWriter lrw =newLogRecordWriter(job);return lrw;}}
继承了 RecordWriter 的 LogRecordWriter 类:
publicclassLogRecordWriterextendsRecordWriter<Text,NullWritable>{privateFSDataOutputStream out;privateFSDataOutputStream out2;publicLogRecordWriter(TaskAttemptContext job)throwsIOException{// 创建两条流FileSystem fs =FileSystem.get(job.getConfiguration());
out = fs.create(newPath("F:\\hadoopTest\\log.txt"));
out2 = fs.create(newPath("F:\\hadoopTest\\log2.txt"));}@Overridepublicvoidwrite(Text key,NullWritable value)throwsIOException,InterruptedException{// 判断key中是否有baiduif(key.toString().contains("baidu")){
out.write(key.toString().getBytes());}else{
out2.write(key.toString().getBytes());}}@Overridepublicvoidclose(TaskAttemptContext context)throwsIOException,InterruptedException{// 关闭流if(out !=null){
out.close();}if(out2 !=null){
out2.close();}}}
最后是Driver类:
publicclassLogDriver{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,ClassNotFoundException{Configuration conf =newConfiguration();Job job =Job.getInstance(conf);
job.setJarByClass(LogDriver.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);// 设置自定义的outputFormat
job.setOutputFormatClass(LogOutputFormat.class);FileInputFormat.setInputPaths(job,newPath("F:\\hadoopTest\\log.txt"));// 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat// 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录LogOutputFormat.setOutputPath(job,newPath("F:\\hadoopTest\\output"));boolean b = job.waitForCompletion(true);System.exit(b ?0:1);}}
7. MapReduce内核源码解析
🧮MapTask工作机制图示:
🧮MapTask五大阶段:
- Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value
- Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value
- Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用
OutputCollector.collect()
输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中 - 溢写阶段(Spill):当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作
溢写阶段详情:
- 利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序
- 按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件
output/spillN.out
(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作- 将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件
output/spillN.out.index
中
- Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件
output/file.out
中,同时生成相应的索引文件
output/file.out.index
。在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并
mapreduce.task.io.sort.factor
(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件
🧮ReduceTask工作机制图示:
🧮ReduceTask三大阶段:
- Copy阶段:ReduceTask从各个MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阙值,则写到磁盘上,否则直接放到内存中
- Sort阶段::在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照. MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可
- Reduce阶段:reduce()函数将计算结果写到HDFS上
🧮ReduceTask并行度决定机制
MapTask并行度由切片个数决定,切片个数由输入文件和切片规则决定
ReduceTask 的并行度同样影响整个Job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置job.setNumReduceTasks(4);
注意事项
- ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致。
- ReduceTask默认值就是1,所以输出文件个数为一个。
- 如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
- ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask。
- 具体多少个ReduceTask,需要根据集群性能而定。
- 如果分区数不是1,但是ReduceTask为1,不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。
8. Join多种应用
🧮Reduce Join:
- Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出
- Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在 Map阶段已经打标志)分开,最后进行合并就ok了
🌰案例:将两张表通过相同字段将两张表连接起来合并为一张表
🌰创建reduceJoin包编写java类,TableBean:
publicclassTableBeanimplementsWritable{// 序列化// 两张表的字段// id pid amount// pid pnameprivateString id;// 订单idprivateString pid;// 产品idprivateint amount;// 产品数量privateString pname;// 产品名称privateString flag;// 表的标记// 无参构造publicTableBean(){}// 省去getter/setter函数// 序列化@Overridepublicvoidwrite(DataOutput out)throwsIOException{
out.writeUTF(id);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);}// 反序列化@OverridepublicvoidreadFields(DataInput in)throwsIOException{
id = in.readUTF();
pid = in.readUTF();
amount = in.readInt();
pname = in.readUTF();
flag = in.readUTF();}@OverridepublicStringtoString(){// id pid amountreturn id +"\t"+ pname +"\t"+ amount;}}
TableMapper:
publicclassTableMapperextendsMapper<LongWritable,Text,Text,TableBean>{privateString name;privateText outK =newText();privateTableBean outV =newTableBean();@Overrideprotectedvoidsetup(Context context)throwsIOException,InterruptedException{// 初始化方法,只执行一次// 初始化 order 和 pd 两张表的数据FileSplit split =(FileSplit) context.getInputSplit();// 通过切片对象获取文件名
name = split.getPath().getName();}@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,TableBean>.Context context)throwsIOException,InterruptedException{// 读取一行数据String line = value.toString();// 判断是哪张表的数据if(name.contains("order")){String[] fields = line.split("\t");// 封装 key 和 value
outK.set(fields[1]);
outV.setId(fields[0]);
outV.setPid(fields[1]);
outV.setAmount(Integer.parseInt(fields[2]));
outV.setPname("");
outV.setFlag("order");}else{//处理商品表String[] fields = line.split("\t");// 封装 key 和 value
outK.set(fields[0]);
outV.setId("");
outV.setPid(fields[0]);
outV.setAmount(0);
outV.setPname(fields[1]);
outV.setFlag("pd");}// 写出
context.write(outK,outV);}}
TableReducer:
publicclassTableReducerextendsReducer<Text,TableBean,TableBean,NullWritable>{@Overrideprotectedvoidreduce(Text key,Iterable<TableBean> values,Context context)throwsjava.io.IOException,InterruptedException{// 准备初始化集合TableBean pdBean =newTableBean();ArrayList<TableBean> orderBeans =newArrayList<>();// 遍历 values,区分两张表
values.forEach(tableBean ->{if("order".equals(tableBean.getFlag())){// 订单表TableBean orderBean =newTableBean();try{org.apache.commons.beanutils.BeanUtils.copyProperties(orderBean,tableBean);}catch(Exception e){
e.printStackTrace();}
orderBeans.add(orderBean);}else{// 产品表try{org.apache.commons.beanutils.BeanUtils.copyProperties(pdBean,tableBean);}catch(Exception e){
e.printStackTrace();}}});// 表的拼接
orderBeans.forEach(orderBean ->{
orderBean.setPname(pdBean.getPname());try{
context.write(orderBean,NullWritable.get());}catch(Exception e){
e.printStackTrace();}});}}
TableDriver:
publicclassTableDriver{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,ClassNotFoundException{Job job =Job.getInstance(newConfiguration());
job.setJarByClass(TableDriver.class);
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
job.setMapOutputValueClass(Text.class);
job.setMapOutputKeyClass(TableBean.class);
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job,newPath("F:\\hadoopTest\\input\\inputtable"));FileOutputFormat.setOutputPath(job,newPath("F:\\hadoopTest\\output888"));boolean b = job.waitForCompletion(true);System.exit(b ?0:1);}}
缺点:
这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在 Reduce阶段极易产生数据倾斜
解决方案:
Map端实现数据合并==>Map Join
🧮Map Join
// 缓存普通文件到Task运行节点
job.addCacheFile(newURI("file://..."));//如果是集群运行,需要设置HDFS路径
ob.addCacheFile(newURI("hdfs://..."));
🌰同上一个案例,不同的是要求在Map阶段合并表,就不用reduce了
🌰创建一个mapJoin包,代码如下,MapJoinMapper:
publicclassMapJoinMapperextendsMapper<LongWritable,Text,Text,NullWritable>{privateHashMap<String,String> pdMap =newHashMap<>();privateText outK =newText();@Overrideprotectedvoidsetup(Mapper<LongWritable,Text,Text,NullWritable>.Context context)throwsIOException,InterruptedException{// 获取缓存的文件,并把文件内容封装到集合URI[] cacheFiles = context.getCacheFiles();FileSystem fs =FileSystem.get(context.getConfiguration());FSDataInputStream fis = fs.open(newPath(cacheFiles[0]));// 从流中读取数据BufferedReader reader =newBufferedReader(newInputStreamReader(fis,"UTF-8"));String line;while((line = reader.readLine())!=null){// 切割String[] split = line.split("\t");// 缓存到集合
pdMap.put(split[0], split[1]);}// 关流
reader.close();}@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,NullWritable>.Context context)throwsIOException,InterruptedException{// 处理 order.txt 文件String[] split = value.toString().split("\t");// 获取 pidString pid = pdMap.get(split[1]);// 获取订单id和订单数量// 封装
outK.set(split[0]+"\t"+ pid +"\t"+ split[2]);// 写出
context.write(outK,NullWritable.get());}}
MapJoinDriver:
publicclassMapJoinDriver{publicstaticvoidmain(String[] args)throwsIOException,URISyntaxException,InterruptedException,ClassNotFoundException{// 1. 获取 job 对象Configuration conf =newConfiguration();Job job =Job.getInstance(conf);// 2. 设置 jar 存储位置
job.setJarByClass(MapJoinDriver.class);// 3. 关联 map
job.setMapperClass(MapJoinMapper.class);// 4. 设置Map输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);// 5. 设置最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);// 加载缓存数据
job.addCacheFile(newURI("file:///D:/input/mapjoin/pd.txt"));// 设置 reduceTask 个数为 0
job.setNumReduceTasks(0);// 6. 设置输入路径和输出路径FileOutputFormat.setOutputPath(job,newPath("D:/output"));FileInputFormat.setInputPaths(job,newPath("D:/input/mapjoin/order.txt"));// 7. 提交 jobboolean result = job.waitForCompletion(true);System.exit(result ?0:1);}}
9. 数据清洗(ETL)
🧮ETL,是英文Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。
🧮ETL一词较常用在数据仓库,但其对象并不限于数据仓库。在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序
🌰需求:去除日志字段个数小于等于11的日志(map阶段),代码实现如下(WebLogMapper):
publicclassWebLogMapperextendsMapper<LongWritable,Text,Text,NullWritable>{@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,NullWritable>.Context context)throwsIOException,InterruptedException{// 1.获取一行String line = value.toString();// 2.Etlboolean result =parseLog(line, context);if(!result){return;}// 3.写出
context.write(value,NullWritable.get());}privatebooleanparseLog(String line,Mapper<LongWritable,Text,Text,NullWritable>.Context context){// 1.切割String[] fields = line.split(" ");// 2.过滤if(fields.length >11){// 计数器
context.getCounter("map","true").increment(1);returntrue;}else{
context.getCounter("map","false").increment(1);returnfalse;}}}
WebLogDriver:
publicclassWebLogDriver{publicstaticvoidmain(String[] args)throwsIOException{
args =newString[]{"D:\\Code\\Java\\big-data\\Hello\\src\\main\\java\\com\\ygy\\mapreduce\\etl\\input","D:\\Code\\Java\\big-data\\Hello\\src\\main\\java\\com\\ygy\\mapreduce\\etl\\output"};// 1.获取 job 对象Configuration conf =newConfiguration();Job job =Job.getInstance(newConfiguration());// 2.设置 jar 存储位置
job.setJarByClass(LogDriver.class);// 3.关联 mapper
job.setMapperClass(WebLogMapper.class);// 4.设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);// 5.设置输入输出路径FileInputFormat.setInputPaths(job,newPath(args[0]));FileOutputFormat.setOutputPath(job,newPath(args[1]));// 6.提交 jobtry{boolean result = job.waitForCompletion(true);System.exit(result ?0:1);}catch(InterruptedException|ClassNotFoundException e){
e.printStackTrace();}}}
可以利用正则匹配汇总来修改parseLog方法,实现更精确的过滤
四、Hadoop数据压缩
🧮优缺点:减少磁盘IO、减少磁盘存储空间,但是会增加CPU开销
🧮原则:运算密集型的Job,少用压缩;IO密集型的Job,多用压缩
🧮MapReduce支持的压缩编码:
压缩格式是否需要安装算法文件拓展名是否可切片压缩后原程序是否需要修改DEFLATE否DEFLATE.deflate否和文本处理一样,不需要修改Gzip否DEFLATE.gz否和文本处理一样,不需要修改Bzip2否Bzip2.bz2是和文本处理一样,不需要修改LZO要LZO.lzo是****需要建立索引,还需要指定输入格式Snappy否Snappy.snappy否和文本处理一样,不需要修改
压缩方式选择时重点考虑:
压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片
🧮几种压缩格式优缺点比较:
压缩格式优点缺点Gzip压缩率比较高不支持 Split;压缩/解压速度一般Bzip2压缩率高;支持Split压缩/解压速度慢LZO压缩/解压速度比较快;支持 Split压缩率一般;想支持切片需要额外创建索引Snappy压缩和解压缩速度快不支持 Split;压缩率一般
🧮压缩位置:压缩可以在 MapReduce作用的任意阶段启用
🧮压缩参数配置:为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器
# 查看是否安装,以及安装目录
Hadoop checknative
其他
✍WordCount是最常见、最基本的一个需求,例如进行词频统计、用户访问记录统计。如果数据量非常小的情况下,使用单机、批处理的方式就可以很快得到结果。但是如果数据量非常大,数据量可能是10G、100G等等。这种情况下使用单机、批处理的方式就非常低效率。所以这个时候就需要借助于分布式的思想进行处理——使用集群进行处理。
✍JAR(Java ARchive,Java 归档)是一种与平台无关的文件格式,可将多个文件合成一个文件。用户可将多个 Java applet 及其所需组件(.class 文件、图像和声音)绑定到 JAR 文件中,而后作为单个的简单 HTTP(Hypertext Tranfer Protocal,超文本传输协议)事务下载到浏览器中,从而大大提高下载速度。JAR 格式也支持压缩,从而减小了文件的大小,进一步缩短下载时间。
在实际开发中,maven等项目管理工具为我们自动地管理jar包以及相关的依赖,让jar包的调用看起来如黑盒一般"密不透风"。因为jar包主要是对class文件进行打包,而java编译生成的class文件是平台无关的,这就意味着jar包是跨平台的。当我们开发了一个程序以后,程序中有很多的类,如果需要提供给别人使用,发给对方一大堆源文件是非常不好的,因此通常需要把这些类以及相关的资源文件打包成一个 jar 包,把这个 jar 包提供给别人使用,同时提供给使用者清晰的文档。这样他人在拿到我们提供的jar之后,就能方便地进行调用。在平时写代码搬砖的时候,注意把自己代码的通用部分抽离出来。积累一些通用的util类,将其逐渐模块化,最后打成jar包供自己在别的项目或者模块中使用,同时不断打磨jar里面的内容,将其做得越来越容易理解和通用,这样的好处是除了会对你的代码重构能力以及模块抽象能力有很好的帮助之外,更是一种从长期解放你的重复工作量,让你有更多的精力去做其他事情的方式,甚至当你抽象出业内足够通用的jar之后,jar包还能为你带来意想不到的利润(当然公司里该保密的东西还是得保密的)。这也是java发展得如此之好的原因,无论出于盈利或者非盈利的目的,将自己的通用工具或者框架抽取出来,打成jar包供他人调用,使得整个java生态圈变得越来越强大–几乎很多业务场景都能找到对应的jar包。
✍Java Bean
并非所有的类都是 Java Bean,其是一种特殊的类,具有以下特征:
- 提供一个默认的无参构造函数。
- 需要被序列化并且实现了 Serializable 接口。
- 可能有一系列可读写属性,并且一般是 private 的。
- 可能有一系列的 getter 或 setter 方法。
✍下一篇:Yarn!
版权归原作者 欧叶冲冲冲 所有, 如有侵权,请联系我们删除。