HBase 提供的
TableScanMR
和
SnapshotScanMR
是两种用于在大数据集中进行扫描的 MapReduce 作业,网上也有很多介绍Spark如何实现TableScanMR,但是对SnapshotScanMR的实现方式很少几乎没找到可用的,接下来我们先说说这两者的一些共同点以及不同的实现原理,再介绍Spark是如何实现的。
相同点
- 目的: 两者都是为了高效地扫描 HBase 中大量的数据,适用于数据分析和批处理任务。
- 基于 MapReduce: 两个作业都基于 Hadoop 的 MapReduce 框架,实现分布式计算,能够利用集群资源进行并行处理。
- 支持高并发: 这两种扫描方式都能充分利用 HBase 的分布式特性,支持高并发的读操作,适合处理大规模数据集。
不同点
- 数据来源:- TableScanMR: 直接对 HBase 实时表进行访问,读取当前的数据状态,包含所有的行和最新的变化。- SnapshotScanMR: 读取特定快照的数据,快照创建时数据的一个静态视图,不会反映后续的更改。
- 一致性:- TableScanMR: 由于是实时访问,扫描期间可能受到其他写操作的影响,因此返回的数据可能存在一致性问题。- SnapshotScanMR: 提供了对某一时刻的数据访问,因此在扫描时数据是一致的,不受后续操作影响。
- 性能特性:- TableScanMR: 性能可能受到数据库实时写入和更新的影响,适用于实时数据分析,但在写入繁重的环境中性能可能波动。- SnapshotScanMR: 通常具有较稳定的性能,适合于对历史数据的分析和处理,性能受后续写入影响较小。
- 实现的原理:- TableScanMR: TableScan实际上还是一种并行的ScanApi,它离不开RegionServer,所有的Scan请求都会打到RegionServer上,所以如果RegionServer有压力时这种Scan效果并不理想。比如我们在Scan的同时,服务还在大量的compact 或者还有其他的bulkload的等操作影响RegionServer压力的时,Scan效果不是很理想。工作原理如下- SnapshotScanMR: 它的Scan方式是直接绕过了RegionServer,直接读取Hbase的HDFS文件,所以RegionServer的压力对他无影响。影响它的就是磁盘IO或者网络。所以当执行任务和Hbase数据是在同机房时SnapshotScan的速度是TableScan 的10~30倍,如果跨机房数据量大了可能还不如TableScan,所以得保证同机房。工作原理如下
- 使用场景:- TableScanMR: - 适用于需要访问最新数据的场景,如实时计算、在线分析等。- 适用于数据分布均匀的场景- SnapshotScanMR: - 适用于Bulkload后直接Scan的场景- 适用于缓解RegionServer压力大或者压力分布不均匀的场景(前提可以忽略实时的写入数据,毕竟使用的是快照)- 适用于大部分表数据分布均匀,但是部分大表数据分布不均匀,经常大量长时的compact影响单RegionServer压力,导致在Scan均匀表时出现部分长尾任务的情况(这种也是我所遇到的)。- 计算任务和数据在同机房的情况这点很重要,如果跨机房数据复制的RPC就会非常的耗时
- Spark实现方式:- TableScanMR:
val sc =new SparkContext(sparkConf)val scan =new Scan();
scan.addFamily(Bytes.toBytes("c"))
scan.setTimeStamp(timeStamp)
scan.withStartRow(Bytes.toBytes(startRow))
scan.withStopRow(Bytes.toBytes(stopRow))val hbaseConf =new Configuration()
hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
hbaseConf.set(TableInputFormat.INPUT_TABLE,"Hbase的表名")val dataRDD: RDD[(ImmutableBytesWritable, Result)]= sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])// 处理后续逻辑
- SnapshotScanMR:
//1.定义扫描范围val sc =new SparkContext(sparkConf)val scan =new Scan();
scan.addFamily(Bytes.toBytes("c"))
scan.setTimeStamp(timeStamp)
scan.withStartRow(Bytes.toBytes(startRow))
scan.withStopRow(Bytes.toBytes(stopRow))//2.创建快照val table = TableName.valueOf("Hbase的表名")val snapshotName ="快照名称"val tmpRestoreDir =new Path("临时目录")//创建快照(可以参考Hbase创建快照)
Admin.snapshot(snapshotName, table);//3.创建SnapshotScan并返回一个RDDval job: Job = Job.getInstance()
TableMapReduceUtil.initCredentials(job)//initTableSnapshotMapJob是重写了Hbase里的TableMapReduceUtil,具体修改看后面
TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, scan,classOf[IdentityTableMapper],null,null, job,true, tmpRestoreDir)//最后一个参数是HBaseContext 可以根据具体的实现传入val dataRDD: RDD[(ImmutableBytesWritable, Result)]=new NewHBaseRDD(sc,classOf[TableSnapshotInputFormat],classOf[ImmutableBytesWritable],classOf[Result], job.getConfiguration,HBaseContext)// 处理后续逻辑//4.删除快照和目录
TableMapReduceUtil的重写逻辑,我们知道这里的都是columns,不能直接接受Scan,所以要改造一下。
public static void initTableSnapshotMapJob(String snapshotName, Scan scan,
Class<?extendsTableMap> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, JobConf job,
boolean addDependencyJars, Path tmpRestoreDir)
throws IOException {
TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
initTableMapJob(snapshotName, scan, mapper, outputKeyClass,outputValueClass, job, addDependencyJars,TableSnapshotInputFormat.class);
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);}
public static void initTableMapJob(String table,Scan scan,
Class<?extendsTableMap> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
Class<?extendsInputFormat> inputFormat){
job.setInputFormat(inputFormat);
job.setMapOutputValueClass(outputValueClass);
job.setMapOutputKeyClass(outputKeyClass);
job.setMapperClass(mapper);
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
conf.set(TableInputFormat.INPUT_TABLE, table);
conf.set(TableInputFormat.SCAN, convertScanToString(scan));
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());if(addDependencyJars){try{
addDependencyJars(job);}catch(IOException e){
e.printStackTrace();}}try{
initCredentials(job);}catch(IOException ioe){// just spit out the stack trace? really?
ioe.printStackTrace();}}
版权归原作者 龙大. 所有, 如有侵权,请联系我们删除。