0


hadoop之MapReduce

MapReduce的概念

Hadoop的三大组件:HDFS、Yarn、MapReduce。

HDFS:解决的是分布式存储的问题。

MapReduce: 解决的是计算问题。

Yarn: 计算的时候,使用的资源如何协调(Windows操作系统)

mapReduce的优缺点:

优点

1、易于编程
代码写起来有固定的格式,编写难度非常的小,号称是八股文【固定写法】。
2、良好的扩展性
代码的计算资源不够了,可以直接拓展几台即可解决
3、高容出错
如果负责计算的电脑挂掉了,直接可以将任务转移到其他电脑上,任务不会执行失败的。
4、非常适合大数据集的计算(PB级以上) 1P=1024T

缺点

1、不适合做实时计算
mapreduce一个任务就要跑很长时间,不利于实时。不能做到秒级或者毫秒级的计算。
mapreduce 属于离线的技术。
2、不适合做流式计算
数据因为都是静态的,不是边产生数据,边计算。
固定计算:数据量是固定的,给了1T 就计算。
3、不适合做有向图(DAG)计算
多个应用程序之间有依赖关系,后一个程序需要依赖前面的程序的结果。这种场景就称之为有向图,mapreduce是不适合的。

总结:MapReduce只能做离线的数据分析,并且计算速度比较慢

MapReduce案例--WordCount

1、新建maven项目,并且导入包

<!--指定代码编译的版本-->
<properties>
  <maven.compiler.target>1.8</maven.compiler.target>
  <maven.compiler.source>1.8</maven.compiler.source>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.3.1</version>
  </dependency>
</dependencies>

补充Maven的支持:

2、创建一些数据

在项目的根路径下,创建一个文件夹,mr01 ,在mr01文件夹下,再创建数据的来源文件input文件夹,

在input文件夹下面,新建file,a.txt, b.txt, c.txt

a.txt
hello bigdata hello 1999 hello beijing hello 
world hello hello java good

b.txt
hello gaoxinqu hello bingbing 
hello chenchen hello 
ACMilan hello china

c.txt
hello hadoop hello java hello storm hello spark hello redis hello zookeeper
hello hive hello hbase hello flume

3、编写代码

1)编写Map代码

package com.bigdata;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* @Author laoyan
* @Description TODO
* @Date 2022/8/1 9:35
* @Version 1.0
*
* Mapper中的四个泛型跟什么照应:
* 1、LongWritable  行偏移量,一般都是LongWritable ,这一行的数据是从第几个字符开始计算的,因为数据量很多这个值也会很大,所以使用Long
* 2、Text     指的是这一行数据
* 3、Text     Map任务输出的Key值的类型           单词
* 4、IntWritable     Map任务输出的Key值的类型     1
*
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    
    // Ctrl + o  可以展示哪些方法可以重写
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // key 值 指的是行偏移量
        // value 指的是 这一行数据
        //hello bigdata hello 1999 hello beijing hello
        String line = value.toString();
        // [hello,bigdata,hello,1999,hello,beijing,hello]
        String[] arr = line.split("\\s+");
        //  hello-> 1,bigdata->1,hello->1,1999->1,hello->1,beijing->1,hello->1
        for (String word: arr) {
            context.write(new Text(word),new IntWritable(1));
        }
        // fori 循环
        /*for (int i = 0; i < arr.length ; i++) {
            context.write(new Text(arr[i]),new IntWritable(1));
        }*/
        
    }
}

2)编写Reduce代码

package com.bigdata;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 *   reduce 是用来合并的
 *   reduce四个泛型:
 *   前两个,跟map的输出类型一样
 *   后面两个泛型:reduce端的输出类型
 *     hello 5
 *     world 2
 *     ...
 */
public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {

    // reduce 这个方法,有多少个key值,就会调用多少次
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // reduce 拿到的数据是什么样的呢   hello [1,1,1,1,1]   world [1,1]
        // 数据不是你想像的这个样子 hello 1   hello 1  hello 1
        int count = 0;
        // 第一种写法
        for (IntWritable num : values) {
            int i = num.get();
            count = count + i;
        }
        // 第二种写法
        /*Iterator<IntWritable> iterator = values.iterator();
        while(iterator.hasNext()){
            int i = iterator.next().get();
            count = count + i;
        }*/
        // hello 5
        context.write(key,new IntWritable(count));
    }
}

3)编写测试代码

package com.bigdata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    // 这个类就是把 Mapper和 Reducer 放在一起执行的
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // 使用本地的文件系统,而不是hdfs
        configuration.set("fs.defaultFS","file:///");
        // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
        configuration.set("mapreduce.framework.name","local");
        Job job = Job.getInstance(configuration, "wordCount单词统计");

        // 指定 map
        job.setMapperClass(WordCountMapper.class);
        // hello 1
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 自定义分区器的使用
        job.setPartitionerClass(WordCountPartitioner.class);
        // 设置reduceTask的数量
        // reduce的数量决定了reduceTask的任务数量,每一个任务,结束后都会产生一个文件 part-r-xxxxx
        // 结论:reduceTask的数量可以和分区数量不一致,但是没有意义,一般两者保持一致。
        job.setNumReduceTasks(5);

        // 指定 reduce
        job.setReducerClass(WordCountReducer.class);
        // hello 5
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 此处也可以使用绝对路径
        FileInputFormat.setInputPaths(job,"../WordCount/mr01/input/");
        FileOutputFormat.setOutputPath(job,new Path("../WordCount/mr01/output2"));

        boolean result = job.waitForCompletion(true);

        // 返回结果如果为true表示任务成功了,正常退出,否则非正常退出
        System.exit(result?0:-1);
    }
}

4、遇到的错误:

1、输出路径已经存在

2、出现了警告

这个项目中需要log4j的配置,你没有给,所有会出这个警告(可以处理也不可以不处理)

但是如果遇到某个错误,不出结果,也不报错,这个时候就需要程序打印详细的日志查看。

需要在项目中添加一个log4j.properties

# Global logging configuration
#  Debug   info   warn  error 
log4j.rootLogger=DEBUG, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

如果觉得日志中出现的这个错误非常的难受,可以进行解决:

解决这个问题的办法:

第一步:导入包:

<dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.6</version>
</dependency>

第二步:resources文件夹下引入一个logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="CONSOLE_LOG_PATTERN"
              value="%date{yyyy-MM-dd HH:mm:ss}  %highlight(%-5level) ${PID:- }--%magenta([%15.15(%thread)]) %cyan(%-50.50(%logger{50})) : %msg %n"/>
    <appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <charset>UTF-8</charset>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
        </encoder>
    </appender>
    <root level="info">
        <appender-ref ref="CONSOLE_APPENDER"/>
    </root>
</configuration>

3、如果出现如下问题,如何解决?

Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1215)
    at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1420)
    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
    at org.apache.hadoop.fs.FileSystem$4.<init>(FileSystem.java:2180)
    at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:2179)
    at org.apache.hadoop.fs.ChecksumFileSystem.listLocatedStatus(ChecksumFileSystem.java:783)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:320)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:279)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:404)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:310)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:327)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:200)
    at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1571)
    at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1568)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1568)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1589)
    at mr02.WordCountDriver.main(WordCountDriver.java:58)

如果之前在windows中的hadoop中拷贝过hadoop.dll等文件,是不会出现这个问题的。需要进行安装包的补丁配置。

解决方案:

将之前的 hadoop.dll 文件拷贝在 C:/Windows/System32文件夹下一份,可以不重启试试,假如不管用,重启一下。


本文转载自: https://blog.csdn.net/Yz9876/article/details/142181174
版权归原作者 Yz9876 所有, 如有侵权,请联系我们删除。

“hadoop之MapReduce”的评论:

还没有评论