0


大数据技术使用java实现MapReduce对文件进行切分,分类汇总

Java使用MapReduce切分文件

比如有海量的文本文件,如订单,页面点击事件的记录,量特别大,很难搞定。
那么我们该怎样解决海量数据的计算?

1、获取总行数
2、计算每个文件中存多少数据
3、split切分文件
4、reduce将文件进行汇总

例如这里有百万条数据,单个文件操作太麻烦,所以我们需要进行切分
在切分文件的过程中会出现文件不能整个切分的情况,可能有剩下的数据并没有被读取到,所以我们每个切分128条数据,不足128条再保留到一个文件中


创建MapTask

importjava.io.*;importjava.util.HashMap;importjava.util.Map;importjava.util.Set;publicclassMapTaskextendsThread{//用来接收具体的哪一个文件privateFile file;privateint flag;publicMapTask(File file,int flag){this.file = file;this.flag = flag;}@Overridepublicvoidrun(){try{BufferedReader br =newBufferedReader(newFileReader(file));String line;HashMap<String,Integer> map =newHashMap<String,Integer>();while((line = br.readLine())!=null){/**
                 * 统计班级人数HashMap存储
                 */String clazz = line.split(",")[4];if(!map.containsKey(clazz)){
                    map.put(clazz,1);}else{
                    map.put(clazz, map.get(clazz)+1);}}
            br.close();BufferedWriter bw =newBufferedWriter(newFileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---"+ flag));Set<Map.Entry<String,Integer>> entries = map.entrySet();for(Map.Entry<String,Integer> entry : entries){String key = entry.getKey();Integer value = entry.getValue();
                bw.write(key +":"+ value);
                bw.newLine();}
            bw.flush();
            bw.close();}catch(Exception e){
            e.printStackTrace();}}}

创建Map

import java.io.File;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Map {
    public static void main(String[] args) {
        long start= System.currentTimeMillis();// 多线程连接池(线程池)
        ExecutorService executorService = Executors.newFixedThreadPool(8);// 获取文件列表Filefile= new File("F:\\IDEADEMO\\shujiabigdata\\split");File[] files =file.listFiles();//创建多线程对象int flag =0;for(File f : files) {
            //为每一个文件启动一个线程
            MapTask mapTask = new MapTask(f, flag);
            executorService.submit(mapTask);
            flag++;
        }
        executorService.shutdown();
        long end= System.currentTimeMillis();
        System.out.println(end-start);
    }
}

创建ClazzSum

import java.io.BufferedReader;import java.io.FileReader;import java.util.HashMap;public class ClazzSum {
    public static void main(String[] args) throws Exception {
        long start= System.currentTimeMillis();
        BufferedReader br = new BufferedReader(
                new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\bigstudents.txt"));
        String line;
        HashMap<String,Integer> map = new HashMap<String,Integer>();while((line = br.readLine())!=null) {
            String clazz = line.split(",")[4];if(!map.containsKey(clazz)) {
                map.put(clazz,1);
            } else {
                map.put(clazz, map.get(clazz)+1);
            }
        }
        System.out.println(map);
        long end= System.currentTimeMillis();
        System.out.println(end-start);
    }
}

创建split128

importjava.io.BufferedReader;importjava.io.BufferedWriter;importjava.io.FileReader;importjava.io.FileWriter;importjava.util.ArrayList;publicclassSplit128{publicstaticvoidmain(String[] args)throwsException{BufferedReader br =newBufferedReader(newFileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt"));//用作标记文件,也作为文件名称int index =0;BufferedWriter bw =newBufferedWriter(newFileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---"+ index));ArrayList<String> list =newArrayList<String>();String line;//用作累计读取了多少行数据int flag =0;int row =0;while((line = br.readLine())!=null){
            list.add(line);
            flag++;// flag = 140if(flag ==140){// 一个文件读写完成,生成新的文件
                row =0+128* index;for(int i = row; i <= row +127; i++){
                    bw.write(list.get(i));
                    bw.newLine();}
                bw.flush();
                bw.close();/**
                 * 生成新的文件
                 * 计数清零
                 */
                index++;
                flag =12;
                bw =newBufferedWriter(newFileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---"+ index));}}//文件读取剩余128*1.1范围之内for(int i = list.size()- flag; i < list.size(); i++){
            bw.write(list.get(i));
            bw.newLine();}
        bw.flush();
        bw.close();}}

创建Reduce

importjava.io.BufferedReader;importjava.io.File;importjava.io.FileReader;importjava.util.HashMap;publicclassReduce{publicstaticvoidmain(String[] args)throwsException{long start =System.currentTimeMillis();HashMap<String,Integer> map =newHashMap<String,Integer>();File file =newFile("F:\\IDEADEMO\\shujiabigdata\\part");File[] files = file.listFiles();for(File f : files){BufferedReader br =newBufferedReader(newFileReader(f));String line;while((line = br.readLine())!=null){String clazz = line.split(":")[0];int sum =Integer.valueOf(line.split(":")[1]);if(!map.containsKey(clazz)){
                    map.put(clazz, sum);}else{
                    map.put(clazz, map.get(clazz)+ sum);}}}long end =System.currentTimeMillis();System.out.println(end-start);System.out.println(map);}}


最后将文件切分了8份,这里采用了线程池,建立线程连接,多个线程同时启动,比单一文件采用多线程效率更高更好使。

标签: mapreduce big data java

本文转载自: https://blog.csdn.net/hujieliang123/article/details/122546452
版权归原作者 liangzai2048 所有, 如有侵权,请联系我们删除。

“大数据技术使用java实现MapReduce对文件进行切分,分类汇总”的评论:

还没有评论