I. Hadoop installation and configuration
1. Download and extract hadoop-x.x.x.tar.gz
tar -xzvf hadoop-x.x.x.tar.gz
2. Download and extract the JDK
tar -xzvf jdkx.x.x_xxx.tar.gz
3. Configure environment variables
vi /etc/profile
export JAVA_HOME=/root/env/jdk1.8.0_301
export HADOOP_HOME=/root/env/hadoop-3.3.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
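3.1. Additional variables required for Hadoop 3.x (needed when the daemons are started as root)
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
3.2. Load the environment variables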
source /etc/profile
4. Configure the hostname and host mappings
4.1. Hostname
vi /etc/hostname
4.2. Host mappings
vi /etc/hosts
172.16.162.71 namenode01
172.16.162.75 resourcemanager01
172.16.162.199 secondaryNN01
172.16.162.223 database
5. Edit the Hadoop configuration files (path: $HADOOP_HOME/etc/hadoop)
5.1. core-site.xml
<configuration>
  <!-- NameNode address -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode01:9820</value>
  </property>
  <!-- Directory where Hadoop stores its data -->
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/root/env/hadoop-3.3.1/data</value>
  </property>
  <!-- User for the NameNode web UI -->
  <property>
    <name>hadoop.http.staticuser.user</name>
    <value>root</value>
  </property>
</configuration>
5.2. hdfs-site.xml
<configuration>
  <!-- Host and port of the Secondary NameNode web UI -->
  <property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>secondaryNN01:9868</value>
  </property>
</configuration>
5.3. mapred-site.xml
<configuration>
  <!-- Run MapReduce on YARN -->
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <!-- Hadoop 3.x needs the application classpath to be configured -->
  <property>
    <name>mapreduce.application.classpath</name>
    <value>/root/env/hadoop-3.3.1/etc/hadoop:/root/env/hadoop-3.3.1/share/hadoop/common/lib/*:/root/env/hadoop-3.3.1/share/hadoop/common/*:/root/env/hadoop-3.3.1/share/hadoop/hdfs:/root/env/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/root/env/hadoop-3.3.1/share/hadoop/hdfs/*:/root/env/hadoop-3.3.1/share/hadoop/mapreduce/*:/root/env/hadoop-3.3.1/share/hadoop/yarn:/root/env/hadoop-3.3.1/share/hadoop/yarn/lib/*:/root/env/hadoop-3.3.1/share/hadoop/yarn/*</value>
  </property>
</configuration>
5.4. yarn-site.xml
<configuration>
  <!-- Use the MapReduce shuffle auxiliary service -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <!-- ResourceManager host -->
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>resourcemanager01</value>
  </property>
</configuration>
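5.5. List the IP address or hostname of every node in the workers file (the file is named slaves in Hadoop 2.x)
5.6. Once the configuration is complete, distribute it to the other nodes
rsync [path] [user]@[ip address]:[path]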
II. Running Hadoop
1. Format the NameNode
hadoop namenode -format
or (the non-deprecated form)
hdfs namenode -format
2. Start HDFS on the NameNode host
start-dfs.sh
3. Start YARN on the ResourceManager host
start-yarn.sh
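III. Changes between Hadoop 2.x and Hadoop 3.x
1. Features
| Feature | Hadoop 2.x | Hadoop 3.x |
| --- | --- | --- |
| Minimum supported Java version | Java 7 | Java 8 |
| Fault tolerance | Handled through replication, which wastes space. | Can be handled through erasure coding. |
| Data balancing | Uses the HDFS balancer. | Uses the intra-DataNode balancer, invoked through the HDFS disk balancer CLI. |
| Storage scheme | 3x replication scheme. | Supports erasure coding in HDFS. |
| Storage overhead | HDFS has 200% overhead in storage space. | Storage overhead is only 50% with erasure coding. |
| Storage overhead example | 6 blocks occupy 18 blocks of space because of the replication scheme. | 6 blocks occupy 9 blocks of space: 6 data blocks plus 3 parity blocks. |
| YARN Timeline Service | Uses the old timeline service, which has scalability problems. | Timeline Service v2 improves the scalability and reliability of the timeline service. |
| Compatible file systems | HDFS (default FS); FTP file system (stores all data on a remotely accessible FTP server); Amazon S3 (Simple Storage Service) file system; Windows Azure Storage Blob (WASB) file system. | All of the above, plus the Microsoft Azure Data Lake file system. |
| DataNode resources | DataNode resources are not dedicated to MapReduce and can be used by other applications. | DataNode resources can likewise be used by other applications. |
| MR API compatibility | The MR API is compatible with Hadoop 1.x programs, which can run on Hadoop 2.x. | The MR API is compatible with Hadoop 1.x programs, which can run on Hadoop 3.x. |
| HDFS federation | Hadoop 1.0 had a single NameNode managing all namespaces; Hadoop 2.0 uses multiple NameNodes for multiple namespaces. | Hadoop 3.x also uses multiple NameNodes for multiple namespaces. |
| Faster data access | DataNode caching gives fast access to data. | DataNode caching gives fast access to data here as well. |
| Platform | Can serve as a platform for many kinds of data analysis and can run event processing, streaming, and real-time operations. | Event processing, streaming, and real-time operations can also run on top of YARN. |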
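The storage-overhead rows of the comparison can be checked with a little arithmetic. The sketch below is plain Java with no Hadoop dependency; the class and method names are made up for illustration, and the erasure-coding layout (6 data blocks plus 3 parity blocks per group) is taken from the 6-to-9 example in the comparison.

```java
final class StorageOverhead {
    /** Raw blocks stored when every data block is kept as `replicas` full copies. */
    static int replicatedBlocks(int dataBlocks, int replicas) {
        return dataBlocks * replicas;
    }

    /** Raw blocks stored when each group of `dataUnits` blocks gets `parityUnits` parity blocks. */
    static int erasureCodedBlocks(int dataBlocks, int dataUnits, int parityUnits) {
        int groups = (dataBlocks + dataUnits - 1) / dataUnits; // round up to whole groups
        return dataBlocks + groups * parityUnits;
    }

    /** Extra space as a percentage of the original data size. */
    static int overheadPercent(int rawBlocks, int dataBlocks) {
        return (rawBlocks - dataBlocks) * 100 / dataBlocks;
    }

    public static void main(String[] args) {
        int replicated = replicatedBlocks(6, 3);
        int coded = erasureCodedBlocks(6, 6, 3);
        System.out.println("3x replication: " + replicated + " blocks, overhead "
                + overheadPercent(replicated, 6) + "%");
        System.out.println("6+3 erasure coding: " + coded + " blocks, overhead "
                + overheadPercent(coded, 6) + "%");
    }
}
```

With 6 data blocks this gives 18 blocks (200% overhead) for replication and 9 blocks (50% overhead) for erasure coding, matching the figures quoted above.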
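2. Ports
| Service | Hadoop 2.x port | Hadoop 3.x port |
| --- | --- | --- |
| NameNode | 8020 | 9820 |
| NN HTTP UI | 50070 | 9870 |
| NN HTTPS UI | 50470 | 9871 |
| SNN HTTPS UI | 50091 | 9869 |
| SNN HTTP UI | 50090 | 9868 |
| DN IPC | 50020 | 9867 |
| DN | 50010 | 9866 |
| DN HTTP UI | 50075 | 9864 |
| DN HTTPS UI | 50475 | 9865 |
The other ports are unchanged; the ResourceManager web UI is on port 8088. Note that 9820 was the NameNode RPC default only in Hadoop 3.0.0; later 3.x releases default to 8020 again, so the port is best set explicitly in fs.defaultFS, as in the core-site.xml above.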
IV. Common HDFS commands
(These commands operate on HDFS directories, not on the local Linux file system.)
1. Help
hdfs dfs -help
Output:
Usage: hadoop fs [generic options]
    [-appendToFile <localsrc> ... <dst>]
    [-cat [-ignoreCrc] <src> ...]
    [-checksum [-v] <src> ...]
    [-chgrp [-R] GROUP PATH...]
    [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
    [-chown [-R] [OWNER][:[GROUP]] PATH...]
    [-concat <target path> <src path> <src path> ...]
    [-copyFromLocal [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>]
    [-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
    [-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] [-e] [-s] <path> ...]
    [-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]
    [-createSnapshot <snapshotDir> [<snapshotName>]]
    [-deleteSnapshot <snapshotDir> <snapshotName>]
    [-df [-h] [<path> ...]]
    [-du [-s] [-h] [-v] [-x] <path> ...]
    [-expunge [-immediate] [-fs <path>]]
    [-find <path> ... <expression> ...]
    [-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
    [-getfacl [-R] <path>]
    [-getfattr [-R] {-n name | -d} [-e en] <path>]
    [-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
    [-head <file>]
    [-help [cmd ...]]
    [-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [<path> ...]]
    [-mkdir [-p] <path> ...]
    [-moveFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
    [-moveToLocal <src> <localdst>]
    [-mv <src> ... <dst>]
    [-put [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>]
    [-renameSnapshot <snapshotDir> <oldName> <newName>]
    [-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]
    [-rmdir [--ignore-fail-on-non-empty] <dir> ...]
    [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
    [-setfattr {-n name [-v value] | -x name} <path>]
    [-setrep [-R] [-w] <rep> <path> ...]
    [-stat [format] <path> ...]
    [-tail [-f] [-s <sleep interval>] <file>]
    [-test -[defswrz] <path>]
    [-text [-ignoreCrc] <src> ...]
    [-touch [-a] [-m] [-t TIMESTAMP (yyyyMMdd:HHmmss)] [-c] <path> ...]
    [-touchz <path> ...]
    [-truncate [-w] <length> <path> ...]
    [-usage [cmd ...]]
Generic options supported are:
-conf <configuration file>  specify an application configuration file
-D <property=value>  define a value for a given property
-fs <file:///|hdfs://namenode:port>  specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations.
-jt <local|resourcemanager:port>  specify a ResourceManager
-files <file1,...>  specify a comma-separated list of files to be copied to the map reduce cluster
-libjars <jar1,...>  specify a comma-separated list of jar files to be included in the classpath
-archives <archive1,...>  specify a comma-separated list of archives to be unarchived on the compute machines
The general command line syntax is:
command [genericOptions] [commandOptions]
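Overall, the commands closely resemble their Linux counterparts.
2. Uploading to and downloading from HDFS
2.1. Upload
hdfs dfs -put [src file] [HDFS path]
2.2. Download
hdfs dfs -get [HDFS path] [local path]
3. Check the health of the file system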
hdfs dfsadmin -report
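4. Safe mode
Safe mode is a Hadoop protection mechanism that safeguards the data blocks in the cluster. While HDFS is in safe mode, clients cannot perform any operation that modifies the file system, including uploading files, deleting files, renaming, and creating directories.
The cluster enters safe mode first whenever it starts, and while in safe mode it checks the integrity of the data blocks. For example, if the configured replication factor (the dfs.replication parameter) is 5, each block should have 5 replicas on the DataNodes; if only 3 exist, that block's replica ratio is 3/5 = 0.6. The property
dfs.namenode.safemode.threshold-pct (named dfs.safemode.threshold.pct in older releases)
sets the threshold for leaving safe mode and defaults to 0.999. Strictly speaking, the NameNode compares this threshold with the fraction of blocks that have reached the minimum replication (dfs.namenode.replication.min, default 1), not with a per-block replica ratio.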
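To make the threshold concrete, here is a simplified plain-Java sketch of the comparison. It is not the NameNode's actual code: the class and method names are invented for illustration, and it assumes the check is "blocks that reached minimum replication, divided by total blocks, is at least the threshold".

```java
final class SafeModeCheck {
    /**
     * Returns true when the fraction of blocks that have reached minimum
     * replication is at least thresholdPct (for example 0.999).
     */
    static boolean canLeaveSafeMode(long safeBlocks, long totalBlocks, double thresholdPct) {
        if (totalBlocks == 0) {
            return true; // an empty file system has nothing to wait for
        }
        return (double) safeBlocks / totalBlocks >= thresholdPct;
    }

    public static void main(String[] args) {
        System.out.println(canLeaveSafeMode(999, 1000, 0.999)); // threshold reached
        System.out.println(canLeaveSafeMode(998, 1000, 0.999)); // still below the threshold
    }
}
```

With a threshold of 0.999, a cluster of 1000 blocks stays in safe mode until at least 999 of them have been reported with enough replicas.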
4.1. Check the safe mode status
hdfs dfsadmin -safemode get
4.2. Force HDFS into safe mode
hdfs dfsadmin -safemode enter
4.3. Force HDFS out of safe mode
hdfs dfsadmin -safemode leave
V. Accessing HDFS from Java
1. Add the dependency to pom.xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.10.1</version>
</dependency>
2. Code (using the template method design pattern)
2.1. Template interface
import org.apache.hadoop.fs.FileSystem;

public interface Template {
    void template(FileSystem fileSystem) throws Exception;
}
2.2. Template executor class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;

public class HadoopTemplate {
    public static void exec(URI uri, Configuration configuration, String user, Template template) throws Exception {
        // 1. Obtain the client object
        FileSystem fileSystem = FileSystem.get(uri, configuration, user);
        try {
            // 2. Run the operation
            template.template(fileSystem);
        } finally {
            // 3. Release resources, even if the operation throws
            fileSystem.close();
        }
    }
}
2.3. Calling class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;

public class Test01 {
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://172.16.106.56:9000";
        String user = "root";
        String path = "/test/input/a.txt";
        HadoopTemplate.exec(new URI(uri), new Configuration(), user, new Template() {
            @Override
            public void template(FileSystem fileSystem) throws Exception {
                // Run the operation: read the file and print its contents
                try (FSDataInputStream stream = fileSystem.open(new Path(path))) {
                    byte[] bytes = new byte[1024];
                    int length;
                    while ((length = stream.read(bytes)) != -1) {
                        System.out.println(new String(bytes, 0, length));
                    }
                }
            }
        });
    }
}
The same approach can be used to create, delete, modify, and query files and directories on HDFS.
VI. MapReduce
The examples use the new API (org.apache.hadoop.mapreduce).
1. MR program
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

public class WordCount extends Configured implements Tool {
    /**
     * Reads one line of data at a time.
     * Type parameters: LongWritable, Text, Text, IntWritable
     * LongWritable is the offset, i.e. the position at which each line starts
     * Text is the current line
     * Text, IntWritable are the (user-defined) key/value types output by the map phase
     */
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text outKey = new Text();
        private IntWritable outValue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Delimiter between the fields within a line
            String[] words = value.toString().split(" ");
            for (String word : words) {
                outKey.set(word);
                // Write each pair out for the reduce phase
                context.write(outKey, outValue);
            }
        }
    }

    // Text, IntWritable: the key/value types output by the map phase
    // Text, IntWritable: the key/value types output by the reduce phase
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /**
         * The incoming data looks like
         * key: hello
         * values: (1,1,1)
         * key is the key set in the map phase; values is an iterable of all values the map phase emitted for that key
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Get the job
        Job job = Job.getInstance(this.getConf());
        // Set the jar by class
        job.setJarByClass(this.getClass());
        // Register the mapper and reducer
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        // Key/value types of the map output (matching the mapper's output types)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Key/value types of the final output (matching the reducer's output types)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    // args holds the runtime arguments given on the command line
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Path output = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        // If the output path already exists on HDFS, delete it
        if (fileSystem.exists(output)) {
            System.out.println("==========Directory already exists, deleting==========");
            fileSystem.delete(output, true);
        }
        // Run
        int status = ToolRunner.run(configuration, new WordCount(), args);
        System.exit(status);
    }
}
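To see what the framework does between the mapper and the reducer, the data flow can be imitated locally in plain Java. This is only a sketch of the idea: it has no Hadoop dependency, the class name WordCountFlow is made up, and the real shuffle is distributed and far more involved than a TreeMap.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WordCountFlow {
    /** Map phase: emit (word, 1) for every space-separated word in one line. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split(" ")) {
            out.add(new SimpleEntry<>(word, 1));
        }
        return out;
    }

    /** Shuffle: group all values by key, with keys in sorted order. */
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    /** Reduce phase: sum the values collected for one key. */
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    /** Runs map, shuffle, and reduce over all input lines. */
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(pairs).entrySet()) {
            result.put(group.getKey(), reduce(group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello world", "hello hadoop")));
    }
}
```

For the two sample lines, the reducer for the key hello receives the values (1, 1) and writes 2, which mirrors the key/values shape described in the reducer's comment.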
2. Using a custom object as a key or value
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * A bean must implement Hadoop's serialization interface.
 * As a map/reduce value it only needs to implement Writable (the write and readFields methods).
 * As a map/reduce key it must also implement Comparable<T>; implementing WritableComparable<T> covers both.
 */
// Lombok generates the getters and setters
@Data
// Lombok generates the no-argument constructor
@NoArgsConstructor
// Lombok generates the all-arguments constructor
@AllArgsConstructor
public class User implements WritableComparable<User> {
    private String name;
    private Integer age;

    // write and readFields must handle the fields in the same order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.name);
        dataOutput.writeInt(this.age);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.age = dataInput.readInt();
    }

    // Override Object.toString; otherwise the MR output shows the object's memory address
    @Override
    public String toString() {
        return this.name + "\t" + this.age;
    }

    // Comparison method from Comparable<T>, used to sort keys (here by age)
    @Override
    public int compareTo(User o) {
        return this.age.compareTo(o.getAge());
    }
}
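Why the field order in write and readFields has to match can be shown with the plain java.io streams, whose writeUTF/writeInt and readUTF/readInt calls have the same shape as the DataOutput/DataInput calls in the bean. This is a minimal sketch without Hadoop; the class FieldOrderDemo and its methods are illustrative names only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class FieldOrderDemo {
    /** Serializes name, then age, in that order. */
    static byte[] write(String name, int age) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(name);
            out.writeInt(age);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads the fields back in the same order they were written. */
    static String readSameOrder(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String name = in.readUTF();
            int age = in.readInt();
            return name + "\t" + age;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads age before name; returns true if the mismatched order fails. */
    static boolean wrongOrderFails(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readInt();  // consumes the string's length prefix and first characters
            in.readUTF();  // then misreads the remaining bytes as a length
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] bytes = write("tom", 20);
        System.out.println(readSameOrder(bytes));
        System.out.println("wrong order fails: " + wrongOrderFails(bytes));
    }
}
```

The bytes carry no field names, only values in sequence, so the reader has to consume them in exactly the order the writer produced them.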
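3. Other MR operations
3.1. Many small files
job.setInputFormatClass(CombineTextInputFormat.class);
Setting this InputFormat lets several small files share a split. Without it, one map task is started per file, even when a file is smaller than the split size.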
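3.2. Partitioning (determines which output file each reduce result is written to)
/**
 * Text, FlowBean are the key/value types output by the map phase.
 * The number returned by the method decides the partition of the output file, starting from 0.
 */
public static class FlowPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        return 0;
    }
}
Set it in the run method:
// Set as many reduce tasks as there are partitions
job.setNumReduceTasks(1);
// The partitioner implementation class
job.setPartitionerClass(FlowPartitioner.class);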
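As a sketch of what a getPartition rule might compute, the plain-Java class below shows two options. hashPartition uses the same arithmetic as Hadoop's default HashPartitioner (non-negative hash modulo the number of reduce tasks), although String.hashCode() stands in here for the hash of a Text key, so the actual partition numbers will differ. prefixPartition is a hypothetical business rule invented for this example; the prefixes and the class name are assumptions.

```java
final class PartitionSketch {
    /** Default-style rule: non-negative hash of the key modulo the number of reduce tasks. */
    static int hashPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** Hypothetical custom rule: route keys by their first three characters. */
    static int prefixPartition(String key) {
        if (key.startsWith("136")) {
            return 0;
        }
        if (key.startsWith("137")) {
            return 1;
        }
        return 2; // everything else shares the last partition
    }

    public static void main(String[] args) {
        System.out.println(hashPartition("hello", 3));
        System.out.println(prefixPartition("13612345678"));
        System.out.println(prefixPartition("15000000000"));
    }
}
```

A rule like prefixPartition returns values 0 to 2, so the job would need job.setNumReduceTasks(3) to give every partition its own reduce task and output file.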
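3.3. Pre-aggregation before reduce (a combiner), an MR optimization (cannot be used for global sorting and similar jobs)
// The custom Reducer class can be passed in directly
job.setCombinerClass(mrEnum.getReducerClass());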
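A combiner is only safe when applying the operation to partial results gives the same answer as applying it to all values at once. The plain-Java sketch below contrasts summation, which passes that test, with averaging, which does not; averaging is an illustration chosen here rather than a case taken from the text, and the class and variable names are made up.

```java
import java.util.Arrays;
import java.util.List;

final class CombinerSketch {
    static int sum(List<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    static double average(List<Integer> values) {
        return (double) sum(values) / values.size();
    }

    public static void main(String[] args) {
        // Values for one key, as produced by two separate map tasks
        List<Integer> taskA = Arrays.asList(2, 4);
        List<Integer> taskB = Arrays.asList(9);
        List<Integer> all = Arrays.asList(2, 4, 9);

        // Summing the per-task sums equals summing everything
        int combinedSum = sum(Arrays.asList(sum(taskA), sum(taskB)));
        System.out.println(sum(all) + " vs " + combinedSum);

        // Averaging the per-task averages does not equal the overall average
        double averageOfAverages = (average(taskA) + average(taskB)) / 2;
        System.out.println(average(all) + " vs " + averageOfAverages);
    }
}
```

Here the sums agree (15 in both cases) while the averages do not (5.0 overall against 6.0 from the pre-aggregated values), which is why a word-count reducer can double as a combiner but an averaging reducer cannot.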
4. Writing output to a database
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import entity.DBWCBean;
import java.io.IOException;

public class File2DB extends Configured implements Tool {
    public static class DBMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text outKey = new Text();
        private IntWritable outValue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            for (String word : words) {
                outKey.set(word);
                context.write(outKey, outValue);
            }
        }
    }

    public static class DBReducer extends Reducer<Text, IntWritable, DBWCBean, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, DBWCBean, NullWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(new DBWCBean(key.toString(), sum), NullWritable.get());
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // JDBC driver, connection URL, user name, and password
        DBConfiguration.configureDB(this.getConf(), "com.mysql.cj.jdbc.Driver", "jdbc:mysql://database:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC", "root", "123456");
        Job job = Job.getInstance(this.getConf());
        job.setJarByClass(this.getClass());
        job.setMapperClass(DBMapper.class);
        job.setReducerClass(DBReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(DBWCBean.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Target table "wc" with columns "word" and "count"
        DBOutputFormat.setOutput(job, "wc", "word", "count");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        System.exit(ToolRunner.run(configuration, new File2DB(), args));
    }
}
5. Custom OutputFormat
5.1. Extend FileOutputFormat<K,V> and implement getRecordWriter
5.2. Extend RecordWriter<K,V> and implement it according to the business requirements
Not demonstrated here.
6. Implementing a join in MR
6.1. Reduce-side implementation
Sample files
order.txt
01 1001 1
02 1002 2
03 1003 3
01 1004 4
02 1005 5
03 1006 6
pd.txt
01 小米
02 华为
03 苹果
Custom bean object (holding all the values)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableJoinBean implements WritableComparable<TableJoinBean> {
    private String orderId = "";
    private String productName = "";
    private int sum;
    private boolean flag;

    public TableJoinBean(String orderId, int sum) {
        this.orderId = orderId;
        this.sum = sum;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.orderId);
        dataOutput.writeUTF(this.productName);
        dataOutput.writeInt(this.sum);
        dataOutput.writeBoolean(this.flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.productName = dataInput.readUTF();
        this.sum = dataInput.readInt();
        this.flag = dataInput.readBoolean();
    }

    @Override
    public int compareTo(TableJoinBean o) {
        return this.orderId.compareTo(o.getOrderId());
    }

    @Override
    public String toString() {
        return orderId + '\t' + productName + '\t' + sum;
    }
}
Mapper and Reducer
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
import java.util.ArrayList;

public class TableJoin {
    public static class TJMapper extends Mapper<LongWritable, Text, Text, TableJoinBean> {
        private String fileName;
        private Text k = new Text();
        private TableJoinBean v = new TableJoinBean();

        // setup runs once, before any call to map; map runs once per input line
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            // Get the name of the current file
            fileName = inputSplit.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strings = value.toString().split(" ");
            k.set(strings[0]);
            // Check the file name and fill in the matching fields of the bean
            v.setFlag(fileName.contains("order"));
            if (v.isFlag()) {
                v.setOrderId(strings[1]);
                v.setSum(Integer.parseInt(strings[2]));
            } else {
                v.setProductName(strings[1]);
            }
            context.write(k, v);
        }
    }

    public static class TJReducer extends Reducer<Text, TableJoinBean, Text, TableJoinBean> {
        @Override
        protected void reduce(Text key, Iterable<TableJoinBean> values, Context context) throws IOException, InterruptedException {
            String productName = "";
            ArrayList<TableJoinBean> tableJoinBeans = new ArrayList<>();
            // Merge the fields of all objects that share the same key
            for (TableJoinBean value : values) {
                if (value.isFlag()) {
                    // Hadoop's Iterable reuses the same object as an optimization, so a new object must be created each time
                    tableJoinBeans.add(new TableJoinBean(value.getOrderId(), value.getSum()));
                } else {
                    productName = value.getProductName();
                }
            }
            for (TableJoinBean tableJoinBean : tableJoinBeans) {
                tableJoinBean.setProductName(productName);
                context.write(key, tableJoinBean);
            }
        }
    }
}
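The comment in the reducer about creating a new object for every value deserves a closer look. The plain-Java sketch below imitates an iterator that hands out the same mutable object on every step, which is the behaviour the reducer comment describes for Hadoop's value iterator. The Holder class, the reusing method, and the other names are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class ReuseDemo {
    static final class Holder {
        String orderId;
    }

    /** Iterates over the ids but returns the same Holder instance every time. */
    static Iterable<Holder> reusing(List<String> ids) {
        Holder shared = new Holder();
        return () -> new Iterator<Holder>() {
            private final Iterator<String> source = ids.iterator();

            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public Holder next() {
                shared.orderId = source.next(); // overwrite the shared object in place
                return shared;
            }
        };
    }

    /** Keeps references to the iterated objects: every entry ends up as the last value. */
    static List<String> keepReferences(List<String> ids) {
        List<Holder> kept = new ArrayList<>();
        for (Holder holder : reusing(ids)) {
            kept.add(holder);
        }
        List<String> result = new ArrayList<>();
        for (Holder holder : kept) {
            result.add(holder.orderId);
        }
        return result;
    }

    /** Copies each value into a fresh object before keeping it. */
    static List<String> keepCopies(List<String> ids) {
        List<Holder> kept = new ArrayList<>();
        for (Holder holder : reusing(ids)) {
            Holder copy = new Holder();
            copy.orderId = holder.orderId;
            kept.add(copy);
        }
        List<String> result = new ArrayList<>();
        for (Holder holder : kept) {
            result.add(holder.orderId);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("1001", "1004");
        System.out.println(keepReferences(ids));
        System.out.println(keepCopies(ids));
    }
}
```

Keeping references yields the last order id twice, while copying preserves both ids, which is why the reducer builds a new TableJoinBean before adding it to the list.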
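6.2. Map-side implementation (faster than the reduce-side join: there are more map tasks than reduce tasks, and no shuffle is needed)
Approach:
No reduce phase is needed, so set the number of reduce tasks to 0
job.setNumReduceTasks(0);
On the map side, load the smaller table as a cache file and store its key/value pairs in a map
job.addCacheFile(new URI("path to the file on HDFS, or path to a local file"));
In the setup() method:
- Get the cached file
URI[] cacheFiles = context.getCacheFiles();
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));
- Read the file line by line
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
// Standard Java I/O read loop
while(...){...}
- Split each line
- Cache the data in a HashMap
- Close the stream
In the map() method:
- Get a line
- Split it
- Look up the corresponding key in the HashMap
- Get the remaining values
- Concatenate
- Write out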
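The setup() and map() steps listed above can be sketched in plain Java, leaving out the Hadoop plumbing. In a real job, loadProducts would run inside setup() on the lines read from the cached file, and joinLine inside map() for each order line. The class and method names are illustrative, and the product names are romanized stand-ins for the values in pd.txt.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MapSideJoinSketch {
    /** setup() step: parse "productId name" lines of the small table into a lookup map. */
    static Map<String, String> loadProducts(List<String> pdLines) {
        Map<String, String> products = new HashMap<>();
        for (String line : pdLines) {
            String[] fields = line.split(" ");
            products.put(fields[0], fields[1]);
        }
        return products;
    }

    /** map() step: turn "productId orderId amount" into "orderId<TAB>name<TAB>amount". */
    static String joinLine(String orderLine, Map<String, String> products) {
        String[] fields = orderLine.split(" ");
        String productName = products.getOrDefault(fields[0], ""); // empty if the id is unknown
        return fields[1] + "\t" + productName + "\t" + fields[2];
    }

    public static void main(String[] args) {
        Map<String, String> products = loadProducts(Arrays.asList("01 Xiaomi", "02 Huawei", "03 Apple"));
        for (String order : Arrays.asList("01 1001 1", "02 1002 2", "03 1003 3")) {
            System.out.println(joinLine(order, products));
        }
    }
}
```

Because every map task holds the whole small table in memory, each order line can be joined on the spot and written out directly, with no reduce phase.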
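VII. Compression
1. Compression codecs supported by MR
| Compression format | Bundled with Hadoop | Algorithm | File extension | Splittable | Program changes required |
| --- | --- | --- | --- | --- | --- |
| DEFLATE | Yes | DEFLATE | .deflate | No | No |
| Gzip | Yes | DEFLATE | .gz | No | No |
| bzip2 | Yes | bzip2 | .bz2 | Yes | No |
| LZO | No (must be installed) | LZO | .lzo | Yes | An index must be built and the input format specified |
| Snappy | Yes | Snappy | .snappy | No | No |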
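2. Input side
2.1. If the data is smaller than the block size, prioritize compression speed: LZO/Snappy
2.2. If the data is large, prioritize splittable formats: Bzip2/LZO
3. Mapper output
Focus on compression and decompression speed: LZO/Snappy
4. Reducer output
4.1. If the data is stored permanently, prefer a high compression ratio: Bzip2/Gzip
4.2. If the output is passed to another MR job, consider the data volume and whether the format is splittable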
5. Configuration in practice
| Parameter | Default | Stage |
| --- | --- | --- |
| io.compression.codecs (core-site.xml) | none | Input compression |
| mapreduce.map.output.compress (mapred-site.xml) | false | Mapper output |
| mapreduce.map.output.compress.codec (mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Mapper output |
| mapreduce.output.fileoutputformat.compress (mapred-site.xml) | false | Reducer output |
| mapreduce.output.fileoutputformat.compress.codec (mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Reducer output |
6. Configuring compression in Java code
...
Configuration conf = new Configuration();
// Enable compression of the map output
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
// Enable compression of the reduce output
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
VIII. Common YARN commands
1. yarn application: viewing applications
1.1. List all applications
yarn application -list
1.2. Filter by application state
All states: ALL, NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
yarn application -list -appStates [state]
1.3. Kill an application
yarn application -kill [applicationId]
1.4. View application logs
yarn logs -applicationId [applicationId]
1.5. View container logs
yarn logs -applicationId [applicationId] -containerId [containerId]
1.6. List the attempts of an application
yarn applicationattempt -list [applicationId]
1.7. Print the status of an application attempt
yarn applicationattempt -status [applicationAttemptId]
1.8. List all containers
yarn container -list [applicationAttemptId]
1.9. Print the status of a container
yarn container -status [containerId]
1.10. View node status
yarn node -list -all
1.11. Reload the queue configuration
yarn rmadmin -refreshQueues
1.12. View a queue
yarn queue -status [queueName]
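IX. Core YARN parameters for production environments
1. ResourceManager
| Parameter | Purpose |
| --- | --- |
| yarn.resourcemanager.scheduler.class | Selects the scheduler |
| yarn.resourcemanager.scheduler.client.thread-count | Number of threads the ResourceManager uses to handle scheduler requests, default 50 |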
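2. NodeManager (configured separately on each node)
| Parameter | Purpose |
| --- | --- |
| yarn.nodemanager.resource.detect-hardware-capabilities | Let YARN detect the hardware and configure itself, default false |
| yarn.nodemanager.resource.count-logical-processors-as-cores | Whether to count logical processors as CPU cores, default false |
| yarn.nodemanager.resource.pcores-vcores-multiplier | Ratio of virtual cores to physical cores, default 1.0 |
| yarn.nodemanager.resource.memory-mb | Memory available to the NodeManager, default 8 GB |
| yarn.nodemanager.resource.system-reserved-memory-mb | Memory the NodeManager reserves for the system |
| yarn.nodemanager.resource.cpu-vcores | CPU cores available to the NodeManager, default 8 |
| yarn.nodemanager.pmem-check-enabled | Whether to enforce physical memory limits on containers, default true |
| yarn.nodemanager.vmem-check-enabled | Whether to enforce virtual memory limits on containers, default true |
| yarn.nodemanager.vmem-pmem-ratio | Ratio of virtual memory to physical memory, default 2.1 |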
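As a worked example of the virtual-to-physical memory ratio, the sketch below computes the virtual memory a container may use before the virtual memory check intervenes. It is a simplification in megabytes with invented names; it assumes the limit is simply the container's physical allocation multiplied by the ratio.

```java
final class VmemLimit {
    /** Virtual memory limit in MB for a container with the given physical allocation. */
    static long virtualMemoryLimitMb(long containerPhysicalMb, double vmemPmemRatio) {
        return (long) (containerPhysicalMb * vmemPmemRatio);
    }

    public static void main(String[] args) {
        // A 1 GB container under the default ratio of 2.1
        System.out.println(virtualMemoryLimitMb(1024, 2.1) + " MB");
    }
}
```

With the default ratio of 2.1, a container allocated 1024 MB of physical memory may use roughly 2150 MB of virtual memory while yarn.nodemanager.vmem-check-enabled is true.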
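3. Containers
| Parameter | Purpose |
| --- | --- |
| yarn.scheduler.minimum-allocation-mb | Minimum container memory, default 1 GB |
| yarn.scheduler.maximum-allocation-mb | Maximum container memory, default 8 GB |
| yarn.scheduler.minimum-allocation-vcores | Minimum CPU cores per container, default 1 |
| yarn.scheduler.maximum-allocation-vcores | Maximum CPU cores per container, default 4 |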
Copyright belongs to the original author, 红陌樱花vip. In case of infringement, please contact us for removal.