实现HBase表和RDB表的转化
一、引入
转化为HBase表的三大来源:RDB Table、Client API、Files
如何构造通用性的代码模板实现向HBase表的转换,是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。
首先,我们需要分别构造rdb和hbase的对象,根据批处理的思想,我们可以考虑批量将rdb中的数据导出,并且转化为
List<Put>
的格式,直接导入HBase表中,最后释放资源,伪代码模板如下:
rdb=...
hbase=...
rdb.init();
hbase.init();
while(rdb.hasNextBatch()){
List<Put> batch = rdb.nextBatch();
hbase.putBatch(batch);}
hbase.close();
rdb.close();
二、代码讲解
1. 目录结构
2. 具体实现
- transfer.properties
内含HBase和RDB转换所有配置信息的配置文件,因为该配置文件是在启动时就需要进行配置,因此我们需要按以下图片进行配置导入配置文件:
- 在Run/Debug Configurations中,新建一个Application
- 配置好主类
- 配置好配置文件的具体路径
- RDB 接口
publicinterfaceRDBextendsCom{// 要提升性能,需要使用批处理booleanhasNextBatch()throwsSQLException;// 是否存在下一个批次List<Put>nextBatch()throwsSQLException;// 一个put代表往一个hbase表的一行的一个列族的一个列插入一条数据,对Hbase来说,批次就是List<Put>}
- RDB 实现类
publicclassRDBImplimplementsRDB{privatestaticLogger logger =Logger.getLogger(RDBImpl.class);// JDBC 的基本元素:连接对象(装载[驱动]、[URL]、[账号]、[密码])->执行对象(SQL语句)->结果集privateProperties config;/**
* 它们需要设置成全局变量的原因是它们需要共享
*/privateConnection con;privatePreparedStatement pst;privateResultSet rst;// 定义每个批次处理的记录数的最大数量privateint batchSize;// hbase的行键对应rdb的列的列名privateString hbaseRowKeyRdbCol;privateMap<String,Map<String,String>> hbaseRdbColMapping;// RDB配置可以灵活地从外部传入(构造方法),从内部读取(config())publicRDBImpl(Properties config){this.config = config;}@OverridepublicPropertiesconfig(){return config;}/**
* 内部资源初始化
*/@Overridepublicvoidinit()throwsException{
con =getConnection();
logger.info("RDB 创建 [ 连接 ] 对象成功");
pst =getStatement(con);
logger.info("RDB 创建 [ 执行 ] 对象成功");
rst =getResult(pst);
logger.info("RDB 创建 [ 结果集 ] 成功");
batchSize =batchSize();
hbaseRdbColMapping =hbaseRdbColumnsMapping();}@Overridepublicvoidclose(){closeAll(rst,pst,con);}privateStringdriver(){returncheckAndGetConfig("rdb.driver");}privateStringurl(){returncheckAndGetConfig("rdb.url");}privateStringusername(){returncheckAndGetConfig("rdb.username");}privateStringpassword(){returncheckAndGetConfig("rdb.password");}privateStringsql(){returncheckAndGetConfig("rdb.sql");}privateintbatchSize(){returnInteger.parseInt(checkAndGetConfig("rdb.batchSize"));}// java.sql下的ConnectionprivateConnectiongetConnection()throwsClassNotFoundException,SQLException{// 装载驱动Class.forName(driver());// 获取并返回连接对象returnDriverManager.getConnection(url(),username(),password());}privatePreparedStatementgetStatement(Connection con)throwsSQLException{return con.prepareStatement(sql());}privateResultSetgetResult(PreparedStatement statement)throwsSQLException{return statement.executeQuery();}/**
* hbase 列族和列与rdb中列的映射关系
* hbase列族 hbase列 rdb列
* @return Map<String,Map<String,String>>
*/privateMap<String,Map<String,String>>hbaseRdbColumnsMapping(){String mapping =checkAndGetConfig("rdb.hbase.columns.mapping");Map<String,Map<String,String>> map =newHashMap<>();String[] pss = mapping.split(",");for(String ps : pss){String[] pp = ps.split("->");String[] p = pp[0].split(":");String rdbCol = pp[1],hbaseColFamily,hbaseColName;if(p.length==1){
hbaseRowKeyRdbCol = pp[1];}else{
hbaseColFamily = p[0];
hbaseColName = p[1];if(!map.containsKey(hbaseColFamily)){
map.put(hbaseColFamily,newHashMap<>());}
map.get(hbaseColFamily).put(hbaseColName,rdbCol);}}return map;}/**
* 将RDB的列转化为字节数组(需要确定列的数据类型)
* @param rdbColumn
* @return
* @throws SQLException
*/privatebyte[]toBytesFromRdb(String rdbColumn)throwsSQLException{Object obj = rst.getObject(rdbColumn);if(obj instanceofString){returnBytes.toBytes((String)obj);}elseif(obj instanceofFloat){returnBytes.toBytes(((Float)obj).floatValue());}elseif(obj instanceofDouble){returnBytes.toBytes(((Double)obj).doubleValue());}elseif(obj instanceofBigDecimal){returnBytes.toBytes((BigDecimal)obj);}elseif(obj instanceofShort){returnBytes.toBytes(((Short) obj).shortValue());}elseif(obj instanceofInteger){returnBytes.toBytes(((Integer)obj).intValue());}elseif(obj instanceofBoolean){returnBytes.toBytes((Boolean)((Boolean) obj).booleanValue());}else{thrownewSQLException("HBase不支持转化为字节数组的类型:"+obj.getClass().getName());}}/**
* 将HBase的列名或列族名转化为字节数组
* @param name
* @return
*/privatebyte[]toBytes(String name){returnBytes.toBytes(name);}// 最后一个批次的数据最少有一条@OverridepublicbooleanhasNextBatch()throwsSQLException{return rst.next();}@OverridepublicList<Put>nextBatch()throwsSQLException{// 预先分配容量List<Put> list =newArrayList<>(batchSize);int count =0;do{/**
* 如何将一行解析为多个put(结合配置文件)
* 对每条数据,创建一个带行键的put,向put中放入HBase列族名,HBase列名,RDB列名
*/Put put =newPut(toBytesFromRdb(hbaseRowKeyRdbCol));for(Map.Entry<String,Map<String,String>> e : hbaseRdbColMapping.entrySet()){String columnFamily = e.getKey();for(Map.Entry<String,String> s : e.getValue().entrySet()){String hbaseColumn = s.getKey();String rdbColumn = s.getValue();// 需要将内容转变为字节数组传入方法
put.addColumn(toBytes(columnFamily),toBytes(hbaseColumn),toBytesFromRdb(rdbColumn));}}
list.add(put);}while(++count<batchSize && rst.next());return list;}}
如何理解一行转化为多个put?
结果集的实质?
rst.next() 的两个作用
rst.next();// 1.判定是否存在下一个有效行// 2.若存在下一个有效行,则指向该有效行
a. 只通过config作为参数构造rdb
b. 以JDBC为核心,需要连接对象(驱动,URL,账号,密码)=>执行对象(SQL)=>结果集,这些都需要被设计为全局变量(因为需要被共享)
c. 既实现了RDB接口,还实现了RDB的继承接口Com中的
init()、close()
进行资源的初始化和释放,
checkAndGetConfig()
根据传入的配置文件获取配置信息并且赋值给全局变量。
d. 重点:我们还需要对RDB和HBase的映射关系进行解析,最终解析出RDB列名,HBase列族名,HBase列名,具体如何解析参考配置文件
transfer.properties
,并将解析出来的名字构造成一个Put对象,由于构造Put对象只能放字节数组,所以需要转化为字节数组的方法,又因为解析RDB的列名需要考虑列的数据类型,而解析HBase的列族或列名不需要考虑,因此需要有两个转换方法==ToBytesFromRDB()和ToBytes()==分别实现两种情况的字节数组转化。
- HBase接口
publicinterfaceHBaseextendsCom{// RDBImpl的nextBatch()返回的就是List<Put>,直接放入HBase表即可。voidputBatch(List<Put> batch)throwsIOException;}
- HBase实现类
public class HBaseImpl implements HBase {
private static Logger loggerHBase = Logger.getLogger(HBaseImpl.class);
private Properties config;
private Connection con;
private Table hbaseTable;
public HBaseImpl(Properties config){
this.config = config;}
@Override
public Properties config(){return config;}
@Override
public void init() throws Exception {
con = getCon();
loggerHBase.info("HBase 创建 [ 连接 ] 成功");
hbaseTable = checkAndGetTable(con);
loggerHBase.info("HBase 创建 [ 数据表 ] 成功");}
@Override
public void close(){
closeAll(hbaseTable,con);}
private String tableName(){return checkAndGetConfig("hbase.table.name");}
private String zkUrl(){return checkAndGetConfig("hbase.zk");}
private Connection getCon() throws IOException {
// hadoop.conf的configuration
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum",zkUrl());return ConnectionFactory.createConnection(config);}
private Table checkAndGetTable(Connection con) throws IOException {
/**
* Admin : HBase DDL
*/
Admin admin = con.getAdmin();
TableName tableName = TableName.valueOf(tableName());
// 通过tableName判定表是否存在
if(!admin.tableExists(tableName)){
throw new IOException("HBase表不存在异常:"+tableName);}
/**
* Table : HBase DML & DQL
*/
// 传入的参数可以是TableName tableName,ExecutorService pool(表操作可以并发)return con.getTable(tableName);}
@Override
public void putBatch(List<Put> batch) throws IOException{
hbaseTable.put(batch);}}
HBase的实现类和RDB的实现类也非常类似:
先重写HBase接口中的方法和Com接口中的方法,发现往里放数据需要构造一个Table对象,而Table对象的构建需要一个连接对象和TableName,因此在构造了两个方法tableName()获取配置信息中的TableName(注意:此时的TableName是字符串类型),zkUrl()获取zk.url作为配置构造连接对象。
- Com接口
publicinterfaceCom{Logger logger =Logger.getLogger(Com.class);// 获取配置对象Propertiesconfig();// 初始化资源voidinit()throwsException;// 释放资源voidclose();defaultStringcheckAndGetConfig(String key){if(!config().containsKey(key)){// 因为该方法可能被用于HBase和RDBthrownewRuntimeException("配置项缺失异常:"+key);}String item =config().getProperty(key);
logger.info(String.format("获取配置项 %s : %s",key,item));return item;}defaultvoidcloseAll(AutoCloseable...acs){for(AutoCloseable ac : acs){if(Objects.nonNull(ac)){try{
ac.close();
logger.info(String.format("释放 %s 成功",ac.getClass().getName()));}catch(Exception e){
logger.error("释放资源异常:"+e);}}}}}
在Com接口中,设计了一些普通方法
config()
实现配置的导出,
init()、close()
资源的初始化和关闭;同样还设计了一些无需实现的默认方法便于实现
init()和close()
方法。这些方法适用于RDB和HBase的实现类。
- RDBToHBase接口
publicinterfaceRDBToHBase{// 创建一个RDB对象voidsetRDB(RDB rdb);// 创建一个HBase对象voidsetHBase(HBase hbase);// 进行数据的传输voidstartTransfer();}
- RDBToHBase实现类
publicclassRDBToHBaseImplimplementsRDBToHBase{// 日志显示privatestaticLogger loggerRH =Logger.getLogger(RDBToHBaseImpl.class);privateRDB rdb;privateHBase hbase;@OverridepublicvoidsetRDB(RDB rdb){this.rdb = rdb;}@OverridepublicvoidsetHBase(HBase hbase){this.hbase = hbase;}@OverridepublicvoidstartTransfer(){try{
rdb.init();
loggerRH.info("RDB 初始化成功");
hbase.init();
loggerRH.info("HBase 初始化成功");
loggerRH.info("数据从 RDB 迁移至 HBase 开始...");int count =0;while(rdb.hasNextBatch()){finalList<Put> batch = rdb.nextBatch();
hbase.putBatch(batch);
loggerRH.info(String.format("第 %d 批:%d 条数据插入成功",++count,batch.size()));}
loggerRH.info("数据从 RDB 迁移至 HBase 结束...");}catch(Exception e){
loggerRH.error("将 RDB 数据批量迁移至 HBase 异常",e);}finally{
hbase.close();
rdb.close();}}}
- AppRDBToHBase 实现类
publicclassAppRDBToHBase{privatestaticLogger logger =Logger.getLogger(AppRDBToHBase.class);privatestaticvoidstart(String[] args){try{if(Objects.isNull(args)|| args.length ==0||Objects.isNull(args[0])){thrownewNullPointerException("配置文件路径空指针异常");}finalStringPATH= args[0];finalFile file =newFile(PATH);if(!file.exists()|| file.length()==0||!file.canRead()){thrownewIOException("配置文件不存在、不可读、空白");}Properties config =newProperties();// final String path = args[0];
config.load(newFileReader(file));RDB rdb =newRDBImpl(config);HBase hBase =newHBaseImpl(config);RDBToHBase rdbToHBase =newRDBToHBaseImpl();
rdbToHBase.setRDB(rdb);
rdbToHBase.setHBase(hBase);
rdbToHBase.startTransfer();}catch(Exception e){
logger.error("配置异常",e);}}publicstaticvoidmain(String[] args ){start(args);}}
对于传入的配置文件路径,既要检查路径本身,也要检查路径代表的文件本身。
通过流的方式将文件进行配置,并且利用该配置构造RDB和HBase并进行数据的传输
其他:日志文件系统Log.4j的应用
- 准备:需要在Resources模块下配置log4j.properties文件
- 注意: - 日志文件信息的输出方式有三种
logger.error()、logger.info()、logger.warn()
,除了对错误信息进行输出之外,也要习惯于补充正常信息的输出,以增强代码的可读性。- log.4j除了在控制台打印日志信息之外,还能在磁盘下的日志文件中打印日志信息,因此在导入log4j.properties文件之后需要修改日志文件的路径。- 对于不同类或接口下的logger,需要注意进行名字的区分。
版权归原作者 Byyyi耀 所有, 如有侵权,请联系我们删除。