Apache Hbase 系列文章
1、hbase-2.1.0介绍及分布式集群部署、HA集群部署、验证、硬件配置推荐
2、hbase-2.1.0 shell基本操作详解
3、HBase的java API基本操作(创建、删除表以及对数据的添加、删除、查询以及多条件查询)
4、HBase使用(namespace、数据分区、rowkey设计、原生api访问hbase)
5、Apache Phoenix(5.0.0-5.1.2) 介绍及部署、使用(基本使用、综合使用、二级索引示例)、数据分区示例
6、Base批量装载——Bulk load(示例一:基本使用示例)
7、Base批量装载-Bulk load(示例二:写千万级数据-mysql数据以ORCFile写入hdfs,然后导入hbase)
8、HBase批量装载-Bulk load(示例三:写千万级数据-mysql数据直接写成Hbase需要的数据,然后导入hbase)
文章目录
本文主要介绍了通过java api操作hbase的基本示例。
本文依赖hbase环境可用。
本分主要分为2个部分,即maven依赖和源码示例。
一、maven依赖
1、pom.xml
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.1.0</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency>
2、复制HBase和Hadoop配置文件
将以下二个配置文件复制到resource目录中
hbase-site.xml
core-site.xml
注意:在哪个环境操作就使用哪个环境的配置文件,或者开发测试时直接在代码中设置zookeeper的地址
二、源码
要操作Hbase也需要建立Hbase的连接。此处我们仍然使用TestNG来编写测试。使用@BeforeTest初始化HBase连接,创建admin对象、@AfterTest关闭连接。
1、创建/删除表
1)、实现步骤
- 使用HbaseConfiguration.create()创建Hbase配置
- 使用ConnectionFactory.createConnection()创建Hbase连接
- 要创建表,需要基于Hbase连接获取admin管理对象
- 使用admin.close、connection.close关闭连接
2)、实现
- 以下是将配置文件放在java工程的resource目录中示例
importstaticorg.junit.Assert.*;importjava.io.IOException;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.hbase.HBaseConfiguration;importorg.apache.hadoop.hbase.TableName;importorg.apache.hadoop.hbase.client.Admin;importorg.apache.hadoop.hbase.client.ColumnFamilyDescriptor;importorg.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;importorg.apache.hadoop.hbase.client.Connection;importorg.apache.hadoop.hbase.client.ConnectionFactory;importorg.apache.hadoop.hbase.client.TableDescriptor;importorg.apache.hadoop.hbase.client.TableDescriptorBuilder;importorg.apache.hadoop.hbase.util.Bytes;importorg.junit.After;importorg.junit.Before;importorg.junit.Test;/**
* 创建和删除表操作
*
* @author alanchan
*
*/publicclassAdminTest{privateConfiguration configuration;privateConnection connection;privateAdmin admin;privateString table_Name ="TEST";@BeforepublicvoidbeforeTest()throwsIOException{
configuration =HBaseConfiguration.create();
connection =ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();}@TestpublicvoidcreateTableTest()throwsIOException{TableName tableName =TableName.valueOf(table_Name);// 1. 判断表是否存在if(admin.tableExists(tableName)){// a) 存在,则退出return;}// 构建表// 2. 使用TableDescriptorBuilder.newBuilder构建表描述构建器// TableDescriptor: 表描述器,描述这个表有几个列簇、其他的属性都是在这里可以配置TableDescriptorBuilder tableDescriptorBuilder =TableDescriptorBuilder.newBuilder(tableName);// 3. 使用ColumnFamilyDescriptorBuilder.newBuilder构建列簇描述构建器// 创建列簇也需要有列簇的描述器,需要用一个构建起来构建ColumnFamilyDescriptor// 经常会使用到一个工具类:Bytes(hbase包下的Bytes工具类)// 这个工具类可以将字符串、long、double类型转换成byte[]数组// 也可以将byte[]数组转换为指定类型ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("C1"));// 4. 构建列簇描述,构建表描述ColumnFamilyDescriptor cfDes = columnFamilyDescriptorBuilder.build();// 建立表和列簇的关联
tableDescriptorBuilder.setColumnFamily(cfDes);TableDescriptor tableDescriptor = tableDescriptorBuilder.build();// 5. 创建表
admin.createTable(tableDescriptor);assertTrue("表创建成功", admin.tableExists(tableName));}@TestpublicvoiddeleteTableTest()throwsIOException{TableName tableName =TableName.valueOf(table_Name);// 1. 判断表是否存在if(admin.tableExists(tableName)){// 2.如果存在,则禁用表
admin.disableTable(tableName);// 3.再删除表
admin.deleteTable(tableName);}assertFalse("表删除成功", admin.tableExists(tableName));}@AfterpublicvoidafterTest()throwsIOException{
admin.close();
connection.close();}}
- 以下是配置文件没有放在java工程的resource目录下示例
importstaticorg.junit.Assert.assertFalse;importstaticorg.junit.Assert.assertTrue;importjava.io.IOException;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.hbase.HBaseConfiguration;importorg.apache.hadoop.hbase.TableName;importorg.apache.hadoop.hbase.client.Admin;importorg.apache.hadoop.hbase.client.ColumnFamilyDescriptor;importorg.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;importorg.apache.hadoop.hbase.client.Connection;importorg.apache.hadoop.hbase.client.ConnectionFactory;importorg.apache.hadoop.hbase.client.TableDescriptor;importorg.apache.hadoop.hbase.client.TableDescriptorBuilder;importorg.apache.hadoop.hbase.util.Bytes;importorg.junit.After;importorg.junit.Before;importorg.junit.Test;/**
* 该示例是基于core-site.xml和hbase-site.xml文件没有的情况下,直接在代码中配置zookeeper信息
*
* @author alanchan
*
*/publicclassAdminTestNoXmlConf{privateConfiguration configuration;privateConnection connection;privateAdmin admin;privateString table_Name ="TEST";@BeforepublicvoidbeforeTest()throwsIOException{
configuration =HBaseConfiguration.create();// 创建配置项,设置zookeeper的参数
configuration.set("hbase.zookeeper.quorum","server1,server2,server3");
configuration.set("hbase.zookeeper.property.clientPort","2181");
connection =ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();}@TestpublicvoidcreateTableTest()throwsIOException{TableName tableName =TableName.valueOf(table_Name);// 1. 判断表是否存在if(admin.tableExists(tableName)){// a) 存在,则退出return;}// 构建表// 2. 使用TableDescriptorBuilder.newBuilder构建表描述构建器// TableDescriptor: 表描述器,描述这个表有几个列簇、其他的属性都是在这里可以配置TableDescriptorBuilder tableDescriptorBuilder =TableDescriptorBuilder.newBuilder(tableName);// 3. 使用ColumnFamilyDescriptorBuilder.newBuilder构建列簇描述构建器// 创建列簇也需要有列簇的描述器,需要用一个构建起来构建ColumnFamilyDescriptor// 经常会使用到一个工具类:Bytes(hbase包下的Bytes工具类)// 这个工具类可以将字符串、long、double类型转换成byte[]数组// 也可以将byte[]数组转换为指定类型ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("C1"));// 4. 构建列簇描述,构建表描述ColumnFamilyDescriptor cfDes = columnFamilyDescriptorBuilder.build();// 建立表和列簇的关联
tableDescriptorBuilder.setColumnFamily(cfDes);TableDescriptor tableDescriptor = tableDescriptorBuilder.build();// 5. 创建表
admin.createTable(tableDescriptor);assertTrue("表创建成功", admin.tableExists(tableName));}@TestpublicvoiddeleteTableTest()throwsIOException{TableName tableName =TableName.valueOf(table_Name);// 1. 判断表是否存在if(admin.tableExists(tableName)){// 2.如果存在,则禁用表
admin.disableTable(tableName);// 3.再删除表
admin.deleteTable(tableName);}assertFalse("表删除成功", admin.tableExists(tableName));}@AfterpublicvoidafterTest()throwsIOException{
admin.close();
connection.close();}}
2、CRUD操作-put、get、delete、scan、filter实现示例
importjava.io.IOException;importjava.util.Iterator;importjava.util.List;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.hbase.Cell;importorg.apache.hadoop.hbase.CompareOperator;importorg.apache.hadoop.hbase.HBaseConfiguration;importorg.apache.hadoop.hbase.TableName;importorg.apache.hadoop.hbase.client.Connection;importorg.apache.hadoop.hbase.client.ConnectionFactory;importorg.apache.hadoop.hbase.client.Delete;importorg.apache.hadoop.hbase.client.Get;importorg.apache.hadoop.hbase.client.Put;importorg.apache.hadoop.hbase.client.Result;importorg.apache.hadoop.hbase.client.ResultScanner;importorg.apache.hadoop.hbase.client.Scan;importorg.apache.hadoop.hbase.client.Table;importorg.apache.hadoop.hbase.filter.BinaryComparator;importorg.apache.hadoop.hbase.filter.FilterList;importorg.apache.hadoop.hbase.filter.SingleColumnValueFilter;importorg.apache.hadoop.hbase.util.Bytes;importorg.junit.After;importorg.junit.Before;importorg.junit.Test;importlombok.extern.slf4j.Slf4j;/**
* 添加、查询和删除数据。
* 修改数据可以看作是重新Put添加数据。
*
* @author alanchan
*
*/@Slf4jpublicclassOperatorTest{// Connection是线程安全的privateConnection connection;privateTableName TABLE_NAME =TableName.valueOf("TEST");@BeforepublicvoidbeforeTest()throwsIOException{// 1. 使用HbaseConfiguration.create()创建Hbase配置Configuration configuration =HBaseConfiguration.create();// 2. 使用ConnectionFactory.createConnection()创建Hbase连接
connection =ConnectionFactory.createConnection(configuration);}@TestpublicvoidputTest()throwsIOException{// 1. 使用Hbase连接获取HtableTable table = connection.getTable(TABLE_NAME);// 2. 构建ROWKEY、列簇名、列名String rowkey ="4944191";String columnFamily ="C1";String columnName ="NAME";String columnNameADDRESS ="ADDRESS";String columnNameSEX ="SEX";String columnNamePAY_DATE ="PAY_DATE";String columnNameNUM_CURRENT ="NUM_CURRENT";String columnNameNUM_PREVIOUS ="NUM_PREVIOUS";String columnNameNUM_USAGE ="NUM_USAGE";String columnNameTOTAL_MONEY ="TOTAL_MONEY";String columnNameRECORD_DATE ="RECORD_DATE";String columnNameLATEST_DATE ="LATEST_DATE";// value:// 3. 构建Put对象(对应put命令)Put put =newPut(Bytes.toBytes(rowkey));// 4. 添加姓名列
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes("登卫红"));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnNameADDRESS),Bytes.toBytes("贵州省铜仁市德江县7单元267室"));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnNameSEX),Bytes.toBytes("男"));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnNamePAY_DATE),Bytes.toBytes("2020-05-10"));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnNameNUM_CURRENT),Bytes.toBytes("308.1"));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnNameNUM_PREVIOUS),Bytes.toBytes("283.1"));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnNameNUM_USAGE),Bytes.toBytes("25"));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnNameTOTAL_MONEY),Bytes.toBytes("150"));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnNameRECORD_DATE),Bytes.toBytes("2020-04-25"));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnNameLATEST_DATE),Bytes.toBytes("2020-06-09"));// 5. 使用Htable表对象执行put操作
table.put(put);// 6. 关闭Htable表对象// HTable是一个轻量级的对象,可以经常创建// HTable它是一个非线程安全的API
table.close();}@TestpublicvoidgetTest()throwsIOException{// 1. 获取HTableTable table = connection.getTable(TABLE_NAME);// 2. 使用rowkey构建Get对象Get get =newGet(Bytes.toBytes("4944191"));// 3. 执行get请求Result result = table.get(get);// 4. 获取所有单元格// 列出所有的单元格List<Cell> cellList = result.listCells();// 5. 打印rowkeybyte[] rowkey = result.getRow();
log.info("rowkey={}",Bytes.toString(rowkey));// 6. 迭代单元格列表for(Cell cell : cellList){// 将字节数组转换为字符串// 获取列簇的名称String cf =Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());// 获取列的名称String columnName =Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());// 获取值String value =Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
log.info("列簇:列->值={}:{}->{}", cf, columnName, value);}// 7. 关闭表
table.close();}@TestpublicvoiddeleteTest()throwsIOException{// 1. 获取HTable对象Table table = connection.getTable(TABLE_NAME);// 2. 根据rowkey构建delete对象Delete delete =newDelete(Bytes.toBytes("4944191"));// 3. 执行delete请求
table.delete(delete);// 4. 关闭表
table.close();}@AfterpublicvoidafterTest()throwsIOException{
connection.close();}// 查询2020年6月份所有用户的用水量//// hbase(main):117:0> get 'WATER_BILL','9951726', {FORMATTER => 'toString'}// COLUMN CELL // C1:ADDRESS timestamp=1588911489455, value=安徽省宣城市市辖区13单元187室 // C1:LATEST_DATE timestamp=1588911489455, value=2019-07-03 // C1:NAME timestamp=1588911489455, value=检喜云 // C1:NUM_CURRENT timestamp=1588911489455, value=@}�fffff // C1:NUM_PREVIOUS timestamp=1588911489455, value=@z陙��� // C1:NUM_USAGE timestamp=1588911489455, value=@9 // C1:PAY_DATE timestamp=1588911489455, value=2020-09-26 // C1:RECORD_DATE timestamp=1588911489455, value=2019-07-18 // C1:SEX timestamp=1588911489455, value=男 // C1:TOTAL_MONEY timestamp=1588911489455, value=@`� @TestpublicvoidscanFilterTest()throwsIOException{// 1. 获取表Table table = connection.getTable(TABLE_NAME);// 2. 构建scan请求对象Scan scan =newScan();// 3. 构建两个过滤器// a) 构建两个日期范围过滤器(注意此处请使用RECORD_DATE——抄表日期比较SingleColumnValueFilter startFilter =newSingleColumnValueFilter(Bytes.toBytes("C1"),Bytes.toBytes("RECORD_DATE"),CompareOperator.GREATER_OR_EQUAL,newBinaryComparator(Bytes.toBytes("2020-06-01")));SingleColumnValueFilter endFilter =newSingleColumnValueFilter(Bytes.toBytes("C1"),Bytes.toBytes("RECORD_DATE"),CompareOperator.LESS_OR_EQUAL,newBinaryComparator(Bytes.toBytes("2020-06-30")));// b) 构建过滤器列表FilterList filterList =newFilterList(FilterList.Operator.MUST_PASS_ALL, startFilter, endFilter);// 4. 执行scan扫描请求
scan.setFilter(filterList);ResultScanner resultScanner = table.getScanner(scan);Iterator<Result> iterator = resultScanner.iterator();// 5. 迭代打印resultwhile(iterator.hasNext()){Result result = iterator.next();// 列出所有的单元格List<Cell> cellList = result.listCells();// 5. 打印rowkeybyte[] rowkey = result.getRow();
log.info("rowkey={}",Bytes.toString(rowkey));// 6. 迭代单元格列表for(Cell cell : cellList){// 将字节数组转换为字符串// 获取列簇的名称String cf =Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());// 获取列的名称String columnName =Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),cell.getQualifierLength());String value ="";if(columnName.equals("NUM_CURRENT")|| columnName.equals("NUM_PREVIOUS")|| columnName.equals("NUM_USAGE")|| columnName.equals("TOTAL_MONEY")){
value =Bytes.toDouble(cell.getValueArray())+"";}else{// 获取值
value =Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());}
log.info("列簇:列->值={}:{}->{}", cf, columnName, value);}}// 7. 关闭ResultScanner
resultScanner.close();// 8. 关闭表
table.close();}}
以上,完成了通过java api简单操作hbase的示例,如果需要更多更深入的使用,则需要参看官方文档。
版权归原作者 一瓢一瓢的饮 alanchan 所有, 如有侵权,请联系我们删除。