1.概述
2.TableInputFormat
TableInputFormat是Apache HBase中的一个重要的类,它允许MapReduce作业直接从HBase表中读取数据作为其输入。这使得HBase可以作为一个数据源,供MapReduce作业处理其存储的大规模数据集,而无需将数据导出到HDFS或其他文件系统。这样不仅提高了数据处理的效率,还减少了数据传输的延迟和潜在的错误。
TableInputFormat的作用
TableInputFormat的主要作用是将HBase表中的数据转换成适合MapReduce作业处理的形式。它将HBase中的行映射为MapReduce作业中的键值对(<k, v>),其中k通常是一个ImmutableBytesWritable对象,代表HBase行的主键(RowKey),而v是一个Result对象,包含了该行的所有列族、列和版本的信息。
TableInputFormat的配置参数
TableInputFormat可以通过一系列的配置参数来定制扫描行为,这些参数可以通过Job的Configuration对象设置。以下是主要的配置参数及其用途:
- hbase.mapreduce.inputtable:指定要扫描的HBase表的名称。
- hbase.mapreduce.scan:可以通过
- TableMapReduceUtil.convertScanToString(Scan scan)生成的字符串来指定一个Scan对象,从而控制扫描的具体行为。但是由于该方法不公开,一般会通过其他参数间接控制扫描行为。
- hbase.mapreduce.scan.row.start 和 hbase.mapreduce.scan.row.stop:分别定义扫描的起始RowKey和结束RowKey。
- hbase.mapreduce.scan.column.family:指定要扫描的列族。
- hbase.mapreduce.scan.columns:指定要扫描的列,多个列之间用空格分隔。
- hbase.mapreduce.scan.timestamp:如果设置,将只扫描指定时间戳的数据。
- hbase.mapreduce.scan.timerange.start 和 hbase.mapreduce.scan.timerange.end:分别定义时间戳范围的开始和结束,以限制扫描的时间范围。
- hbase.mapreduce.scan.maxversions:定义扫描结果中每列的最大版本数。
- hbase.mapreduce.scan.cacheblocks:如果设置为true,则在扫描过程中缓存数据块,以提高读取速度。
- hbase.mapreduce.scan.cachedrows:定义每次读取的最多行数,用于优化读取性能。
- hbase.mapreduce.scan.batchsize:定义每次读取的最多值的数量,这会影响内存使用和处理速度。
结合Map函数的说明
在MapReduce作业中,map()函数是处理TableInputFormat输出的核心部分。函数签名如下:
Java
publicvoidmap(ImmutableBytesWritable row,Result value,Context context)
在这个函数中:
- row:类型为ImmutableBytesWritable,代表当前处理行的RowKey。
- value:类型为Result,包含了当前行的所有列族、列和版本的数据。
- context:类型为Context,用于向Reducer发送键值对或写入日志等。
在map()函数内部,你可以根据value中的数据进行各种处理,如过滤、聚合等,然后通过context.write()将处理后的结果发送给Reducer。
3.TableOutputFormat
TableOutputFormat是Apache HBase提供的一个用于将MapReduce作业的输出直接写入HBase表的类。与TableInputFormat相对应,TableOutputFormat使得MapReduce作业能够将处理后的结果直接存储回HBase,而无需先写入HDFS再导入HBase,从而简化了数据流并提高了效率。
TableOutputFormat的作用
TableOutputFormat的主要功能是在MapReduce作业完成时,将MapReduce作业的输出数据写回到HBase表中。它接收的输出数据类型是<KEY, VALUE>对,其中VALUE必须是Put或Delete对象。Put对象用于插入或更新HBase表中的行,而Delete对象用于删除HBase表中的行。
TableOutputFormat的配置参数
TableOutputFormat需要通过Job的Configuration对象进行配置,主要的配置参数及其用途包括:
- hbase.mapred.outputtable:指定写入数据的目的HBase表的名称。
- hbase.mapred.output.quorum:指定目标HBase表所在的HBase集群的Zookeeper配置信息,格式为:“zookeeper所在机器名(多个实例以逗号分隔):端口号:HBase根节点名”。例如:“zookeeper1.example.com,zookeeper2.example.com:2181:/hbase”。
- hbase.mapred.output.quorum.port:Zookeeper服务器的端口号,虽然可以通过hbase.mapred.output.quorum中的格式指定,但有时也可以单独配置这个参数。
- hbase.mapred.output.rs.class 和 hbase.mapred.output.rs.impl:这两个参数用于指定RegionServer的实现类和服务实现,但在实际应用中很少被直接配置,因为默认的实现通常足够满足需求。
使用场景
TableOutputFormat适用于以下场景:
- 数据更新:当MapReduce作业的结果是对现有HBase表的更新时,可以直接使用TableOutputFormat将更新写回表中。
- 数据加载:当需要将大量数据从HDFS或其他数据源批量导入HBase时,可以使用TableOutputFormat将数据直接写入HBase,避免了先导入临时表再进行数据迁移的复杂过程。
- 数据分析:在进行数据分析或数据清洗后,可以直接将处理后的结果写回HBase表,以供后续分析或应用使用。
总之,TableOutputFormat提供了将MapReduce作业的输出直接写入HBase的能力,极大地简化了数据处理流程,并提高了数据处理的效率和灵活性。
3.案例
TableInputFormat和TableOutputFormat的案例
importjava.io.IOException;importjava.util.List;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.hbase.Cell;importorg.apache.hadoop.hbase.client.Put;importorg.apache.hadoop.hbase.client.Result;importorg.apache.hadoop.hbase.client.Scan;importorg.apache.hadoop.hbase.io.ImmutableBytesWritable;importorg.apache.hadoop.hbase.mapreduce.TableInputFormat;importorg.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;importorg.apache.hadoop.hbase.mapreduce.TableOutputFormat;importorg.apache.hadoop.hbase.util.Bytes;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;publicclassHBaseTableIOExample{publicstaticclassTestTableMapperextendsMapper<ImmutableBytesWritable,Result,ImmutableBytesWritable,ImmutableBytesWritable>{@Overrideprotectedvoidmap(ImmutableBytesWritable key,Result value,Context context)throwsIOException,InterruptedException{List<Cell> cells = value.listCells();for(Cell cell : cells){// Assuming the family is 'cf' and qualifier is 'qual'if(Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),"cf".getBytes())){if(Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),"qual".getBytes())){
context.write(key,newImmutableBytesWritable(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));}}}}}publicstaticclassTestTableReducerextendsReducer<ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,NullWritable>{@Overrideprotectedvoidreduce(ImmutableBytesWritable key,Iterable<ImmutableBytesWritable> values,Context context)throwsIOException,InterruptedException{byte[] rowKey = key.get();Put put =newPut(rowKey);for(ImmutableBytesWritable value : values){// Assuming we want to write back to column 'cf:qual'
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("qual"), value.get());}
context.write(newImmutableBytesWritable(rowKey),NullWritable.get());
context.getCounter("Custom","ProcessedRows").increment(1);}}publicstaticvoidmain(String[] args)throwsException{Configuration conf =HBaseConfiguration.create();Job job =Job.getInstance(conf,"HBaseTableIOExample");
job.setJarByClass(HBaseTableIOExample.class);// Set up input format
job.setInputFormatClass(TableInputFormat.class);Scan scan =newScan();
scan.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("qual"));TableMapReduceUtil.initTableMapperJob("test_input", scan,TestTableMapper.class,ImmutableBytesWritable.class,ImmutableBytesWritable.class, job);// Set up output format
job.setOutputFormatClass(TableOutputFormat.class);TableMapReduceUtil.initTableReducerJob("test_output",TestTableReducer.class, job);
job.setMapperClass(TestTableMapper.class);
job.setReducerClass(TestTableReducer.class);boolean success = job.waitForCompletion(true);System.exit(success ?0:1);}}
重要点:
- Mapper: TestTableMapper从输入表test_input读取数据,提取特定的列族和列的值,并将其传递给Reducer。
- Reducer: TestTableReducer接收来自Mapper的输出,处理数据(本例中为简单传递),并将其写入输出表test_output。
- Job Setup: 主函数main设置了MapReduce作业的输入和输出格式,以及Mapper和Reducer类,并使用TableInputFormat和TableOutputFormat初始化作业。
确保在运行此示例之前,你已经在HBase中创建了test_input和test_output表,并且test_input表中包含了适当的数据。此外,你可能需要根据你的环境调整HBase和Hadoop的配置。
版权归原作者 九师兄 所有, 如有侵权,请联系我们删除。