文章目录
HBase表数据的读、写操作与综合操作
一、实验目标
- 熟练掌握通过HBase shell命令来设计HBase表结构实例
- 掌握使用HBase编程创建HBase表、删除HBase表、修改HBase表,以及查看HBase表和表结构。
- 掌握通过HBase 编程实现HBase表数据的读、写操作
二、实验要求及注意事项
- 给出每个实验的主要实验步骤、实现代码和测试效果截图。
- 对本次实验工作进行全面的总结分析。
- 建议工程名、类名、包名或表名体现个人学号或者姓名。
三、实验内容及步骤
实验任务1:使用MapReduce批量将HBase表中数据导入到HDFS上。表名和表中数据自拟。
主要实现步骤和运行效果图:
完整程序
WjwReadMapper:
package hbase;

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.io.*;

/**
 * Mapper that turns every cell of a scanned HBase row into one text record.
 *
 * <p>The output key is the row key. The output value is
 * {@code family F1 qualifier F1 value F1 timestamp}, where {@link #F1} is the
 * {@code \u0001} separator. The cell value is rendered with
 * {@link Bytes#toStringBinary(byte[])}, so non-printable bytes are escaped.
 */
public class WjwReadMapper extends TableMapper<Writable, Writable> {

    /** Field separator placed between family, qualifier, value and timestamp. */
    public static final String F1 = "\u0001";

    // Reused across map() calls to avoid allocating two Text objects per cell.
    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void setup(Context c) {
    }

    /**
     * Emits one (row key, formatted cell) pair per cell in the row.
     *
     * @param row row key of the current result
     * @param r   scan result holding the cells of the row
     * @param c   task context used to emit records
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(ImmutableBytesWritable row, Result r, Context c)
            throws IOException, InterruptedException {
        // An empty result has no cells to export.
        if (r == null || r.isEmpty()) {
            return;
        }

        // Honour offset/length: get() exposes the whole backing array.
        String rk = Bytes.toString(row.get(), row.getOffset(), row.getLength());
        k.set(rk);

        for (Cell cell : r.rawCells()) {
            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            String column = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toStringBinary(CellUtil.cloneValue(cell));
            long ts = cell.getTimestamp();

            v.set(family + F1 + column + F1 + value + F1 + ts);
            // Failures propagate so the framework can fail/retry the task
            // instead of silently producing an incomplete export.
            c.write(k, v);
        }
    }
}
WjwReadMain:
package hbase;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.*;

/**
 * Driver of the map-only job that exports the {@code cf1:keyword} column of
 * the HBase table {@link #inputTable} to text files under
 * {@link #TEMP_INDEX_PATH}, using {@link WjwReadMapper}.
 */
public class WjwReadMain {
    public static final Log LOG = LogFactory.getLog(WjwReadMain.class);
    public static final String NAME = "Member Test1";
    public static final String TEMP_INDEX_PATH = "hdfs://master:9000/tmp/tb_wjw";
    public static String inputTable = "tb_wjw";

    /**
     * Configures and runs the export job, then exits with 0 on success or 1 on failure.
     *
     * @param args unused
     * @throws IOException            if job setup or filesystem access fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();

        // All versions of cf1:keyword written during the last three days.
        Scan scan = new Scan();
        scan.setBatch(0);
        scan.setCaching(10000);
        scan.setMaxVersions();
        long now = System.currentTimeMillis();
        scan.setTimeRange(now - 3 * 24 * 3600 * 1000L, now);
        scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("keyword"));

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        // Resolve the filesystem from the path itself so the hdfs:// URI is
        // honoured even when the default filesystem in conf is different.
        Path tmpIndexPath = new Path(TEMP_INDEX_PATH);
        FileSystem fs = tmpIndexPath.getFileSystem(conf);
        if (fs.exists(tmpIndexPath)) {
            // The output format refuses to write into an existing directory.
            fs.delete(tmpIndexPath, true);
        }

        Job job = Job.getInstance(conf, NAME);
        job.setJarByClass(WjwReadMain.class);
        TableMapReduceUtil.initTableMapperJob(
                inputTable, scan, WjwReadMapper.class, Text.class, Text.class, job);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, tmpIndexPath);

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
运行结果
在HBase中创建表tb_wjw,用于稍后将表中数据导出到HDFS
运行仅含Map阶段的MapReduce程序,将表数据导出到HDFS,并查看是否导出成功
实验任务2:使用MapReduce批量将HDFS上的数据导入到HBase表中。表名和数据自拟,建议体现个人学号或姓名。使用Java编程创建表和删除表,表名和列族自拟。
主要实现步骤和运行效果图:
完整程序
WjwWriteMapper:
package hbase;

import java.io.*;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Mapper that parses exported text lines back into HBase {@link Put}s.
 *
 * <p>Expected line layout:
 * {@code rowKey \t family \u0001 qualifier \u0001 value \u0001 timestamp}.
 * The value field is decoded with {@link Bytes#toBytesBinary(String)}, the
 * inverse of the {@code Bytes.toStringBinary} encoding used by the export job.
 * Lines that do not match the layout are skipped and counted.
 */
public class WjwWriteMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private static final String COUNTER_GROUP = "WjwWriteMapper";
    private static final String MALFORMED_LINES = "MALFORMED_LINES";

    /**
     * Converts one input line into a {@link Put} keyed by its row key.
     *
     * @param key     byte offset of the line in the input file (unused)
     * @param value   the text line to parse
     * @param context task context used to emit the Put
     * @throws InterruptedException if the task is interrupted while writing
     * @throws IOException          if writing to the context fails
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws InterruptedException, IOException {
        String[] arr = value.toString().split("\t", -1);
        if (arr.length != 2 || arr[0].isEmpty()) {
            // A Put cannot be built without a row key.
            context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
            return;
        }

        String[] vals = arr[1].split("\u0001", -1);
        if (vals.length != 4) {
            context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
            return;
        }

        long ts;
        try {
            ts = Long.parseLong(vals[3]);
        } catch (NumberFormatException e) {
            // Bad timestamp: skip this line but keep the job running.
            context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
            return;
        }

        // Bytes.toBytes encodes as UTF-8 regardless of the platform charset.
        byte[] rowKey = Bytes.toBytes(arr[0]);
        byte[] family = Bytes.toBytes(vals[0]);
        byte[] qualifier = Bytes.toBytes(vals[1]);
        byte[] val = Bytes.toBytesBinary(vals[2]);

        // The timestamp given to the Put applies to the column added below.
        Put put = new Put(rowKey, ts);
        put.addColumn(family, qualifier, val);

        // I/O failures propagate so the framework can fail/retry the task.
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}
WjwWriteMain:
package hbase;

import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.mapreduce.*;

import java.io.IOException;

import org.apache.commons.logging.*;

/**
 * Driver of the map-only job that loads a text file from HDFS into an HBase
 * table through {@link WjwWriteMapper} and {@link TableOutputFormat}.
 */
public class WjwWriteMain extends Configured implements Tool {
    static final Log LOG = LogFactory.getLog(WjwWriteMain.class);

    /** Input file used when no explicit arguments are given. */
    private static final String DEFAULT_INPUT = "hdfs://master:9000/tmp/tb_wjw/part-m-00000";
    /** Target table used when no explicit arguments are given. */
    private static final String DEFAULT_TABLE = "tb_wjw01";

    /**
     * Configures and runs the import job.
     *
     * @param args {@code [input path, table name]}; with any other argument
     *             count the built-in defaults are used
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        String input = DEFAULT_INPUT;
        String table = DEFAULT_TABLE;
        if (args.length == 2) {
            input = args[0];
            table = args[1];
        } else {
            LOG.info("2 parameters needed!");
            LOG.info("Falling back to input " + input + " and table " + table);
        }

        // Layer the HBase settings over the configuration handed in by
        // ToolRunner so generic options (-D, -conf, ...) are not lost.
        Configuration base = getConf();
        Configuration conf =
                (base == null) ? HBaseConfiguration.create() : HBaseConfiguration.create(base);

        Job job = Job.getInstance(conf, "Input from file " + input + " into table " + table);
        job.setJarByClass(WjwWriteMain.class);
        job.setMapperClass(WjwWriteMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        // The mapper emits Put mutations as its output value.
        job.setOutputValueClass(Put.class);
        // Map-only: Puts go straight from the mapper to the table.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(input));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point; prints the job's exit code and exits with it.
     *
     * @param args generic Hadoop options followed by the optional
     *             {@code input path} and {@code table name}
     * @throws IOException if the generic options cannot be parsed
     */
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        int exitCode;
        try {
            exitCode = ToolRunner.run(conf, new WjwWriteMain(), otherArgs);
            System.out.println(exitCode);
        } catch (Exception e) {
            e.printStackTrace();
            exitCode = 1;
        }
        // Surface failure to the calling shell instead of always exiting with 0.
        System.exit(exitCode);
    }
}
运行结果
创建一个空表tb_wjw01,用于等会将tb_wjw的数据导入tb_wjw01
配置yarn,并运行map程序
查看HDFS上任务1导出的tb_wjw数据(目录/tmp/tb_wjw)
将HDFS上tb_wjw的导出数据导入HBase的tb_wjw01表中
实验任务3:在实验任务1和实验任务2的基础上,通过HBase编程,实现创建HBase表,修改HBase表(包括增加列族和删除列族),向HBase表中写入数据,读取HBase表中数据,查看HBase数据库中所有表和表结构功能,建议在一个类中定义多个方法实现上述功能,并进行验证。表名和数据自拟。
主要实现步骤和运行效果图:
完整程序
package hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Collection of static helpers demonstrating basic HBase operations: creating
 * and deleting tables, adding and removing column families, writing, reading,
 * scanning and deleting cells, and listing and describing tables.
 *
 * <p>Every helper opens its own {@link Connection} and releases it, together
 * with any {@link Admin}, {@link Table} or {@link ResultScanner}, through
 * try-with-resources, so resources are closed even when an operation throws.
 */
public class WjwHbase {
    private static Configuration conf = HBaseConfiguration.create();

    /**
     * Creates a table with the given column families unless it already exists.
     *
     * @param tableName name of the table to create
     * @param families  names of the column families of the new table
     * @throws IOException if the HBase operation fails
     */
    public static void createTable(String tableName, String[] families) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            if (admin.tableExists(name)) {
                System.out.println("Table already exists!");
                return;
            }
            HTableDescriptor tableDesc = new HTableDescriptor(name);
            for (String family : families) {
                tableDesc.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(tableDesc);
            System.out.println("Table created successfully!");
        }
    }

    /**
     * Writes a single cell.
     *
     * @param tableName table to write to
     * @param rowKey    row key of the cell
     * @param family    column family of the cell
     * @param qualifier column qualifier of the cell
     * @param value     value to store
     * @throws IOException if the HBase operation fails
     */
    public static void addRecord(String tableName, String rowKey, String family,
                                 String qualifier, String value) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            System.out.println("Record added successfully!");
        }
    }

    /**
     * Issues a delete for one column of one row.
     *
     * @param tableName table to delete from
     * @param rowKey    row key of the cell
     * @param family    column family of the cell
     * @param qualifier column qualifier of the cell
     * @throws IOException if the HBase operation fails
     */
    public static void deleteRecord(String tableName, String rowKey, String family,
                                    String qualifier) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            table.delete(delete);
            System.out.println("Record deleted successfully!");
        }
    }

    /**
     * Disables (if needed) and deletes a table, if it exists.
     *
     * @param tableName name of the table to delete
     * @throws IOException if the HBase operation fails
     */
    public static void deleteTable(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            if (!admin.tableExists(name)) {
                System.out.println("Table does not exist!");
                return;
            }
            // Disabling an already disabled table fails, so check first.
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            }
            admin.deleteTable(name);
            System.out.println("Table deleted successfully!");
        }
    }

    /**
     * Adds a column family to an existing table.
     *
     * @param tableName    table to modify
     * @param columnFamily name of the column family to add
     * @throws IOException if the HBase operation fails
     */
    public static void addColumnFamily(String tableName, String columnFamily) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            if (!admin.tableExists(name)) {
                System.out.println("Table does not exist!");
                return;
            }
            admin.addColumn(name, new HColumnDescriptor(columnFamily));
            System.out.println("Column family added successfully!");
        }
    }

    /**
     * Removes a column family from an existing table.
     *
     * @param tableName    table to modify
     * @param columnFamily name of the column family to remove
     * @throws IOException if the HBase operation fails
     */
    public static void deleteColumnFamily(String tableName, String columnFamily)
            throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            if (!admin.tableExists(name)) {
                System.out.println("Table does not exist!");
                return;
            }
            admin.deleteColumn(name, Bytes.toBytes(columnFamily));
            System.out.println("Column family deleted successfully!");
        }
    }

    /**
     * Reads one cell and prints its value as a string.
     *
     * @param tableName table to read from
     * @param rowKey    row key of the cell
     * @param family    column family of the cell
     * @param qualifier column qualifier of the cell
     * @throws IOException if the HBase operation fails
     */
    public static void getRecord(String tableName, String rowKey, String family,
                                 String qualifier) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            System.out.println("Result: " + Bytes.toString(value));
        }
    }

    /**
     * Scans the whole table and prints every row result.
     *
     * @param tableName table to scan
     * @throws IOException if the HBase operation fails
     */
    public static void scanTable(String tableName) throws IOException {
        // The scanner is closed as well; the original never closed it.
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                System.out.println("Result: " + result);
            }
        }
    }

    /**
     * Prints the names of the tables returned by the admin API.
     *
     * @throws IOException if the HBase operation fails
     */
    public static void listTables() throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            List<String> tableNames = new ArrayList<String>();
            for (HTableDescriptor tableDesc : admin.listTables()) {
                tableNames.add(tableDesc.getNameAsString());
            }
            System.out.println("Tables: " + tableNames);
        }
    }

    /**
     * Prints the descriptor (structure) of a table.
     *
     * @param tableName table to describe
     * @throws IOException if the HBase operation fails
     */
    public static void describeTable(String tableName) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
            System.out.println("Table structure: " + tableDesc);
        }
    }

    /**
     * Runs every helper once against a scratch table named {@code wjwtest}.
     *
     * @param args unused
     * @throws IOException if any HBase operation fails
     */
    public static void main(String[] args) throws IOException {
        String tableName = "wjwtest";
        String rowKey = "row1";
        String family = "cf1";
        String qualifier = "q1";
        String value = "this is wjw!";
        String columnFamily = "cf2";
        String[] families = {family};

        createTable(tableName, families);
        addRecord(tableName, rowKey, family, qualifier, value);
        getRecord(tableName, rowKey, family, qualifier);
        scanTable(tableName);
        addColumnFamily(tableName, columnFamily);
        describeTable(tableName);
        deleteColumnFamily(tableName, columnFamily);
        deleteRecord(tableName, rowKey, family, qualifier);
        deleteTable(tableName);
        listTables();
    }
}
运行结果
附:系列文章
| 实验 | 文章目录 | 直达链接 |
| --- | --- | --- |
| 实验01 | Hadoop安装部署 | https://want595.blog.csdn.net/article/details/132767284 |
| 实验02 | HDFS常用shell命令 | https://want595.blog.csdn.net/article/details/132863345 |
| 实验03 | Hadoop读取文件 | https://want595.blog.csdn.net/article/details/132912077 |
| 实验04 | HDFS文件创建与写入 | https://want595.blog.csdn.net/article/details/133168180 |
| 实验05 | HDFS目录与文件的创建删除与查询操作 | https://want595.blog.csdn.net/article/details/133168734 |
| 实验06 | SequenceFile、元数据操作与MapReduce单词计数 | https://want595.blog.csdn.net/article/details/133926246 |
| 实验07 | MapReduce编程:数据过滤保存、UID 去重 | https://want595.blog.csdn.net/article/details/133947981 |
| 实验08 | MapReduce 编程:检索特定群体搜索记录和定义分片操作 | https://want595.blog.csdn.net/article/details/133948849 |
| 实验09 | MapReduce 编程:join操作和聚合操作 | https://want595.blog.csdn.net/article/details/133949148 |
| 实验10 | MapReduce编程:自定义分区和自定义计数器 | https://want595.blog.csdn.net/article/details/133949522 |
版权归原作者 Want595 所有, 如有侵权,请联系我们删除。