0


共享单车之数据分析-统计共享单车每天的平均使用时间

第1关:统计共享单车每天的平均使用时间

  • 任务描述
  • 相关知识 - 如何配置Hbase的MapReduce类- 如何使用Hbase的MapReduce进行数据分析
  • 编程要求
  • 测试说明

任务描述

本关任务:使用

Hbase

MapReduce

对已经存在 Hbase 的共享单车运行数据进行分析,统计共享单车每天的平均使用时间,其中共享单车运行数据在

Hbase

t_shared_bicycle

表中(表结构可在编程要求中进行查看)。

相关知识

为了完成本关任务,你需要掌握:

  1. 如何配置HbaseMapReduce类;
  2. 如何使用HbaseMapReduce进行数据分析。

如何配置

Hbase

MapReduce

MapReduce

是运行在

Job

上的一个并行计算框架,分为

Map

节点和

Reduce

节点。

Hbase

提供了

org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil

initTableMapperJob

initTableReducerJob

两个方法来完成

MapReduce

的配置。

initTableMapperJob 方法:

  1. /**
  2. *在提交TableMap作业之前使用它。 它会适当地设置
  3. * 工作。
  4. *
  5. * @param table要读取的表名。
  6. * @param scan具有列,时间范围等的扫描实例。
  7. * @param mapper要使用的mapper类。
  8. * @param outputKeyClass输出键的类。
  9. * @param outputValueClass输出值的类。
  10. * @param job当前要调整的工作。 确保传递的作业是
  11. *携带所有必要的HBase配置。
  12. * @throws IOException设置细节失败。
  13. */
  14. public static void initTableMapperJob(String table, Scan scan,
  15. Class<? extends TableMapper> mapper,
  16. Class<?> outputKeyClass,
  17. Class<?> outputValueClass, Job job)
  18. throws IOException
  19. / **

initTableReducerJob 方法:

  1. /**
  2. *在提交TableReduce作业之前使用它。 它会
  3. *适当设置JobConf。
  4. *
  5. * @param table输出表。
  6. * @param reducer要使用的reducer类。
  7. * @param job当前要调整的工作。
  8. * @throws IOException确定区域计数失败时。
  9. */
  10. public static void initTableReducerJob(String table,
  11. Class<? extends TableReducer> reducer, Job job)
  12. throws IOException

如何使用

Hbase

MapReduce

进行数据分析

下面我们以统计每个城市的酒店个数的例子来介绍

MapReduce

Map

节点和

Reduce

节点:

Map

节点执行类需要继承抽象类

TableMapper

,实现其

map

方法,结构如下:

  1. public static class MyMapper extends TableMapper<Text, DoubleWritable> {
  2. @Override
  3. protected void map(ImmutableBytesWritable rowKey, Result result, Context context) {
  4. }
  5. }

在**

map

方法中可从输入表(原数据表)得到行数据,最后向

Reduce

节点输出键值对

(key/value)

**。

  1. String cityId = Bytes.toString(result.getValue("cityInfo".getBytes(), "cityId".getBytes()));
  2. DoubleWritable i = new DoubleWritable(1);
  3. context.write(new Text(cityId),i);

下面介绍

Reduce

节点,

Reduce

节点执行类需要继承抽象类

TableReducer

,实现其

reduce

方法:

  1. public static class MyTableReducer extends TableReducer<Text, DoubleWritable, ImmutableBytesWritable> {
  2. @Override
  3. public void reduce(Text key, Iterable<DoubleWritable> values, Context context) {
  4. }
  5. }

reduce

方法里会**接收

map

方法里相同

key

的集合,最后把结果存到输出到表里**。

  1. double sum = 0;
  2. for (DoubleWritable num:values){
  3. sum += num.get();
  4. }
  5. Put put = new Put(Bytes.toBytes(key.toString()));
  6. put.addColumn("total_infos".getBytes(),"total".getBytes(),Bytes.toBytes(String.valueOf(sum)));
  7. context.write(null,put);//initTableReducerJob 设置了表名所以在这里无需设置了

编程要求

在右侧代码窗口完成代码编写:

  1. MapReduce类已经配置好,只需完成MapReduce的数据分析;
  2. map方法中,获取输入表t_shared_bicycle的相关信息,计算出使用时间=结束时间 - 开始时间,并把使用时间开始时间的日期传给reduce
  3. reduce方法中通过使用时间开始时间的日期计算共享单车每天平均使用时间,并把每天平均使用时间,四舍五入保留两位有效数字,存入到列族为info,字段为avgTime,ROWKEY 为avgTime的表里。
t_shared_bicycle

表结构如下:
列族名称字段对应的文件的描述ROWKEY (格式为:骑行

id

)infobeginTime开始时间

trip_id

infoendTime结束时间

trip_id

infobicycleId车辆

id
trip_id

infodeparture出发地

trip_id

infodestination目的地

trip_id

infocity所在城市

trip_id

infostart_longitude开始经度

trip_id

infostop_longitude结束经度

trip_id

infostart_latitude开始纬度

trip_id

infostop_latitude结束纬度

trip_id

测试说明

平台会对你编写的代码进行测试,若是与预期输出相同,则算通关。


开始你的任务吧,祝你成功!

package com.educoder.bigData.sharedbicycle;

import java.io.IOException;
import java.text.ParseException;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Scanner;
import java.math.RoundingMode;
import java.math.BigDecimal;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;

import com.educoder.bigData.util.HBaseUtil;

/**
 * 统计共享单车每天的平均使用时间
 */
public class AveragetTimeMapReduce extends Configured implements Tool {

    public static final byte[] family = "info".getBytes();

    public static class MyMapper extends TableMapper<Text, BytesWritable> {
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
         long beginTime = Long.parseLong(Bytes.toString(result.getValue(family, "beginTime".getBytes())));
         long endTime = Long.parseLong(Bytes.toString(result.getValue(family, "endTime".getBytes())));
         String format = DateFormatUtils.format(beginTime, "yyyy-MM-dd", Locale.CHINA);
         long useTime = endTime - beginTime;
         BytesWritable bytesWritable = new BytesWritable(Bytes.toBytes(format + "_" + useTime));
         context.write(new Text("avgTime"), bytesWritable);     
            /********** End *********/
        }
    }

    public static class MyTableReducer extends TableReducer<Text, BytesWritable, ImmutableBytesWritable> {
        @Override
        public void reduce(Text key, Iterable<BytesWritable> values, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
          double sum = 0;
            int length = 0;
            Map<String, Long> map = new HashMap<String, Long>();
            for (BytesWritable price : values) {
                byte[] copyBytes = price.copyBytes();
                String string = Bytes.toString(copyBytes);
                String[] split = string.split("_");
                if (map.containsKey(split[0])) {
                    Long integer = map.get(split[0]) + Long.parseLong(split[1]);
                    map.put(split[0], integer);
                } else {
                    map.put(split[0], Long.parseLong(split[1]));
                }
            }
            Collection<Long> values2 = map.values();
            for (Long i : values2) {
                length++;
                sum += i;
            }
            BigDecimal decimal = new BigDecimal(sum / length /1000);
            BigDecimal setScale = decimal.setScale(2, RoundingMode.HALF_DOWN);
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(family, "avgTime".getBytes(), Bytes.toBytes(setScale.toString()));
            context.write(null, put);
            /********** End *********/
        }

    }

    public int run(String[] args) throws Exception {
        // 配置Job
        Configuration conf = HBaseUtil.conf;
        // Scanner sc = new Scanner(System.in);
        // String arg1 = sc.next();
        // String arg2 = sc.next();
        String arg1 = "t_shared_bicycle";
        String arg2 = "t_bicycle_avgtime";
        try {
            HBaseUtil.createTable(arg2, new String[] { "info" });
        } catch (Exception e) {
            // 创建表失败
            e.printStackTrace();
        }
        Job job = configureJob(conf, new String[] { arg1, arg2 });
        return job.waitForCompletion(true) ? 0 : 1;
    }

    private Job configureJob(Configuration conf, String[] args) throws IOException {
        String tablename = args[0];
        String targetTable = args[1];
        Job job = new Job(conf, tablename);
        Scan scan = new Scan();
        scan.setCaching(300);
        scan.setCacheBlocks(false);// 在mapreduce程序中千万不要设置允许缓存
        // 初始化Mapreduce程序
        TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, BytesWritable.class, job);
        // 初始化Reduce
        TableMapReduceUtil.initTableReducerJob(targetTable, // output table
                MyTableReducer.class, // reducer class
                job);
        job.setNumReduceTasks(1);
        return job;
    }
}

本文转载自: https://blog.csdn.net/m0_58245389/article/details/130800562
版权归原作者 呵呵world 所有, 如有侵权,请联系我们删除。

“共享单车之数据分析-统计共享单车每天的平均使用时间”的评论:

还没有评论