一、文件写入流程
1.相关知识点介绍
Pipeline管道:
Pipeline,即管道。这是 HDFS 在上传⽂件写数据过程时采⽤的⼀种数据传输⽅式。客户端将数据块写⼊第⼀个数据节点,第⼀个数据节点保存数据之后再将块复制到第⼆个数据节点,后者保存后将其复制到第三个数据节点。通俗描述 pipeline 的过程就是:Client——>DN1——>DN2—>DN3
为什么 datanode 之间采⽤ pipeline 线性传输,⽽不是⼀次给三个 datanode 拓扑式传输呢?因为数据以管道的⽅式,顺序的沿着⼀个⽅向传输,这样能够充分利⽤每个机器的带宽,避免⽹络瓶颈和⾼延迟时的连接,最⼩化推送所有数据的延时。在线性推送模式下,每台机器所有的出⼝宽带都⽤于以最快的速度传输数据,⽽不是在多个接受者之间分配宽带。
ACK应答:
ACK (Acknowledge character)即是确认字符,在数据通信中,接收⽅发给发送⽅的⼀种传输类控制字符。表示发来的数据已确认接收⽆误。在 pipeline 管道传输数据的过程中,传输的反⽅向会进⾏ ACK 校验,确保数据传输安全。
2.写入流程
(1)客户端调⽤ DistributedFileSystem 对象的 create() ⽅法创建⼀个⽂件输出流对象。
(2)DistributedFileSystem 对象向 NameNode 发起 RPC 调⽤,NameNode 检查该⽂件是否已经存在,以及客户端是否有权限新建⽂件。如果这些检查通过,NameNode 就会为创建新⽂件记录⼀条记录。否则,⽂件创建失败并向客户端抛出 IOException。
(3)DistributedFileSystem 向客户端返回 FSDataOutputStream 输出流对象。由此客户端可以开始写⼊数据。FSDataOutputStream 封装⼀个 DFSOutputStream 对象,负责处理 DataNode 和 NameNode 之间的通信。
(4)客户端调⽤ FSDataOutputStream 对象的 write() ⽅法写数据。DFSOutputStream 将数据分成⼀个个数据包(packet 默认 64KB),并写⼊⼀个称之为数据队列(Data queue)的内部队列。DFSOutputStream 有⼀个内部类DataStreamer,⽤于请求 NameNode 挑选出适合存储数据副本的⼀组 DataNode。这⼀组DataNode 采⽤ pipeline 机制做数据的发送。默认是 3 副本存储。
(5)Pipeline 传递数据给 DataNode。DataStreamer 将数据包流式传输到 pipeline 的第⼀个 datanode,该DataNode存储数据包并将它发送到 pipeline 的第⼆个 DataNode。同样,第⼆个 DataNode 存储数据包并且发送给第三个(也是最后⼀个)DataNode。
(6)DFSOutputStream 也维护着⼀个内部数据包队列来等待 DataNode 的收到确认回执,称之为确认队列(ackqueue),收到 pipeline 中所有 DataNode 确认信息后,该数据包才会从确认队列删除。管道上的数据节点按反向顺序返回确认信息,最终由管道中的第⼀个数据节点将整条管道的确认信息发送给客户端。
(7)客户端完成写⼊,调⽤ close() ⽅法关闭⽂件输出流。
(8)通知 NameNode ⽂件写⼊成功。
(9)在⼀个块被写⼊期间可能会出现多个 DataNode 同时发⽣故障。只要写⼊了指定的最⼩副本数(dfs.namenode.replication.min 的默认值为 1),写操作就会判定成功。数据块其他副本可以在集群中异步复制,直到达到其⽬标副本数(dfs.replication 的默认值为 3)。
3.代码实现
// ⽂件写⼊
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.FileInputStream;
import java.io.IOException;
public class HDFSWriteDemo {
public static void main(String[] args) throws IOException {
// 设置客户端⽤户身份:root 具备在 hdfs 的读写权限
System.setProperty("HADOOP_USER_NAEM", "root");
// 创建 conf 对象
Configuration conf = new Configuration();
// 设置分布⽂件系统,默认为本地⽂件系统file:///
conf.set("fs.defaultFS", "hdfs://node01:9000");
// 创建 FileSystem 对象
FileSystem fs = FileSystem.get(conf);
// 设置⽂件输出路径
Path path = new Path("/mydir/data.txt");
// 调⽤ create ⽅法创建⽂件
FSDataOutputStream out = fs.create(path);
// 创建本地输⼊流
FileInputStream in = new FileInputStream("D:\\data.txt");
// IO ⼯具类实现流对拷⻉
IOUtils.copy(in, out);
// 关闭连接
fs.close();
}
}
二、文件读取流程
1.读取流程
(1)HDFS Client 调⽤抽象类 FileSystem.get() 获取⼀个 DistributedFileSystem 对象,然后调⽤
DistributedFileSystem.open() 打开要读取的⽂件。
(2)DistributedFileSystem 向 NameNode 发起 RPC 调⽤,获得⽂件的数据块信息。NameNode 返回数据块 ID 及 存储数据块的 DataNode 地址列表,该列表按照数据块 DataNode 与客户端的⽹络拓扑距离进⾏排序。
(3)DistributedFileSystem 在底层调⽤ ClientProtocol.open(),返回⼀个 FSDataInputStream 对象⽤于读取数据。FSDataInputStream 底层封装了 DFSInputStream 对象,负责管理 DataNode 和 NameNode 之间的 I/O。
(4)客户端对 DFSInputStream 调⽤ read() ⽅法。随即 DFSInputStream 连接与客户端距离最近的 NameNode,通过对数据流反复调⽤read()⽅法,把数据从 DataNode 传输到客户端。
(5)当数据块读取完毕时,DFSInputStream 将关闭与该 DataNode 的连接,然后寻找下⼀个块的最佳DataNode。这些操作对⽤户来说是透明的。所以⽤户感觉起来它⼀直在读取⼀个连续的流。
(6)当客户端读取完数据时,调⽤ FSDataInputStream.close() ⽅法关闭输⼊流。
(7)如果 DFSInputStream 与 DataNode 通信时遇到错误,它将尝试该块的下⼀个最接近的 DataNode 读取数据,并记住发⽣故障的 DataNode,保证以后不会反复读取该 DataNode 后续的块。此外,DFSInputStream也会通过校验和(checksum)确认从 DataNode 发来的数据是否完整。如果发现有损坏的块,DFSInputStream 会尝试从其他DataNode 读取该块的副本,也会将被损坏的块报告给 NameNode。
2.相关代码
// ⽂件读取
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.FileOutputStream;
import java.io.IOException;
public class HDFSReadDemo {
public static void main(String[] args) throws IOException {
// 设置客户端⽤户身份:root 具备在 hdfs 的读写权限
System.setProperty("HADOOP_USER_NAEM", "root");
// 创建 conf 对象
Configuration conf = new Configuration();
// 设置分布⽂件系统,默认为本地⽂件系统file:///
conf.set("fs.defaultFS", "hdfs://node01:9000");
// 创建 FileSystem 对象
FileSystem fs = FileSystem.get(conf);
// 调⽤ open ⽅法读取⽂件
FSDataInputStream in = fs.open(new Path("/data.txt"));
// 创建本地输出流
FileOutputStream out = new FileOutputStream("D:\\data.txt");
// IO ⼯具类实现流对拷⻉
IOUtils.copy(in, out);
// 关闭连接
fs.close();
}
}
版权归原作者 cx330上的猫 所有, 如有侵权,请联系我们删除。