《Hadoop核心技术》
一、定义与概述
Hadoop 是一个开源的分布式计算平台,主要用于存储和处理大规模数据集。它的设计初衷是为了能够在由普通硬件构建的集群上高效运行,通过分布式存储和分布式处理来应对数据量的增长和复杂的数据处理需求。Hadoop 具有高可扩展性、高可靠性和高效性等特点,被广泛应用于大数据领域,如互联网公司的数据仓库建设、日志分析、机器学习数据预处理等场景。
- 使用的软件是这两个
二、Hadoop基础架构
- Hadoop分布式文件系统(HDFS)
- 架构原理:详细介绍了HDFS的主从架构,包括NameNode(名称节点)和DataNode(数据节点)的功能。NameNode主要负责管理文件系统的命名空间,维护文件和目录的元数据,如文件名、目录结构、文件权限等。DataNode则负责实际的数据存储和读写操作,它们会将数据存储在本地磁盘上,并根据NameNode的指令进行数据的复制、删除等操作。例如,一个文件在HDFS中被分割成多个数据块(block),这些数据块会被存储在不同的DataNode上,默认的块大小是128MB,这种数据块的存储方式有利于数据的分布式存储和并行处理。
- 数据读写流程:- 写数据流程:当客户端要向HDFS写入数据时,首先会与NameNode通信,NameNode会根据文件系统的可用空间和DataNode的负载等情况,选择一系列合适的DataNode来存储数据块。然后客户端将数据块依次写入这些选定的DataNode中,并且每个数据块会在多个DataNode上进行冗余存储(默认复制因子为3),以提高数据的可靠性。- 读数据流程:客户端读取文件时,会先向NameNode询问文件数据块的存储位置,然后直接从相应的DataNode中读取数据块,多个DataNode可以并行地为客户端提供数据,从而提高读取速度。
- 高可靠性机制:通过数据冗余(副本机制)来保证数据的可靠性。当某个DataNode出现故障时,HDFS可以从其他副本中恢复数据。同时,NameNode也有相应的备份机制,如Secondary NameNode,它可以定期合并NameNode的编辑日志和镜像文件,在NameNode故障时,可以利用这些备份信息进行恢复。
- YARN(Yet Another Resource Negotiator)资源管理框架
- 架构组成:YARN主要由ResourceManager(资源管理器)、NodeManager(节点管理器)和ApplicationMaster(应用程序管理器)组成。ResourceManager负责整个集群资源的管理和分配,它接收来自各个应用程序的资源请求,并根据集群的资源状况进行调度。NodeManager运行在每个节点上,负责管理本节点的资源(如CPU、内存等),并向ResourceManager汇报资源使用情况。ApplicationMaster是每个应用程序特有的,它负责与ResourceManager协商资源,并与NodeManager通信,以在集群中启动和监控应用程序的任务。
- 任务调度机制:YARN采用了多种调度策略,如先来先服务(FCFS)、公平调度(Fair Scheduler)和容量调度(Capacity Scheduler)。公平调度会尽量保证每个用户或应用程序都能获得公平的资源份额;容量调度则可以为不同的用户或部门分配固定的资源容量,以满足不同的业务需求。例如,在一个多用户的集群中,使用公平调度可以确保小任务和大任务都能在合理的时间内得到资源执行。
- 代码示例:使用Java API读取HDFS文件
以下是一个简单的示例,用于读取HDFS中的文件内容。首先,需要导入相关的Hadoop库,包括org.apache.hadoop.conf.Configuration和org.apache.hadoop.fs.FileSystem等。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class HDFSReadFile {
public static void main(String[] args) {
try {
// 创建Hadoop配置对象
Configuration conf = new Configuration();
// 获取文件系统对象,这里是HDFS
FileSystem fs = FileSystem.get(conf);
// 指定要读取的文件路径
Path filePath = new Path("/user/hadoop/input/sample.txt");
// 打开文件输入流
FSDataInputStream in = fs.open(filePath);
// 读取文件内容并打印
byte[] buffer = new byte[1024];
int bytesRead = 0;
while ((bytesRead = in.read(buffer)) > 0) {
System.out.println(new String(buffer, 0, bytesRead));
}
// 关闭输入流
in.close();
// 关闭文件系统对象
fs.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
三、MapReduce编程模型
- 基本概念- Map和Reduce函数:MapReduce是一种编程模型,其中Map函数主要用于对输入数据进行处理,将输入数据转换为一系列的键值对(key - value pairs)。例如,对于一个文本文件,Map函数可以将每行文本拆分成单词,并以单词为键,出现次数为值(初始值为1)生成键值对。Reduce函数则是对Map阶段输出的具有相同键的值进行合并和汇总。比如,将相同单词的出现次数进行累加,得到每个单词在整个文件中的总出现次数。- 数据流程:数据首先被分割成多个数据块,这些数据块作为Map任务的输入。每个Map任务独立地处理数据块,生成中间结果(键值对)。然后这些中间结果会根据键进行分区(partition)和排序(sort),相同键的值会被发送到同一个Reduce任务中进行处理。Reduce任务处理完后输出最终结果。
- 编程实践- 示例代码:书中会通过实际的代码示例来展示如何编写MapReduce程序。例如,用Java编写一个简单的单词计数程序。在Map阶段,通过继承Mapper类,重写map方法来实现单词的拆分和计数;在Reduce阶段,继承Reducer类,重写reduce方法来实现单词计数的累加。同时还会介绍如何配置和运行MapReduce程序,包括如何设置输入输出路径、如何指定Mapper和Reducer类等。
以下是一个完整的使用 Java 编写 MapReduce 程序的示例,以经典的单词计数(WordCount)为例,展示如何统计文本文件中每个单词的出现次数:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// 定义一个可写的整数类型,用于表示计数1
private final static IntWritable one = new IntWritable(1);
// 定义一个文本类型,用于存储单词
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将输入的一行文本转换为字符串
String line = value.toString();
// 按照空格将行分割成单词数组
String[] words = line.split(" ");
// 遍历单词数组,对每个单词生成<单词, 1>的键值对
for (String w : words) {
word.set(w);
context.write(word, one);
}
}
}
- 优化策略:包括数据本地化优化(尽量让Map任务处理的数据存储在本地节点,减少数据传输开销)、Combiner优化(在Map端对数据进行部分合并,减少中间数据量)等。例如,在单词计数程序中,使用Combiner可以在Map端先对本地数据块中的相同单词进行计数合并,减少传输到Reduce端的数据量。
首先,定义一个Combiner类,它的实现逻辑与Reducer类类似
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
在作业配置中启用Combiner:在提交MapReduce作业的主程序中,通过job.setCombinerClass方法来启用Combiner。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountJobSubmitWithCombiner {
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "WordCountWithCombiner");
job.setJarByClass(WordCountJobSubmitWithCombiner.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置Combiner类,这里使用与Reducer相同的逻辑
job.setCombinerClass(WordCountCombiner.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/user/hadoop/input"));
FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/output"));
System.out.println(job.waitForCompletion(true)?"作业成功":"作业失败");
} catch (Exception e) {
e.printStackTrace();
}
}
}
四、Hadoop生态系统组件
- Hive
- 数据仓库功能:Hive是建立在Hadoop之上的数据仓库工具,它允许用户使用类似于SQL的查询语言(Hive SQL或HiveQL)来查询和分析存储在HDFS中的数据。它将SQL查询转换为一系列的MapReduce或其他执行引擎(如Tez、Spark)可以执行的任务。例如,用户可以使用Hive来对存储在HDFS中的日志文件进行数据分析,如查询某一时间段内的访问次数最多的页面等。
- 数据存储和表管理:Hive的数据存储在HDFS中,它支持多种数据格式,如文本格式、序列文件(SequenceFile)、ORC(Optimized Row Columnar)和Parquet等。用户可以通过创建表来定义数据的结构,包括列名、数据类型等。同时,Hive还支持分区表和桶表,分区表可以根据某个列的值(如日期)将数据分成不同的分区,便于数据的管理和查询;桶表则可以将数据按照哈希值等方式进行分组存储,提高查询性能。
1.解压,命令:tar -zxvf /opt/apache-hive-3.1.3-bin.tar.gz -C /opt
2.修改配置,命令:vi /etc/profile.d/my_env.sh,添加以下部分内容
- Pig
- 数据流语言Pig Latin:Pig提供了一种高级的数据流语言Pig Latin,它允许用户通过编写类似于脚本的程序来处理数据。Pig Latin程序主要由一系列的操作(如LOAD、FILTER、GROUP、FOREACH等)组成,用于数据的加载、过滤、分组和处理。例如,使用Pig可以很方便地从一个大型的数据文件中过滤出满足一定条件的数据,并对这些数据进行分组和聚合操作。
- 与MapReduce的对比和优势:与MapReduce相比,Pig更加灵活和易于使用。它隐藏了MapReduce的一些复杂细节,用户不需要深入了解MapReduce的编程模型就可以进行数据处理。而且Pig可以自动优化用户编写的程序,将其转换为高效的MapReduce或其他执行计划。
代码示例
- 准备数据 假设我们有一个名为 input.txt 的文本文件,内容如下:
hello world
hello hadoop
world hadoop
- Pig Latin 脚本示例
-- 加载数据,指定数据文件路径和格式(这里是文本格式)
input_data = LOAD 'input.txt' AS (line:chararray);
-- 将每行文本拆分成单词,生成新的关系(类似于Map操作)
words = FOREACH input_data GENERATE FLATTEN(TOKENIZE(line)) AS word;
-- 对单词进行分组,以便后续统计每个单词的出现次数(类似于Reduce操作的前置步骤)
grouped_words = GROUP words BY word;
-- 统计每个组(即每个单词)中的元素数量,得到每个单词的出现次数
word_count = FOREACH grouped_words GENERATE group AS word, COUNT(words) AS count;
-- 按照单词出现次数进行降序排序(可选步骤,可根据需求添加)
sorted_word_count = ORDER word_count BY count DESC;
-- 输出结果到控制台(也可以指定输出到文件等其他位置)
DUMP sorted_word_count;
- HBase
- 分布式数据库架构:HBase是一个分布式的、面向列的非关系型数据库,它构建在HDFS之上。它的架构包括Master节点(负责管理表的元数据和区域(Region)的分配)和RegionServer节点(负责存储和管理数据)。数据在HBase中以表的形式存储,表由行和列族组成,列族可以包含多个列。例如,在一个存储用户信息的HBase表中,可以有“基本信息”和“购物信息”两个列族,每个列族下可以有多个列,如“姓名”、“年龄”属于“基本信息”列族,“购买商品名称”、“购买金额”属于“购物信息”列族。
- 数据读写操作和性能特点:HBase的数据读写操作是基于行键(Row Key)的,数据按照行键进行排序存储。读操作可以通过行键或者行键范围来快速定位数据;写操作会先将数据写入内存中的MemStore,当MemStore达到一定大小后,会将数据刷写到磁盘上的HFile中。HBase适合存储海量的结构化或半结构化数据,并且具有高并发读写的性能优势,例如在一个实时数据采集和存储的场景中,HBase可以快速地接收和存储大量的传感器数据。
这些重点内容可以帮助读者全面理解Hadoop的核心技术,包括其分布式存储、资源管理和数据处理等方面的知识,以及如何利用Hadoop生态系统中的各种工具进行大数据的存储和分析。
版权归原作者 谭雪华 所有, 如有侵权,请联系我们删除。