Hadoop, Chapter 5
Note: do you need a Hadoop environment configured on Windows for what follows?
At this point my Windows machine has no Hadoop environment at all.
The only step that fails is downloading a file from the cluster to Windows, which errors out because no Hadoop environment is set; for that step you do need to configure one.
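If you only need that download step to work, a common workaround (a sketch, not part of the original walkthrough; the install path is an assumption you must adapt) is to unpack a Hadoop distribution containing bin\winutils.exe on Windows and point the client at it before any FileSystem call:

// Hypothetical path to an unpacked Hadoop distribution with bin\winutils.exe
System.setProperty("hadoop.home.dir", "C:\\hadoop-3.1.4");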
5.1 Filtering the Log File and Generating a Sequence File
5.1.1 Setting the MapReduce input format
job.setInputFormatClass(TextInputFormat.class);
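TextInputFormat is the default input format: it feeds the mapper one line of the input file at a time, with the line's byte offset as the key and the line text as the value.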
5.1.2 Setting the MapReduce output format
job.setOutputFormatClass(SequenceFileOutputFormat.class);
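SequenceFileOutputFormat stores the emitted (key, value) pairs in Hadoop's binary SequenceFile container instead of plain text.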
5.1.3 Task implementation: filter the log file and generate a sequence file (full code)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class SelectData {

    public static class MyMap extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split(",");
            // Keep only records whose date field contains "2021/1" or "2021/2"
            if (arr[4].contains("2021/1") || arr[4].contains("2021/2")) {
                context.write(new Text(arr[2]), new Text(arr[4].substring(0, 9)));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("You must supply an input path and an output path");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Select Data");
        job.setJarByClass(SelectData.class);
        job.setMapperClass(MyMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Set the input format
        job.setInputFormatClass(TextInputFormat.class);
        // Set the output format
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // Set the number of reduce tasks to 0 (map-only job)
        job.setNumReduceTasks(0);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
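A note on the design: setNumReduceTasks(0) makes this a map-only job, so the mappers' output is written directly through SequenceFileOutputFormat to HDFS as files named part-m-00000, part-m-00001, and so on; section 5.2.4 reads one of these files back.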
pom.xml configuration (do NOT copy the whole file; copy only the dependencies):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>hadoop5.2</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.4</version>
        </dependency>
    </dependencies>
</project>
Package the project with Maven; when the build finishes, the jar is generated under the target folder.
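For example, from the project root (assuming a standard Maven layout):

mvn clean package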
Upload the jar to the /opt/jars/Hadoop folder on the master node.
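One way to do this, assuming SSH access to the master node as root (rename the jar under target first if Maven gave it a different name):

scp target/hadoop5.1-SelectData.jar root@master:/opt/jars/Hadoop/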
Run the jar:
hadoop jar /opt/jars/Hadoop/hadoop5.1-SelectData.jar SelectData \
    /Tipdm/Hadoop/MapReduce/raceData.csv \
    /Tipdm/Hadoop/MapReduce/Result/Select_Data
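To check the output you can list the result directory and, because the output is a SequenceFile, dump it with hadoop fs -text, which knows how to decode SequenceFiles:

hadoop fs -ls /Tipdm/Hadoop/MapReduce/Result/Select_Data
hadoop fs -text /Tipdm/Hadoop/MapReduce/Result/Select_Data/part-m-00000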
Result
5.2 Using the Hadoop Java API to Read the Sequence File (full code)
Listing subdirectories
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class view_folders {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:8020");
        // Get the file system
        FileSystem fs = FileSystem.get(conf);
        // Declare the path to list
        Path path = new Path("/user/root");
        // Get the file listing
        FileStatus[] fileStatuses = fs.listStatus(path);
        // Iterate over the listing and print only directories
        for (FileStatus file : fileStatuses) {
            if (file.isDirectory()) {
                System.out.println(file.getPath().toString());
            }
        }
        // Close the file system
        fs.close();
    }
}
Result
Listing files
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class view_files {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:8020");
        // Get the file system
        FileSystem fs = FileSystem.get(conf);
        // Declare the path to list
        Path path = new Path("/user/root");
        // Get the file listing
        FileStatus[] fileStatuses = fs.listStatus(path);
        // Iterate over the listing and print only regular files
        for (FileStatus file : fileStatuses) {
            if (file.isFile()) {
                System.out.println(file.getPath().toString());
            }
        }
        // Close the file system
        fs.close();
    }
}
Result
Creating a directory
This one runs into a permissions issue.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class mkdir_folders {
    public static void main(String[] args) throws Exception {
        // Get the configuration
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:8020");
        // Get the file system
        FileSystem fs = FileSystem.get(conf);
        // Declare the directory to create
        Path path = new Path("/user/root/view_log");
        // Call mkdirs to create the directory
        fs.mkdirs(path);
        // Close the file system
        fs.close();
    }
}
Running it directly throws an error saying the user does not have permission.
A simple fix: go back to the master node and change the directory's permissions with an HDFS command:
hadoop fs -chmod 777 /user/root
Then run the code again.
It runs successfully; check the directory.
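An alternative that avoids loosening the directory permissions (a sketch, not from the original write-up): tell the HDFS client which user to act as before the FileSystem is created, assuming the target directory is owned by root:

// Assumption: /user/root is owned by the HDFS user "root"
System.setProperty("HADOOP_USER_NAME", "root");
FileSystem fs = FileSystem.get(conf);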
Deleting a file
First upload a file to the corresponding folder, for example with hadoop fs -put as shown below.
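For example, on the master node (assuming raceData.csv sits in the current directory):

hadoop fs -put raceData.csv /user/root/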
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class delete_files {
    public static void main(String[] args) throws Exception {
        // Get the configuration
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:8020");
        // Get the file system
        FileSystem fs = FileSystem.get(conf);
        // Declare the file path
        Path path = new Path("/user/root/raceData.csv");
        // Delete the file; true means delete recursively
        // (required for directories, harmless for a single file)
        fs.delete(path, true);
        // Close the file system
        fs.close();
    }
}
Result
Uploading a file to HDFS
!!! Change the local file path to your own.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class put_files {
    public static void main(String[] args) throws Exception {
        // Get the configuration
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:8020");
        // Get the file system
        FileSystem fs = FileSystem.get(conf);
        // Declare the source (local) path and the target (HDFS) path
        Path fromPath = new Path("C:\\javaproject\\NO.5\\raceData.csv");
        Path toPath = new Path("/user/root/view_log");
        // Call copyFromLocalFile to upload the file
        fs.copyFromLocalFile(fromPath, toPath);
        // Close the file system
        fs.close();
    }
}
Result
Downloading a file to the local machine
Change the download location to your own local directory.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class load_files {
    public static void main(String[] args) throws Exception {
        // Get the configuration
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:8020");
        // Get the file system
        FileSystem fs = FileSystem.get(conf);
        // Declare the source (HDFS) path and the target (local) path
        Path fromPath = new Path("/user/root/view_log/raceData.csv");
        Path toPath = new Path("C:/");
        // copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem):
        // false = keep the source on HDFS; true = write through the
        // raw local file system, so no .crc checksum file is created
        fs.copyToLocalFile(false, fromPath, toPath, true);
        // Close the file system
        fs.close();
    }
}
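Passing true as the last argument makes the copy go through the raw local file system, so no client-side .crc checksum file is written; in many setups this also sidesteps part of the Windows Hadoop-environment requirement mentioned at the top, though that is configuration-dependent.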
Result
Reading file contents
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class read_files {
    public static void main(String[] args) throws Exception {
        // Get the configuration
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:8020");
        // Get the file system
        FileSystem fs = FileSystem.get(conf);
        // Declare the path to read
        Path path = new Path("/user/root/view_log/raceData.csv");
        // Open a byte stream on the file
        FSDataInputStream is = fs.open(path);
        // Read the file contents and print them
        BufferedReader br = new BufferedReader(new InputStreamReader(is, "utf-8"));
        String line = "";
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }
        // Close the streams and the file system
        br.close();
        is.close();
        fs.close();
    }
}
Result
Writing data
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;

public class write_files {
    public static void main(String[] args) throws Exception {
        // Get the configuration
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:8020");
        // Get the file system
        FileSystem fs = FileSystem.get(conf);
        // Declare the source path
        Path path = new Path("/user/root/view_log/raceData.csv");
        // Create the new file (delete any previous copy first)
        Path newPath = new Path("/user/root/view_log/new_raceData.csv");
        fs.delete(newPath, true);
        FSDataOutputStream os = fs.create(newPath);
        // Open a byte stream on the source file
        FSDataInputStream is = fs.open(path);
        // Read the source contents and write them into the new file
        BufferedReader br = new BufferedReader(new InputStreamReader(is, "utf-8"));
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os, "utf-8"));
        String line = "";
        while ((line = br.readLine()) != null) {
            bw.write(line);
            bw.newLine();
        }
        // Close the streams and the file system
        bw.close();
        os.close();
        br.close();
        is.close();
        fs.close();
    }
}
Result
5.2.4 Reading the sequence file
Remember to change the paths.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;

public class task5_2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:8020");
        // Get the file system
        FileSystem fs = FileSystem.get(conf);
        // Get a SequenceFile.Reader for the job output
        SequenceFile.Reader reader = new SequenceFile.Reader(fs,
                new Path("/Tipdm/Hadoop/MapReduce/Result/Select_Data/part-m-00000"),
                conf);
        // The key and value types used by the sequence file
        Text key = new Text();
        Text value = new Text();
        // Write the records read from the sequence file into a local text file
        BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
                new FileOutputStream("C:\\javaproject\\NO.5\\task5_3.txt", true)));
        while (reader.next(key, value)) {
            out.write(key.toString() + "\t" + value.toString() + "\r\n");
        }
        out.close();
        reader.close();
    }
}
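The (FileSystem, Path, Configuration) reader constructor used above still works but is deprecated in Hadoop 3.x; an equivalent sketch with the options-based constructor looks like this:

SequenceFile.Reader reader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(new Path("/Tipdm/Hadoop/MapReduce/Result/Select_Data/part-m-00000")));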
Result
Check the local folder.
That wraps up 5.1 and 5.2. If you run into problems, feel free to comment.
Tags: hadoop
This article is reposted from: https://blog.csdn.net/2401_86523075/article/details/144142545
Copyright belongs to the original author, 倒霉男孩. If there is any infringement, please contact us for removal.