0


Hadoop系统应用之MapReduce相关操作【IDEA版】---经典案例“倒排索引、数据去重、TopN”

倒排索引

一、实验说明

倒排索引是文档检索系统中最常用的数据结构,被广泛应用于全文搜索引擎。倒排索引主要用来存储某个单词(或词组)在一组文档中的存储位置的映射,提供了可以根据内容来查找文档的方式,而不是根据文档来确定内容,因此称为倒排索引(Inverted Index)。带有倒排索引的文件称为倒排索引文件,简称倒排文件(Inverted File)。

二、实验准备

在之前建立的HadoopDemo的基础上进行实验

三、开始实验

1.启动Hadoop服务

输入命令:

start-dfs.sh
start-yarn.sh
jps

2.虚拟机vM上创建相应的文本文件

创建

/export/mrtxt

目录,在里面创建三个文本文件

file1.txt

file2.txt

file3.txt

3.上传文件到HDFS指定目录

在hdfs上创建目录

/mrtxt/input

,将三个文本文件

file1.txt

file2.txt

file3.txt

,上传到HDFS的

/mrtxt/input

目录

在UI界面查看是否上传成功

三、开始实验

1.Map阶段实现【创建倒排索引映射器类:InvertedIndexMapper】

打开之前创建的Maven项目 HadoopDemo,创建net.army.mr包,在该路径下编写自定义Mapper类InvertedIndexMapper,主要用于将文本中的单词按照空格进行切割,并以冒号拼接,“单词:文档名称”作为key,单词次数作为value,都以文本方式输出至Combine阶段。

新建Java类

编写代码

package net.army.mr;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * 作者:yangjian
 * 日期:2023/04/24
 * 功能:倒排索引映射器类
 */

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    private static Text keyInfo = new Text(); // 存储单词和URL组合
    private static final Text valueInfo = new Text("1"); // 存储词频,初始化为1

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 获取文件行数据
        String line = value.toString();
        // 拆分得到单词数组
        String[] words = StringUtils.split(line, " ");
        // 得到这行数据所在的文件切片
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        // 根据文件切片得到文件名
        String fileName = fileSplit.getPath().getName();
        for (String word : words) {
            // key值由单词和URL组成,如“MapReduce:file1.txt”
            keyInfo.set(word + ":" + fileName);
            // 将键值对数据传入下一个阶段
            context.write(keyInfo, valueInfo);
        }
    }
}

2.Combine阶段实现【创建倒排索引合并器类:InvertedIndexCombiner】

根据Map阶段的输出结果形式,在net.army.mr包下,自定义实现Combine阶段的类InvertedIndexCombiner,对每个文档的单词进行词频统计。

新建Java类

编写代码

package net.army.mr;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 作者:yangjian
 * 日期:2023/04/24
 * 功能:倒排索引合并器类
 */

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

    private static Text info = new Text();

    // 输入: <MapReduce:file3.txt {1,1,...}>
    // 输出: <MapReduce file3.txt:2>
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // 统计词频
        int sum = 0;
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }
        // 获取分隔符冒号的位置
        int splitIndex = key.toString().indexOf(":");
        // 重新设置value值由URL和词频组成
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
        // 重新设置key值为单词
        key.set(key.toString().substring(0, splitIndex));
        // 将键值对数据传入下一个阶段
        context.write(key, info);
    }
}

3.Reduce阶段实现【创建倒排索引归并器类:InvertedIndexReducer】

根据Combine阶段的输出结果形式,同样在net.army.mr包下,自定义Reducer类InvertedIndexMapper,主要用于接收Combine阶段输出的数据,并最终案例倒排索引文件需求的样式,将单词作为key,多个文档名称和词频连接作为value,输出到目标目录。

新建Java类

编写代码

package net.army.mr;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 作者:yangjian
 * 日期:2023/04/24
 * 功能:倒排索引归并器类
 */

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    private static Text result = new Text();

    // 输入:<MapReduce file3.txt:2>
    // 输出:<MapReduce file1.txt:1;file2.txt:1;file3.txt:2;>
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // 生成文档列表
        String fileList = new String();
        for (Text value : values) {
            fileList += value.toString() + ";";
        }
        // 设置结果数据
        result.set(fileList);
        // 将键值对数据输出
        context.write(key, result);
    }
}

4.Driver主类实现【创建倒排索引驱动器类:InvertedIndexDriver】

编写代码

package net.army.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * 作者:yangjian
 * 日期:2023/04/24
 * 功能:倒排索引驱动器类
 */

public class InvertedIndexDriver {
    public static void main(String[] args) throws Exception {
        // 创建配置对象
        Configuration conf = new Configuration();
        // 设置数据节点主机名属性
        conf.set("dfs.client.use.datanode.hostname", "true");

        // 获取作业实例
        Job job = Job.getInstance(conf);

        // 设置作业启动类
        job.setJarByClass(InvertedIndexDriver.class);

        // 设置Mapper类
        job.setMapperClass(InvertedIndexMapper.class);
        // 设置map任务输出键类型
        job.setMapOutputKeyClass(Text.class);
        // 设置map任务输出值类型
        job.setMapOutputValueClass(Text.class);

        // 设置Combiner类
        job.setCombinerClass(InvertedIndexCombiner.class);

        // 设置Reducer类
        job.setReducerClass(InvertedIndexReducer.class);
        // 设置reduce任务输出键类型
        job.setOutputKeyClass(Text.class);
        // 设置reduce任务输出值类型
        job.setOutputValueClass(Text.class);

        // 定义uri字符串
        String uri = "hdfs://YANGJIAN00:9000";
        // 创建输入目录
        Path inputPath = new Path(uri + "/mrtxt/input");
        // 创建输出目录
        Path outputPath = new Path(uri + "/mrtxt/output");

        // 获取文件系统
        FileSystem fs = FileSystem.get(new URI(uri), conf);
        // 删除输出目录
        fs.delete(outputPath, true);

        // 给作业添加输入目录
        FileInputFormat.addInputPath(job, inputPath);
        // 给作业设置输出目录
        FileOutputFormat.setOutputPath(job, outputPath);

        // 等待作业完成
        job.waitForCompletion(true);

        // 输出统计结果
        System.out.println("======统计结果======");
        FileStatus[] fileStatuses = fs.listStatus(outputPath);
        for (int i = 1; i < fileStatuses.length; i++) {
            // 输出结果文件路径
            System.out.println(fileStatuses[i].getPath());
            // 获取文件输入流
            FSDataInputStream in = fs.open(fileStatuses[i].getPath());
            // 将结果文件显示在控制台
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}

5. 运行倒排索引驱动器类【运行InvertedIndexDriver类】,查看结果

6.问题说明

运行过程中出现以下问题

问题解决方法(更换两个基础文件)

地址如下:(8条消息) Hadoop系统应用之java-API对HDFS的操作实验缺少的两个文件-桌面系统文档类资源-CSDN文库

数据去重

一、实验说明

数据去重主要是为了掌握利用并行化思想来对数据进行有意义的筛选,数据去重指去除重复数据的操作。在大数据开发中,统计大数据集上的多种数据指标,这些复杂的任务数据都会涉及数据去重。

二、实验准备

在之前建立的HadoopDemo的基础上进行实验

三、开始实验

1. Map阶段实现

新建Java类

编写代码

package cn.itcast.mr.dedup;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private static Text line = new Text();//每行数据
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {
        line = value;
        context.write(line,NullWritable.get());
    }

}

2. Reduce阶段实现

新建Java类

编写代码

package cn.itcast.mr.dedup;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    //重写reduce()方法
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        context.write(key,NullWritable.get());
    }
}

3.Driver程序主类实现

新建Java类

编写代码

package cn.itcast.mr.dedup;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class DedupDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(DedupDriver.class);
        job.setMapperClass(DedupMapper.class);
        job.setReducerClass(DedupReducer.class);
        //设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path("D:\\Hadoop\\HDFS\\Dedup\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\Hadoop\\HDFS\\Dedup\\output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

4.在本机文件下编辑要处理的文件

5.查看运行结果

TopN

一、实验说明

TopN分析法是指从研究对象中按照某一个指标进行倒序或正序排列,取其中所需的N个数据,并对这N个数据进行重点分析的方法。

二、实验准备

在之前建立的HadoopDemo的基础上进行实验

三、开始实验

1. Map阶段实现

新建Java类

编写代码

package cn.itcast.mr.topN;

import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable>{
    private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, IntWritable>.Context context)throws IOException, InterruptedException {
        String line = value.toString();
        String[] nums = line.split(" ");

        for (String num :nums) {
            repToRecordMap.put(Integer.parseInt(num),"");
            if(repToRecordMap.size()>50) {
                repToRecordMap.remove(repToRecordMap.firstKey());
            }
        }
    }
    protected void cleanup (Mapper<LongWritable, Text, NullWritable, IntWritable>.Context context) {
        for(Integer i : repToRecordMap.keySet()) {
            try {
                context.write(NullWritable.get(), new IntWritable(i));
            }catch(Exception e) {
                e.printStackTrace();
            }

        }
    }

}

2. Reduce阶段实现

新建Java类

编写代码

package cn.itcast.mr.topN;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
    private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
        public int compare(Integer a, Integer b) {
            return b-a;
        }
    });
    @Override
    public void reduce(NullWritable key, Iterable<IntWritable> values, Reducer<NullWritable, IntWritable, NullWritable, IntWritable>.Context context) throws IOException, InterruptedException {

        for(IntWritable value :values) {
            repToRecordMap.put(value.get(),"");
            if(repToRecordMap.size()>50) {
                repToRecordMap.remove(repToRecordMap.firstKey());
            }
        }

        for(Integer i:repToRecordMap.keySet()) {
            context.write(NullWritable.get(), new IntWritable(i));
        }
    }

}

3.Driver程序主类实现

新建Java类

编写代码

package cn.itcast.mr.topN;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class TopNDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TopNDriver.class);
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        //设置输出类型
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(IntWritable.class);
        //设置输入和输出目录
        FileInputFormat.addInputPath(job, new Path("D:\\Hadoop\\HDFS\\TopN\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\Hadoop\\HDFS\\TopN\\output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

4.在本机文件下编辑要处理的文件

5.查看运行结果

​​​​​​​


本文转载自: https://blog.csdn.net/m0_57464618/article/details/130348152
版权归原作者 -牧心- 所有, 如有侵权,请联系我们删除。

“Hadoop系统应用之MapReduce相关操作【IDEA版】---经典案例“倒排索引、数据去重、TopN””的评论:

还没有评论