

[HBase Distributed Database] Chapter 7: Data Import and Export (2-5)

7.2 Importing Data with Bulkload

Task Objectives

Learn how to add external dependency JARs
Learn how to package a project in Eclipse
Understand the program logic for importing data with bulkload

Task List

  • Task 1: Add external dependency JARs
  • Task 2: Import data with bulkload

Task Steps

Task 1: Add external dependency JARs

Bulkload is implemented with a MapReduce job: the job writes the data directly into HBase's internal HFile format, and the resulting files are then loaded straight into the running cluster to form the target HBase table. The simplest way to use the bulk load feature is the importtsv tool, a built-in utility that loads data from TSV files into HBase. It runs a MapReduce job that either writes the TSV data directly into an HBase table or produces files in HBase's native format for later loading. In this task we write the MapReduce job ourselves rather than using importtsv.
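For reference, a typical importtsv invocation looks roughly like the following; the column mapping and the input file name here are placeholders, not part of this exercise:

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,cf1:col1 ns:t_table /data/input/data.tsv

Adding -Dimporttsv.bulk.output=<hdfs-path> makes importtsv write HFiles instead of issuing Puts; those HFiles can then be loaded with the completebulkload tool. Before writing the code for our own job, we first need to add the JARs the program depends on. The steps are as follows: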

1. Right-click the project and choose [Build Path] > [Configure Build Path].

(Figure 8.2-1)

2. In the dialog that opens, click [Libraries] > [Add External JARs].

(Figure 8.2-2)

3. In the file chooser that opens, navigate to the directory where Hadoop stores its JAR files, as shown in the figure.

(Figure 8.2-3)

4. Under this directory, add all of the JARs in the mapreduce, hdfs, yarn, and common folders. Note that each of these four folders must be opened and its JARs selected separately; after selecting the JARs in a folder, click the Open button at the bottom. If the HBase client classes are not already on the build path, the JARs under HBase's lib directory can be added in the same way. When everything has been added, click [Apply and Close].

Task 2: Import data with bulkload

Build the BulkLoadJob class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class BulkLoadJob {
    // Logger bound to BulkLoadJob so log output shows which class it came from.
    static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class);

    // Mapper: parses each input line and emits (row key, KeyValue) pairs.
    public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the input line on tabs.
            String[] valueStrSplit = value.toString().split("\t");
            // Row key
            String hkey = valueStrSplit[0];
            // Column family
            String family = valueStrSplit[1].split(":")[0];
            // Column qualifier
            String column = valueStrSplit[1].split(":")[1];
            // Cell value
            String hvalue = valueStrSplit[2];
            // Wrap the row key as an immutable byte array.
            final byte[] rowKey = Bytes.toBytes(hkey);
            final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);
            // Package row key, column family, qualifier, and value into a KeyValue.
            KeyValue kv = new KeyValue(rowKey, Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(hvalue));
            // Emit the pair; HFileOutputFormat2 writes it into an HFile.
            context.write(HKey, kv);
        }
    }

    public static void main(String[] args) throws Exception {
        // Create the HBase configuration.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "localhost");
        // Input and output paths come from the command line.
        String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        String inputPath = dfsArgs[0];
        System.out.println("source: " + dfsArgs[0]);
        String outputPath = dfsArgs[1];
        System.out.println("dest: " + dfsArgs[1]);
        HTable hTable = null;
        Job job = Job.getInstance(conf, "Test Import HFile & Bulkload");
        job.setJarByClass(BulkLoadJob.class);
        job.setMapperClass(BulkLoadJob.BulkLoadMap.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        // Disable speculative execution to avoid duplicate output files.
        job.setSpeculativeExecution(false);
        job.setReduceSpeculativeExecution(false);
        // Input and output formats.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Target table.
        hTable = new HTable(conf, "ns:t_table");
        HFileOutputFormat2.configureIncrementalLoad(job, hTable);
        if (job.waitForCompletion(true)) {
            FsShell shell = new FsShell(conf);
            try {
                shell.run(new String[]{"-chmod", "-R", "777", dfsArgs[1]});
            } catch (Exception e) {
                logger.error("Couldn't change the file permissions ", e);
                throw new IOException(e);
            }
            // Load the generated HFiles into the HBase table.
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(new Path(outputPath), hTable);
        } else {
            logger.error("loading failed.");
            System.exit(1);
        }
        if (hTable != null) {
            hTable.close();
        }
    }
}
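Based on the way the mapper splits each line, the input file is expected to contain tab-separated records of the form row key, family:qualifier, value. The lines below are a purely hypothetical illustration of that layout (the actual contents of bulkdata.csv are shown in the figure further down), with <TAB> standing for a literal tab character:

r1<TAB>cf1:col1<TAB>value1
r2<TAB>cf2:col2<TAB>value2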

Create the test table

Enter the HBase shell and create the test namespace and the test table.

bin/hbase shell
create_namespace 'ns'
create 'ns:t_table','cf1','cf2'

(Figure 8.2-4)

Upload the data

Create a directory on HDFS and upload the data file bulkdata.csv to it.

cat bulkdata.csv
hadoop fs -mkdir -p /data/input
hadoop fs -put bulkdata.csv /data/input

(Figure 8.2-5)

With the data and the program in place, the next step is to package the program into a JAR.

1. Select the package containing the program, right-click, and choose [Export].

(Figure 8.2-6)

2. In the dialog that opens, select [JAR file] under [Java] and click Next.

(Figure 8.2-7)

3. In the next dialog, check the resources to include and specify the output path for the JAR file, then click Next.

(Figure 8.2-8)

4. Nothing needs to be changed in this dialog.

(Figure 8.2-9)

5. In the following dialog, specify the main class to run, then click Finish.

(Figure 8.2-10)

Run the JAR

Use the hadoop jar command to run the JAR and perform the data import.

hadoop jar /headless/Desktop/test.jar /data/input/bulkdata.csv /data/output/bulk_out
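The command above relies on the main class having been set in the JAR's manifest during export (step 5). If it was not, hadoop jar also accepts the main class name explicitly; since BulkLoadJob has no package statement, the call would look like this:

hadoop jar /headless/Desktop/test.jar BulkLoadJob /data/input/bulkdata.csv /data/output/bulk_out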

(Figure 8.2-11)

Check the result

Enter the HBase shell and check whether the table now contains data.

bin/hbase shell
scan 'ns:t_table'

(Figure 8.2-12)

7.3 WordCount on HBase

Task Objectives
Practice running WordCount on HBase
Task List
Task 1: Preparation
Task 2: WordCount

Task Steps

Task 1: Preparation

Test namespace and test tables

Enter the HBase shell. Create the test namespace ns and the test tables src_table and dest_table; each table has a single column family cf.

bin/hbase shell
create_namespace 'ns'
create 'ns:src_table','cf'
create 'ns:dest_table','cf'

Test data

Insert test data into the source table.

put 'ns:src_table','1','cf:word','hello'
put 'ns:src_table','2','cf:word','Java'
put 'ns:src_table','3','cf:word','hello'
put 'ns:src_table','4','cf:word','Scala'

(Figure 8.3-1)

Task 2: WordCount

Program logic

Create a new package named wordcount, and inside it a new class HbaseWordCount.

package wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HbaseWordCount {
    // Logger bound to HbaseWordCount so log output shows which class it came from.
    static Logger logger = LoggerFactory.getLogger(HbaseWordCount.class);

    // ZooKeeper client port and quorum address for the HBase connection.
    static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "localhost");
    }

    // Mapper: reads every cell of the source table and emits (word, 1) pairs.
    public static class HBMapper extends TableMapper<Text, IntWritable> {
        private static IntWritable one = new IntWritable(1);
        private static Text word = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            for (Cell cell : value.rawCells()) {
                word.set(CellUtil.cloneValue(cell));
                context.write(word, one);
            }
        }
    }

    // Reducer: sums the counts and writes each word into the destination table.
    public static class HBReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @SuppressWarnings("deprecation")
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Use the word as the row key.
            Put put = new Put(Bytes.toBytes(key.toString()));
            // Store the count in column family cf, column col.
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(String.valueOf(sum)));
            // Writing to HBase requires the row key and the Put.
            context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "hbase wordcount");
        Scan scan = new Scan();
        // Use TableMapReduceUtil to initialize the mapper: scan the source table and run map over it.
        TableMapReduceUtil.initTableMapperJob("ns:src_table", scan, HBMapper.class,
                Text.class, IntWritable.class, job);
        // Use TableMapReduceUtil to initialize the reducer: store the reduced results in the destination table.
        TableMapReduceUtil.initTableReducerJob("ns:dest_table", HBReducer.class, job);
        job.waitForCompletion(true);
        System.out.println("finished");
    }
}

Execution result

When the program is complete, run it. Once "finished" is printed, enter the HBase shell and check the data in the destination table.

scan 'ns:dest_table'
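Given the four rows inserted earlier (hello twice, Java once, Scala once), the destination table should hold counts along these lines; the snippet below only illustrates the shell's scan layout and is not captured output:

 Java     column=cf:col, timestamp=..., value=1
 Scala    column=cf:col, timestamp=..., value=1
 hello    column=cf:col, timestamp=..., value=2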

(Figure 8.3-2)

7.4 Importing HDFS Data into HBase

Task Objectives
Learn how to use Hadoop and HBase together
Task List
Task 1: Import HDFS data into HBase

Task Steps

Task 1: Import HDFS data into HBase

Upload the data

Create the /data/input directory on HDFS and upload csvdata.txt from the testdata folder under the hbase directory to it.

hadoop fs -mkdir -p /data/input
hadoop fs -put ./csvdata.txt /data/input
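The mapper in the program below splits each line on commas and counts every field as a word, so csvdata.txt is expected to be a comma-separated word list. The lines below are a purely hypothetical example of that layout, not the actual file:

hello,Java,hello
Scala,hello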

(Figure 8.4-1)

Program

Create a new package named hdfsandhbase, and inside it a new class Hdfs2Hbase.

package hdfsandhbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class Hdfs2Hbase {

    // Mapper: splits each CSV line on commas and emits (word, "1") pairs.
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] lines = line.split(",");
            for (String s : lines) {
                context.write(new Text(s), new Text(1 + ""));
            }
        }
    }

    // Reducer: sums the counts and writes each word into the HBase table.
    public static class MyReduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            int counter = 0;
            for (Text t : value) {
                counter += Integer.parseInt(t.toString());
            }
            // Write to HBase: row key = word, family = data, column = count.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn("data".getBytes(), "count".getBytes(), (counter + "").getBytes());
            context.write(new ImmutableBytesWritable(key.getBytes()), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("hbase.zookeeper.quorum", "localhost");
        TableName tn = TableName.valueOf("ns:test");
        // Administrative operations on HBase.
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        // Create the namespace (assumes it does not already exist).
        NamespaceDescriptor nsd = NamespaceDescriptor.create("ns").build();
        admin.createNamespace(nsd);
        // Describe the table with a single column family "data".
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("ns:test"));
        HColumnDescriptor hcd = new HColumnDescriptor("data");
        htd.addFamily(hcd);
        // If the table already exists, disable and delete it first.
        if (admin.tableExists(tn)) {
            if (admin.isTableEnabled(tn)) {
                admin.disableTable(tn);
            }
            admin.deleteTable(tn);
        }
        admin.createTable(htd);
        // Define the job.
        Job job = Job.getInstance(conf, "hdfs2hbase");
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Input path.
        FileInputFormat.addInputPath(job, new Path("/data/input/csvdata.txt"));
        // Use TableMapReduceUtil to initialize the reducer against the target table.
        TableMapReduceUtil.initTableReducerJob("ns:test", MyReduce.class, job);
        job.waitForCompletion(true);
        System.out.println("finished");
    }
}
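One thing to be aware of: admin.createNamespace throws a NamespaceExistException if the namespace ns was already created in an earlier exercise. A minimal sketch of a guard around that call follows; the helper class and method name are assumptions for illustration, not part of the original program:

package hdfsandhbase;

import java.io.IOException;

import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.client.Admin;

public class NamespaceHelper {
    // Creates the namespace only if it does not already exist.
    public static void ensureNamespace(Admin admin, String name) throws IOException {
        try {
            admin.createNamespace(NamespaceDescriptor.create(name).build());
        } catch (NamespaceExistException e) {
            // The namespace is already there; nothing to do.
        }
    }
}

In main, the call admin.createNamespace(nsd) could then be replaced with NamespaceHelper.ensureNamespace(admin, "ns").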

Check the result

If the program runs without errors, enter the HBase shell and check the data in the test table.

bin/hbase shell
scan 'ns:test'

(Figure 8.4-2)

7.5 Exporting HBase Data to HDFS

Task Objectives
Understand the program logic for exporting HBase data to HDFS
Task List
Task 1: Export HBase data to HDFS

Task Steps

Task 1: Export HBase data to HDFS

Source table and data

Enter the HBase shell and create the test table "ns:test" with one column family cf and one column col, then insert two rows.

create_namespace 'ns'
create 'ns:test','cf'
put 'ns:test','1','cf:col','value1'
put 'ns:test','2','cf:col','value2'

(Figure 8.5-1)

Program

Create a new class Hbase2Hdfs in the hdfsandhbase package.

package hdfsandhbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Hbase2Hdfs {

    // Mapper: reads each row of the HBase table and emits the value of cf:col.
    public static class MyMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Read column family cf, column col, decoded as UTF-8.
            // Assumes every row has a cf:col cell; getValue would return null otherwise.
            String cfandc = new String(value.getValue("cf".getBytes(), "col".getBytes()), "utf-8");
            context.write(new Text(""), new Text(cfandc));
        }
    }

    // Reducer: writes every value it receives to the output file.
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        // Reusable Text object for the output value.
        private Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text t : values) {
                result.set(t);
                context.write(key, result);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Connection settings.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("hbase.zookeeper.quorum", "localhost");
        // Create the job.
        Job job = Job.getInstance(conf, "hbase2hdfs");
        // Set the main class.
        job.setJarByClass(Hbase2Hdfs.class);
        Scan scan = new Scan();
        // Initialize the mapper: scan the source table ns:test.
        TableMapReduceUtil.initTableMapperJob("ns:test", scan, MyMapper.class,
                Text.class, Text.class, job);
        job.setReducerClass(MyReducer.class);
        // Output path on HDFS.
        FileOutputFormat.setOutputPath(job, new Path("/data/output/out"));
        job.waitForCompletion(true);
        System.out.println("finished");
    }
}

Execution result

Exit the HBase shell and look at the output files under the output path.

quit
hadoop fs -cat /data/output/out/part-r-00000
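Because the mapper writes an empty Text as the key and the value of cf:col as the value, the default TextOutputFormat produces lines of the form key, tab, value. With the test data inserted above, the file should look roughly like this (illustrative, not captured output; each line starts with a tab since the key is empty):

	value1
	value2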

(Figure 8.5-2)


This article is reposted from: https://blog.csdn.net/qq_23934063/article/details/140887612
Copyright belongs to the original author, 不太灵光的程序员.
