前言:超市销售数据的管理和分析对于商家来说至关重要。借助Hadoop生态系统中的MapReduce框架,我们可以有效地处理大规模的销售数据,并从中提取有价值的信息,比如每个月的销售总额。本文将介绍如何利用Hadoop编写MapReduce程序来计算超市消费数据的月份销售总额。
一. 数据准备
首先,我们需要准备超市的销售数据。假设我们的数据以文本文件的形式存储,每行代表一笔销售记录,包括日期、商品名称和销售额等信息。例如:
二.Hadoop分布式文件系统(HDFS)
介绍
Hadoop分布式文件系统(HDFS)是Apache Hadoop生态系统的核心组件之一,用于存储和管理大规模数据集。它设计用于在廉价的硬件上运行,并且提供高可靠性、高性能的分布式存储解决方案。本文将深入探讨HDFS的工作原理、架构和优势。
HDFS架构
HDFS的架构由以下几个重要组件组成:
- NameNode:NameNode是HDFS的关键组件之一,负责管理文件系统的命名空间和存储块的元数据信息。它维护了文件系统树及其相关属性,并且记录了每个文件块的位置信息。
- DataNode:DataNode是另一个重要组件,负责实际存储数据块。每个DataNode管理着其所在节点上的存储,定期向NameNode报告其存储容量和健康状况。
- Secondary NameNode:Secondary NameNode并不是NameNode的备份,而是协助NameNode进行元数据的周期性合并和编辑日志的滚动。它帮助减轻了NameNode的压力,并且有助于提高系统的可靠性。
- 客户端:客户端是与HDFS交互的应用程序,负责向HDFS读取和写入数据。
HDFS工作原理
HDFS的工作原理可以简要概括为以下几个步骤:
- 文件分块:当一个文件被上传到HDFS时,它会被划分成固定大小的数据块(通常为128MB或256MB)。这些数据块被分散存储在多个DataNode上,以实现数据的分布式存储和处理。
- 命名空间管理:NameNode负责管理文件系统的命名空间和元数据信息。它维护文件系统树,记录文件和数据块的位置信息,并处理客户端的文件系统操作请求。
- 数据复制:为了提高数据的可靠性和容错性,HDFS会将每个数据块复制到多个DataNode上。这些副本通常分布在不同的机架上,以减少硬件故障对数据的影响。
- 容错机制:HDFS通过周期性的心跳检测和块报告来监视DataNode的健康状况。如果发现某个数据块的副本丢失或损坏,HDFS会自动从其他DataNode上的副本进行恢复。
HDFS的优势
HDFS作为大数据存储解决方案,具有以下几个显著的优势:
- 高可靠性:HDFS通过数据复制和容错机制,保证了数据的可靠性和容错性,即使在硬件故障的情况下也能保持数据的完整性。
- 高扩展性:HDFS可以轻松地扩展到数以千计的节点,以应对不断增长的数据量和处理需求。
- 高性能:由于数据块的分布式存储和并行处理,HDFS可以实现高吞吐量和低延迟的数据访问。
- 成本效益:HDFS运行在廉价的标准硬件上,并且采用了大规模并行处理的模式,使得它成为一种经济高效的数据存储解决方案。
在HDFS的/<个人学号>/data/路径(例如/202201/data)下创建并存储该数据文件;
通过hdfs dfs -cat/<个人学号>/data/xxx命令查看数据文件的截图,用以证明数据已成功存储在HDFS 上。
1.首先打开HDFS进程,并用hdfs dfs mkdir -p /学号/data 命令在HDFS中创建文件夹,并用hdfs dfs -ls/查看是否创建。
- 将csv文件准备好,并移动到linxu操作系统的 /opt/ 目录下,运用命令 hdfs dfs -put /opt/超市消费数据.csv /202201/data/(把在Linx下的数据转到HDFS上)。
并使用:hdfs dfs -cat查看是否成功上传。
三.MapReduce指标计算与存储
引言
MapReduce是一种用于大规模数据处理的编程模型,最初由Google提出,并在Apache Hadoop项目中得到了广泛的实现和应用。MapReduce分布式文件系统结合了分布式文件系统和MapReduce计算模型,为大规模数据处理提供了高效、可靠的解决方案。本文将深入探讨MapReduce分布式文件系统的原理、架构和应用场景。
MapReduce分布式文件系统架构
MapReduce分布式文件系统的架构通常由以下几个核心组件组成:
- Master节点:Master节点是MapReduce系统的控制节点,负责协调整个作业的执行过程。它包含了JobTracker(作业跟踪器)和Resource Manager(资源管理器)等关键组件。
- Worker节点:Worker节点是MapReduce集群中的计算节点,负责执行Map和Reduce任务。每个Worker节点包含了TaskTracker(任务跟踪器)和DataNode(数据节点)等关键组件。
- 分布式文件系统:MapReduce分布式文件系统通常基于Hadoop分布式文件系统(HDFS)或其他分布式文件系统实现。它负责存储输入数据和中间计算结果,并提供高可靠性、高扩展性的分布式存储服务。
工作原理
MapReduce分布式文件系统的工作原理可以简述为以下几个步骤:
- 作业提交:用户通过客户端提交MapReduce作业到Master节点,包括输入数据的位置和MapReduce任务的代码。
- 作业调度:Master节点接收到作业后,将其分配给空闲的Worker节点进行执行。它负责调度Map和Reduce任务的执行顺序,并监控整个作业的执行进度。
- 数据处理:Worker节点根据作业中指定的Map和Reduce函数对输入数据进行处理。Map任务将输入数据划分为多个键值对,并根据键将其分组;Reduce任务则对每个键值对组进行聚合计算。
- 中间结果存储:MapReduce分布式文件系统将Map任务产生的中间结果存储在分布式文件系统中,以便Reduce任务进行读取和处理。
- 结果输出:最终,Reduce任务将聚合结果写回到分布式文件系统中,并通知Master节点作业执行完成。
Mapper类
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SalesMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {
private Text outputKey = new Text();
private FloatWritable outputValue = new FloatWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
if (fields.length == 3) {
String dateStr = fields[0]; // 日期字段
String amountStr = fields[1]; // 销售金额字段
try {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
Date date = dateFormat.parse(dateStr);
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
int month = calendar.get(Calendar.MONTH) + 1; // 月份从0开始,需要加1
outputKey.set(String.valueOf(month));
outputValue.set(Float.parseFloat(amountStr));
context.write(outputKey, outputValue);
} catch (ParseException e) {
e.printStackTrace();
}
}
}
}
Reducer类
import java.io.IOException;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SalesReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
private FloatWritable result = new FloatWritable();
public void reduce(Text key, Iterable<FloatWritable> values, Context context)
throws IOException, InterruptedException {
float sum = 0;
for (FloatWritable value : values) {
sum += value.get();
}
result.set(sum);
context.write(key, result);
}
}
Driver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MonthlySalesDriver {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MonthlySalesDriver <input path> <output path>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Monthly Sales");
job.setJarByClass(MonthlySalesDriver.class);
job.setMapperClass(SalesMapper.class);
job.setReducerClass(SalesReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
将计算结果存储到Linux上的MySQL数据库中的t_<个人学号>表中(例如t_202201)
首先登录MySQL数据库:mysql -uroot -p123456,并手动创建数据库:create database mydata; 然后并查看是否创建成功:show databases;
使用use mydata 切换到目标数据库,然后使用show tables 查看所有的位于mydata数据库下的所有表。
使用select * from t_20220322,查看是否有数据存在,如若有,则说明成功
版权归原作者 2301_78160051 所有, 如有侵权,请联系我们删除。