目标
通过Java API来操作HDFS,完成的操作有:文件上传、文件下载、新建文件夹、查看文件、删除文件。
前提条件
1.Windows下安装好jdk1.8
2.Windows下安装好maven,这里使用Maven3.6.3
3.Windows下安装好IDEA,这里使用IDEA2021
4.Linux下安装好hadoop2,这里使用hadoop2.7.3
操作步骤
1.新建一个Maven工程
打开IDEA-->File-->New-->Project
选择Maven-->点击Next
选择工程代码存放目录,这个目录需要为一个空目录,目录名称就是工程名称,可以点击Artifact Coordinates左侧的三角形展开内容后,可以修改组织ID,组织ID一般为公司域名的倒写,例如:域名为baidu.com,组织ID填写com.baidu。这里使用默认值。然后点击
Finish
按钮完成Maven工程的创建。
2.pom.xml添加相关依赖
在
pom.xml
的
</project>
上一行添加如下依赖:
<dependencies>
<!-- hadoop相关依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
<!-- 单元测试依赖 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<!-- log4j依赖 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
</dependency>
</dependencies>
加载依赖
可以看到
3.编写测试代码
新建包
在
src\test\java
目录下新建package,包名为GroupID的值:
org.example
新建
HdfsTest
类
新建出的包名处,右键-->New-->Java Class
填写类名:
HdfsTest
文件上传
将本地
D:\test\input\1.txt
上传到HDFS的
/
目录下
在HdfsTest类编写代码,实现文件上传的代码如下:
/**
* 测试上传文件
*/
@Test
public void upload() throws URISyntaxException {
Configuration conf = new Configuration();
URI uri = new URI("hdfs://192.168.193.140:8020");//hdfs服务端地址
String localDir = "file:///D:/test/input/1.txt";//注意:这里的本地文件是指idea所在系统的本地文件
String hdfsDir = "/";
try {
Path localPath = new Path(localDir);
Path hdfsPath = new Path(hdfsDir);
FileSystem fs = FileSystem.get(uri,conf,"hadoop");
fs.copyFromLocalFile(localPath, hdfsPath);
} catch (Exception e) {
e.printStackTrace();
}
}
测试前数据准备,在Windows
D:\test\input
目录下新建一个
1.txt
文件,
1.txt
内容为
hello world
hello hadoop
启动hdfs服务
start-dfs.sh
查看HDFS的
/
目录,没有
1.txt
文件
[hadoop@node1 ~]$ hdfs dfs -ls /
Found 10 items
-rw-r--r-- 1 hadoop supergroup 26 2022-03-14 10:48 /2.txt
-rw-r--r-- 1 hadoop supergroup 1028 2022-03-15 16:22 /core-site.xml
drwxr-xr-x - hadoop supergroup 0 2022-03-14 10:39 /input
-rw-r--r-- 3 hadoop supergroup 27 2022-03-16 09:19 /newfile.txt
drwxr-xr-x - hadoop supergroup 0 2022-03-14 21:40 /out1
drwxr-xr-x - hadoop supergroup 0 2022-03-18 15:08 /out318
drwxr-xr-x - hadoop supergroup 0 2022-03-18 15:45 /out318-2
drwxr-xr-x - hadoop supergroup 0 2022-03-15 11:05 /output
drwxr-xr-x - hadoop supergroup 0 2022-03-14 10:32 /test
drwx------ - hadoop supergroup 0 2022-03-18 15:45 /tmp
运行测试,点击upload方法左侧的三角形,然后点击Run 'upload()'运行upload方法
看到Tests passed 或者 Exit code 0 说明运行成功。
查看HDFS的
/
目录,发现有
1.txt
文件
[hadoop@node1 ~]$ hdfs dfs -ls /
Found 11 items
-rw-r--r-- 3 hadoop supergroup 25 2022-03-21 23:26 /1.txt
-rw-r--r-- 1 hadoop supergroup 26 2022-03-14 10:48 /2.txt
-rw-r--r-- 1 hadoop supergroup 1028 2022-03-15 16:22 /core-site.xml
drwxr-xr-x - hadoop supergroup 0 2022-03-14 10:39 /input
-rw-r--r-- 3 hadoop supergroup 27 2022-03-16 09:19 /newfile.txt
drwxr-xr-x - hadoop supergroup 0 2022-03-14 21:40 /out1
drwxr-xr-x - hadoop supergroup 0 2022-03-18 15:08 /out318
drwxr-xr-x - hadoop supergroup 0 2022-03-18 15:45 /out318-2
drwxr-xr-x - hadoop supergroup 0 2022-03-15 11:05 /output
drwxr-xr-x - hadoop supergroup 0 2022-03-14 10:32 /test
drwx------ - hadoop supergroup 0 2022-03-18 15:45 /tmp
查看
1.txt
文件内容
[hadoop@node1 ~]$ hdfs dfs -cat /1.txt
hello world
hello hadoop
至此,说明从文件上传成功了。
思考:程序是如何实现文件上传的,核心代码是什么?如果不使用api是否可以自定义实现上传功能的代码?
文件下载
将HDFS
/1.txt
下载到本地
D:\test
目录下
/**
* 测试下载文件
*/
@Test
public void download(){
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.193.140:8020");
String localDir = "D:\\test\\download";//IDEA在windows下,所以本地是windows系统
String hdfsDir = "/1.txt";//下载文件
try {
Path localPath = new Path(localDir);
Path hdfsPath = new Path(hdfsDir);
FileSystem fs = FileSystem.newInstance(conf);
fs.copyToLocalFile(hdfsPath, localPath);
} catch (IOException e) {
e.printStackTrace();
}
}
运行测试
查看本地是否成功下载了
1.txt
文件
新建文件夹
/**
* 新建文件夹
*/
@Test
public void mkdir() throws URISyntaxException {
Configuration conf = new Configuration();
URI uri = new URI("hdfs://192.168.193.140:8020");
String hdfsDir = "/mkdir_test";
Path path = new Path(hdfsDir);
try {
FileSystem fs = FileSystem.get(uri,conf,"hadoop");
fs.mkdirs(path);
} catch (Exception e) {
e.printStackTrace();
}
}
运行测试
查看HDFS 是否创建了
/mkdir_test
[hadoop@node1 ~]$ hdfs dfs -ls /
Found 12 items
-rw-r--r-- 3 hadoop supergroup 25 2022-03-21 23:26 /1.txt
-rw-r--r-- 1 hadoop supergroup 26 2022-03-14 10:48 /2.txt
-rw-r--r-- 1 hadoop supergroup 1028 2022-03-15 16:22 /core-site.xml
drwxr-xr-x - hadoop supergroup 0 2022-03-14 10:39 /input
drwxr-xr-x - hadoop supergroup 0 2022-03-21 23:51 /mkdir_test
-rw-r--r-- 3 hadoop supergroup 27 2022-03-16 09:19 /newfile.txt
drwxr-xr-x - hadoop supergroup 0 2022-03-14 21:40 /out1
drwxr-xr-x - hadoop supergroup 0 2022-03-18 15:08 /out318
drwxr-xr-x - hadoop supergroup 0 2022-03-18 15:45 /out318-2
drwxr-xr-x - hadoop supergroup 0 2022-03-15 11:05 /output
drwxr-xr-x - hadoop supergroup 0 2022-03-14 10:32 /test
drwx------ - hadoop supergroup 0 2022-03-18 15:45 /tmp
思考:程序是如何实现文件下载的,核心代码是什么?如果不使用api是否可以自定义实现下载功能的代码?
查看文件状态
查看HDFS已存在的目录下的文件和目录,本案例查看的是
/user
,如果没有
/user
目录需要自行创建
/**
* 查看文件状态
*/
@Test
public void listFiles(){
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.193.140:8020");
try {
FileSystem fs = FileSystem.newInstance(conf);
// listFiles递归查询所有目录和文件
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path("/user"), true);
while (iterator.hasNext()){
LocatedFileStatus next = iterator.next();
System.out.println(next.getPath());
}
System.out.println("===================");
// listStatus只查询当前目录下的目录和文件
FileStatus[] fileStatuses = fs.listStatus(new Path("/user"));
for (int i = 0; i < fileStatuses.length; i++) {
FileStatus fileStatus = fileStatuses[i];
System.out.println(fileStatus.getPath());
}
} catch (IOException e) {
e.printStackTrace();
}
}
思考:这个程序能否运行成功,如果不能运行成功,原因是什么?请自行修改代码并运行成功。
删除文件
/**
* 删除目录或文件
*/
@Test
public void delete() throws URISyntaxException {
Configuration conf = new Configuration();
URI uri = new URI("hdfs://192.168.193.140:8020");
String hdfsDir = "/mkdir_test";
try {
FileSystem fs = FileSystem.get(uri,conf,"hadoop");
Path hdfsPath = new Path(hdfsDir);
fs.delete(hdfsPath,true);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
扩展
使用流拷贝方式实现上传和下载
// 采用流拷贝的方式实现上传、下载
/**
* 上传
*/
@Test
public void uploadByIOStream() throws URISyntaxException {
Configuration conf = new Configuration();
URI uri = new URI("hdfs://192.168.193.140:8020");
try {
FileSystem fs = FileSystem.get(uri, conf, "hadoop");
FileInputStream is = new FileInputStream("d:\\test\\data.txt");
FSDataOutputStream os = fs.create(new Path("/input2/data.txt"));//创建一个路径
// 流拷贝
IOUtils.copyBytes(is, os, 1024);
// 关闭资源
is.close();
os.close();
fs.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 下载
*/
@Test
public void downloadByIOStream() throws URISyntaxException, IOException, InterruptedException {
Configuration conf = new Configuration();
URI uri = new URI("hdfs://192.168.193.140:8020");
FileSystem fs = FileSystem.get(uri, conf, "hadoop");
FSDataInputStream is = fs.open(new Path("/input2/1.txt"));//输入流
FileOutputStream os = new FileOutputStream("d:\\test\\1downbyio.txt");
IOUtils.copyBytes(is,os,1024);
is.close();
os.close();
fs.close();
}
完成!enjoy it!
版权归原作者 Hadoop_Liang 所有, 如有侵权,请联系我们删除。