0


云计算与大数据实验四 HDFS编程

一、实验目的

  1. 深入理解HDFS工作原理和编程思想
  2. 使用HDFS的Java接口进行文件的读写
  3. 使用HDFS的Java接口进行之上传文件
  4. 使用HDFS的Java接口进行之删除文件

二、实验内容

  1. HDFS的Java API接口进行文件的读写操作
  2. HDFS的Java API接口进行之上传文件操作
  3. HDFS的Java API接口进行之删除文件操作

三、实验步骤

(一)HDFS-JAVA接口之读取文件

我们要深入探索

Hadoop

FileSystem

类,它是与

Hadoop

的某一文件系统进行交互的

API

为了完成接下来的操作,你需要学习并掌握:

  1. FileSystem
    
    对象的使用,2.
    FSDataInputSteam
    
    对象的使用。

FileSystem对象

要从

Hadoop

文件系统中读取文件,最简单的办法是使用

java.net.URL

对象打开数据流,从中获取数据。不过这种方法一般要使用

FsUrlStreamHandlerFactory

实例调用

setURLStreamHandlerFactory()

方法。不过每个

Java

虚拟机只能调用一次这个方法,所以如果其他第三方程序声明了这个对象,那我们将无法使用了。 因为有时候我们不能在程序中设置

URLStreamHandlerFactory

实例,这个时候咱们就可以使用

FileSystem API

来打开一个输入流,进而对

HDFS

进行操作。

代码如下:

public sattic void main(String[] args){
    URI uri = URI.create("hdfs://localhost:9000/user/tmp/test.txt");
    Configuration config = new Configuration();
    FileSystem fs = FileSystem.get(uri, config);
    InputStream in = null;
    try {
        in = fs.open(new Path(uri));
        IOUtils.copyBytes(in, System.out, 2048, false);
    } catch (Exception e) {
        IOUtils.closeStream(in);
    }
}
FileSystem

是一个通用的文件系统

API

FileSystem

实例有下列几个静态工厂方法用来构造对象。

public static FileSystem get(Configuration conf)throws IOException
public static FileSystem get(URI uri,Configuration conf)throws IOException
public static FileSystem get(URI uri,Configuration conf,String user)throws IOException
Configuration

对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如:

/etc/hadoop/core-site.xml

)。

  • 第一个方法返回的默认文件系统是在core-site.xml中指定的,如果没有指定,就使用默认的文件系统。
  • 第二个方法使用给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统,
  • 第三个方法作为给定用户来返回文件系统,这个在安全方面来说非常重要。

FSDataInputStream对象

实际上,

FileSystem

对象中的

open()

方法返回的就是

FSDataInputStream

对象,而不是标准的

java.io

类对象。这个类是继承了

java.io.DataInputStream

的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据。

在有了

FileSystem

实例之后,我们调用

open()

函数来获取文件的输入流。

public FSDataInputStream open(Path p)throws IOException
public abst\fract FSDataInputStream open(Path f,int bufferSize)throws IOException

了解了这些,我们在来回顾上文代码,就能更好的理解这些方法的作用了:

编写代码实现如下功能:

使用

FSDataInputStream

获取

HDFS

/user/hadoop/

目录下的

task.txt

的文件内容,并输出;

预期输出:

WARN [main] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
怕什么真理无穷,进一寸有一寸的欢喜。

相关代码:

1.    //启动hadoop: start-dfs.sh
2.    package step2;
3.    
4.    import java.io.IOException;
5.    import java.io.InputStream;
6.    import java.net.URI;
7.    
8.    import org.apache.hadoop.conf.Configuration;
9.    import org.apache.hadoop.fs.FileSystem;
10.    import org.apache.hadoop.fs.Path;
11.    import org.apache.hadoop.io.IOUtils;
12.    
13.    
14.    public class FileSystemCat {
15.    
16.        public static void main(String[] args) throws IOException {
17.            /********* Begin *********/
18.            Configuration config = new Configuration();
19.            URI uri = URI.create("hdfs://localhost:9000/user/hadoop/task.txt");
20.            FileSystem fs = FileSystem.get(uri, config);
21.            InputStream in = null;
22.            try {
23.                in = fs.open(new Path(uri));
24.                IOUtils.copyBytes(in, System.out, 2048, false);
25.            } catch (Exception e) {
26.                IOUtils.closeStream(in);
27.            }
28.            /********* End *********/
29.        }
30.    
31.    }

(二)HDFS-JAVA接口之上传文件

FSDataOutputStream对象

我们知道在

Java

中要将数据输出到终端,需要文件输出流,

HDFS

JavaAPI

中也有类似的对象。

FileSystem

类有一系列新建文件的方法,最简单的方法是给准备新建的文件制定一个

path

对象,然后返回一个用于写入数据的输出流:

public FSDataOutputStream create(Path p)throws IOException

该方法有很多重载方法,允许我们指定是否需要强制覆盖现有文件,文件备份数量,写入文件时所用缓冲区大小,文件块大小以及文件权限。

注意:

create()

方法能够为需要写入且当前不存在的目录创建父目录,即就算传入的路径是不存在的,该方法也会为你创建一个目录,而不会报错。如果有时候我们并不希望它这么做,可以先用

exists()

方法先判断目录是否存在。

我们在写入数据的时候经常想要知道当前的进度,

API

也提供了一个

Progressable

用于传递回调接口,这样我们就可以很方便的将写入

datanode

的进度通知给应用了。

package org.apache.hadoop.util;
public interface Progressable{
    public void progress();
}

编写代码与脚本实现如下功能:

/develop/input/

目录下创建

hello.txt

文件,并输入如下数据:

迢迢牵牛星,皎皎河汉女。
纤纤擢素手,札札弄机杼。
终日不成章,泣涕零如雨。
河汉清且浅,相去复几许?
盈盈一水间,脉脉不得语。
《迢迢牵牛星》

使用

FSDataOutputStream

对象将文件上传至

HDFS

/user/tmp/

目录下,并打印进度。

预期输出:

相关代码:

shell指令:

1.    mkdir /develop
2.    mkdir /develop/input
3.    cd /develop/input
4.    touch hello.txt
5.    vim hello.txt  插入数据  wq 保存退出
6.    start-dfs.sh
1.    package step3;
2.    
3.    
4.    import java.io.BufferedInputStream;
5.    import java.io.FileInputStream;
6.    import java.io.FileNotFoundException;
7.    import java.io.IOException;
8.    import java.io.InputStream;
9.    import java.net.URI;
10.    import java.io.File;
11.    
12.    import org.apache.hadoop.conf.Configuration;
13.    import org.apache.hadoop.fs.FSDataOutputStream;
14.    import org.apache.hadoop.fs.FileSystem;
15.    import org.apache.hadoop.fs.Path;
16.    import org.apache.hadoop.io.IOUtils;
17.    import org.apache.hadoop.util.Progressable;
18.    
19.    
20.    public class FileSystemUpload {
21.    
22.        public static void main(String[] args) throws IOException {
23.            /********* Begin *********/
24.            File localPath = new File("/develop/input/hello.txt");
25.            String hdfsPath = "hdfs://localhost:9000/user/tmp/hello.txt";
26.    
27.            InputStream in = new BufferedInputStream(new FileInputStream(localPath));// 获取输入流对象
28.    
29.            Configuration config = new Configuration();
30.    
31.            FileSystem fs = FileSystem.get(URI.create(hdfsPath), config);
32.    
33.            long fileSize = localPath.length() > 65536 ? localPath.length() / 65536 : 1; // 待上传文件大小
34.    
35.            FSDataOutputStream out = fs.create(new Path(hdfsPath), new Progressable() {
36.                // 方法在每次上传了64KB字节大小的文件之后会自动调用一次
37.                long fileCount = 0;
38.    
39.                public void progress() {
40.                    System.out.println("总进度" + (fileCount / fileSize) * 100 + "%");
41.                    fileCount++;
42.                }
43.            });
44.    
45.            IOUtils.copyBytes(in, out, 2048, true);// 最后一个参数的意思是使用完之后是否关闭流
46.            /********* End *********/
47.        }
48.    }

(三)HDFS-JAVA接口之删除文件

我们在开发或者维护系统时,经常会需要列出目录的内容,在

HDFS

API

中就提供了

listStatus()

方法来实现该功能。

public FileStatus[] listStatus(Path f)throws IOException
public FileStatus[] listStatus(Path f,PathFilter filter)throws IOException
public FileStatus listStatus(Path[] files)throws IOException
public FileStatus() listStatus(Path[] files,PathFilter filter)throws IOException

当传入参数是一个文件时,他会简单的转变成以数组方式返回长度为

1

FileStatus

对象,当传入参数是一个目录时,则返回

0

或多个

FileStatus

对象,表示此目录中包含的文件和目录。

删除文件

使用

FileSystem

delete()

方法可以永久性删除文件或目录。

public boolean delete(Path f,boolean recursive)throws IOException

如果

f

是一个文件或者空目录,那么

recursive

的值可以忽略,当

recursize

的值为

true

,并且

p

是一个非空目录时,非空目录及其内容才会被删除(否则将会抛出

IOException

异常)。

编写代码实现如下功能:

  • 删除HDFS/user/hadoop/目录(空目录);
  • 删除HDFS/tmp/test/目录(非空目录);
  • 列出HDFS根目录下所有的文件和文件夹;
  • 列出HDFS/tmp/的所有文件和文件夹。

预期输出:

相关代码:

1.    package step4;
2.    import java.io.IOException;
3.    import java.net.URI;
4.    import org.apache.hadoop.conf.Configuration;
5.    import org.apache.hadoop.fs.FileStatus;
6.    import org.apache.hadoop.fs.FileSystem;
7.    import org.apache.hadoop.fs.FileUtil;
8.    import org.apache.hadoop.fs.Path;
9.    
10.    public class FileSystemDelete {
11.    
12.        public static void main(String[] args) throws IOException {
13.        /********* Begin *********/
14.            String root = "hdfs://localhost:9000/";//根目录
15.            String path = "hdfs://localhost:9000/tmp"; //要列出的目录
16.            //待删除的两个目录
17.            String del1 = "hdfs://localhost:9000/user/hadoop";
18.            String del2 = "hdfs://localhost:9000/tmp/test";
19.    
20.            Configuration config = new Configuration();
21.            FileSystem fs = FileSystem.get(URI.create(root),config);
22.            fs.delete(new Path(del1),true);
23.            fs.delete(new Path(del2),true);
24.            Path[] paths = {new Path(root),new Path(path)};
25.            FileStatus[] status = fs.listStatus(paths);
26.            Path[] listPaths = FileUtil.stat2Paths(status);
27.    
28.            for(Path path1 : listPaths){
29.                System.out.println(path1);
30.            }
31.        /********* End *********/
32.        }
33.    }

四、实验心得

会使用HDFS的Java接口进行文件的读写

会使用HDFS的Java接口进行之上传文件

会使用HDFS的Java接口进行之删除文件


本文转载自: https://blog.csdn.net/qq_64314976/article/details/128495528
版权归原作者 Meteor.792 所有, 如有侵权,请联系我们删除。

“云计算与大数据实验四 HDFS编程”的评论:

还没有评论