这篇文章的源码比较多,需要的私信我
需求与数据集
某自来水公司,需要存储大量的缴费明细数据,以下截取了缴费明细的一部分内容:
用户id
姓名
地址
性别
缴费时间
表示数(本次)
表示数(上次)
用量(立方)
合计金额
查表日期
最迟缴费日期
4944191
张三
河北省石家庄市裕华区万达校区2-1-401
男
2022-3-27
308.1
283.1
25
150
2022-2-25
2022-4-24
因为缴费明细的数据记录非常庞大,该公司的信息部门决定使用hbase来存储这些数据,并且可以使用java来访问这些数据。
准备工作
idea****:社区版免费,企业版收费
eclipse****:开源免费
下载安装maven
下载安装idea
配置国内的maven镜像库
创建一个maven工程
修改pom文件,导入相关的依赖
复制Hadoop的配置文件core-site.xml和HBase的配置文件hbase-site.xml到resources目录中
先导出到本地计算机
再添加一个日志log4j的配置文件
创建包结构
创建hbase连接类及管理对象
测试
这个是一个小测试,来测试环境,下面那个才是真正的连接器
package cn.edu.hgu.dashuju19.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
/**
* @description: 创建hbase的连接及管理对象
* @author:
* @date: 2022-3-28
*/
public class HbaseConnect {
public static void main(String[] args){
//1、创建hbase的配置
Configuration configuration = new Configuration();
//2、创建hbase的连接
Connection connection;
{
try {
connection = ConnectionFactory.createConnection(configuration);
System.out.println(connection);
//3、创建admin对象
Admin admin = connection.getAdmin();
System.out.println(admin);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
创建
package cn.edu.hgu.dashuju19.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/**
* Hbase_connect
* @author
* @date 2022-03-30 11:58
*/
public class HbaseConnect {
private static Connection connection;
private static Admin admin;
public static void main(String[] args) throws IOException {
// 1. 使用HbaseConfiguration.create()创建Hbase配置
Configuration configuration = HBaseConfiguration.create();
// 2. 使用ConnectionFactory.createConnection()创建Hbase连接
connection = ConnectionFactory.createConnection(configuration);
// 3. 要创建表,需要基于Hbase连接获取admin管理对象
// 要创建表、删除表需要和HMaster连接,所以需要有一个admin对象
admin = connection.getAdmin();
TableName tableName = TableName.valueOf("WATER_BILL2");
// 4. 判断表是否存在
if(admin.tableExists(tableName)) {
// a) 存在,则退出
return;
}
// 构建表
// 5. 使用TableDescriptorBuilder.newBuilder构建表描述构建器
// TableDescriptor: 表描述器,描述这个表有几个列蔟、其他的属性都是在这里可以配置
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
// 6. 使用ColumnFamilyDescriptorBuilder.newBuilder构建列蔟描述构建器
// 创建列蔟也需要有列蔟的描述器,需要用一个构建起来构建ColumnFamilyDescriptor
// 经常会使用到一个工具类:Bytes(hbase包下的Bytes工具类)
// 这个工具类可以将字符串、long、double类型转换成byte[]数组
// 也可以将byte[]数组转换为指定类型
ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("C1"));
// 7. 构建列蔟描述,构建表描述
ColumnFamilyDescriptor cfDes = columnFamilyDescriptorBuilder.build();
// 建立表和列蔟的关联
tableDescriptorBuilder.setColumnFamily(cfDes);
TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
// 8. 创建表
admin.createTable(tableDescriptor);
// 9. 使用admin.close、connection.close关闭连接
admin.close();
connection.close();
}
}
这下准备工作就做好了
接下来就是八个案例了
还是那句话,需要源码包的私信我,当然这里面也有源码可以复制
案例一到案例八的源码
这几个案例放在一个java文件里了
源码放在这里
大家在做案例的时候需要根据介绍来打开或者取消注释
package cn.edu.hgu.dashuju19.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.List;
/**
* @description 创建hbase的连接以及管理对象
* @date 2022-3-30
*/
public class HbaseJavaAPIMain {
public static void main(String[] args) throws IOException {
Admin admin = getAdmin();
// //调用删除表的方法
// boolean result = deleteTable(admin, "WATER_BILL");
// if (result) {
// System.out.println("删除成功");
// }else {
// System.out.println("表不存在");
// }
//
//
// 调用插入数据的方法
// putTable(admin.getConnection(),"water_bill","4944191","info","addr","石家庄市裕华区");
// //获取name值
// String name = getValue(admin.getConnection(),"water_bill","4944191","info","name");
// System.out.println(name);
// //输出rowkey
// getOne(admin.getConnection(),"water_bill","4944191");
删除某行数据
// deleteOne(admin.getConnection(),"water_bill","4944191");
// //查询六月份数据
queryDate(admin.getConnection(), "WATER_BILL","C1","RECORD_DATE","2020-06-01", "2020-06-30");
// // 关闭admin
// admin.close();
}
/**
* 获取admin对象
*
* @return
* @throws IOException
*/
public static Admin getAdmin() throws IOException {
//1.创建hbase的配置
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorom", "192.168.153.100");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("hbase.master", "192.168.153.100:16010");
//2.创建hbase连接
Connection connection = null;
Admin admin = null;
try {
//通过工厂模式,根据配置来创建连接
connection = ConnectionFactory.createConnection(configuration);
System.out.println(connection);
//3.创建admin对象
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
return admin;
}
/**
* 删除表
*
* @param admin
* @param name
* @throws IOException
* @return
*/
public static boolean deleteTable(Admin admin, String name) throws IOException {
//定义表名
TableName tableName = TableName.valueOf(name);
//判断表是否存在
if (admin.tableExists(tableName)) {
//禁用表名
admin.disableTable(tableName);
//删除表
admin.deleteTable(tableName);
return true;
} else {
return false;
}
}
/**
* 往表中插入数据
* @param connection
* @param tableName
* @param rowkey
* @param columnFamily
* @param column
* @param value
* @throws IOException
*/
public static void putTable(Connection connection,String tableName,String rowkey,String columnFamily,String column,String value) throws IOException {
//获取table对象
Table table = connection.getTable(TableName.valueOf(tableName));
//根据rowkey获取put对象
Put put = new Put(Bytes.toBytes(rowkey));
//添加姓名列
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
//插入数据
table.put(put);
//关闭table
table.close();
}
/**
* 获取某列的值
* @param connection
* @param tableName
* @param rowkey
* @param columnFamily
* @param column
* @return
* @throws IOException
*/
public static String getValue(Connection connection, String tableName, String rowkey, String columnFamily, String column) throws IOException {
//1.获取htable
Table table = connection.getTable(TableName.valueOf(tableName));
//2.使用rowkey构建get对象
Get get = new Get(Bytes.toBytes(rowkey));
//3.执行get请求。获取result对象
Result result = table.get(get);//result对象创建快捷键ctrl+alt+v
//4.某列的值
String name = Bytes.toString(result.getValue(columnFamily.getBytes(),column.getBytes()));
//System.out.println(name);
//System.out.println(result.toString());
// byte[] row = result.getRow();
//5.关闭表
table.close();
return name;
// System.out.println("rowkey=>" + Bytes.toString(row));
//获取所有单元格
//List<Cell> cells = result.listCells();
}
/**
* 获取并显示某行的数据
*/
public static void getOne(Connection connection,String tableName,String rowkey) throws IOException {
//1.获取htable
Table table = connection.getTable(TableName.valueOf(tableName));
//2.使用rowkey构建get对象
Get get = new Get(Bytes.toBytes(rowkey));
//3.执行get请求。获取result对象
Result result = table.get(get);//result对象创建快捷键ctrl+alt+v
//4.获取rowkey
byte[] row = result.getRow();
System.out.println("rowkey=>" + Bytes.toString(row));
//5.获取所有的单元格
List<Cell> cells = result.listCells();
//6.迭代处理每个单元格
for (Cell cell:cells) {
System.out.print(Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()));
System.out.println("=>" + Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()));
}
//7.关闭表
table.close();
}
/**
* 删除指定行
*/
public static void deleteOne(Connection connection,String tableName,String rowkey) throws IOException {
//1.获取htable
Table table = connection.getTable(TableName.valueOf(tableName));
//2.使用rowkey构建delete对象
Delete delete = new Delete(Bytes.toBytes(rowkey));
//3.执行delete请求
table.delete(delete);
//4.关闭表
table.close();
}
/**
* 查询某列某个日期范围的数据
* @param connection
* @param tableName
* @param columnFamily
* @param column
* @param startValue
* @param endValue
* @throws IOException
*/
public static void queryDate(Connection connection,String tableName,String columnFamily,String column,String startValue,String endValue) throws IOException {
//1.获取htable
Table table = connection.getTable(TableName.valueOf(tableName));
//2.构建scan对象
Scan scan = new Scan();
//3.构建两个过滤器
//构建日期范围的过滤器
//构建开始日期的过滤器
SingleColumnValueFilter startDateFilter = new SingleColumnValueFilter(Bytes.toBytes(columnFamily),Bytes.toBytes(column),
CompareOperator.GREATER_OR_EQUAL,Bytes.toBytes(startValue));
//构建结束日期的过滤器
SingleColumnValueFilter endDateFilter = new SingleColumnValueFilter(Bytes.toBytes(columnFamily),Bytes.toBytes(column),
CompareOperator.LESS_OR_EQUAL,Bytes.toBytes(endValue));
//3.2 构建过滤器列表
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL,startDateFilter,endDateFilter);
//4.构建扫描器
scan.setFilter(filterList);
//5.执行scan扫描操作
ResultScanner resultScanner = table.getScanner(scan);
//6.迭代打印result
for (Result result:resultScanner) {
//6.1 打印rowkey
System.out.println("rowkey=>" + Bytes.toString(result.getRow()));
System.out.println("-------------------------------------------");
//6.2 迭代单元格列表
List<Cell> cells = result.listCells();
for (Cell cell:cells) {
//6.3打印数据
// 打印列簇名
System.out.print(Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()));
//打印列名
//解决数值型数据乱码
String columnName = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
//System.out.print(":" + Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()));
System.out.println(":" + columnName);
//判断是否为数值型的列
if (columnName.equals("NUM_CURRENT") || columnName.equals("NUM_PREVIOUS")
|| columnName.equals("NUM_USAGE") || columnName.equals("TOTAL_MONEY")) {
//打印数值型值
System.out.println("=>" + Bytes.toDouble(cell.getValueArray(),cell.getValueOffset()));
}else{
//打印字符串值
System.out.println("=>" + Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()));}
}
System.out.println("--------------------------------------------");
}
//7.关闭资源
resultScanner.close();
table.close();
}
}
案例一、使用java api创建hbase的表
源码在上边总的
创建一个名为water_bill的表,包含一个列簇info。
编写代码
- 定义表名,判断表是否存在
- 表描述构建器,建立表描述对象
- 列簇描述构建器,建立列簇描述对象
- 表描述对象和列簇描述对象建立关系
- 创建表
运行
查看创建的表
案例二、使用java api删除表
源码在上边总的
删除刚刚创建的表
编写删除表的方法代码
- 定义表名,判断表是否存在
- 禁用表
- 删除表
调用方法
查看结果
案例三、往创建的表中插入数据
往water_bill****中插入姓名列的数据
源码在上边总的
这里需要先把案例一在运行一遍把表创建好
编写插入列数据的方法
- 使用hbase的连接获取Htable
- 构建rowkey、列簇名、列名、值
- 构建Put对象(对应put命令)
- 添加某列(列簇、列名、值)
- Htable对象执行put操作
- 关闭htable对象
在main方法中调用
查看执行结果
出错了不知道为啥
重新运行一下
运行成功
案例四、查看一条数据
查询显示rowkey为4944191的某列或者所有列的数据,
编写方法
获取某列的值
获取某行的数据
调用方法
查看结果
案例五、删除一条数据
删除rowkey为“4944191”的数据
编写方法
调用方法
查看结果
案例六、导入数据
需求
有一份10W条记录的抄表数据文件,需求将其导入hbase中
网盘链接
https://pan.baidu.com/s/1UEewxFODFPa2aREa-YjM2w?pwd=1234 提取码:1234
Import JOB导入大量的数据
在hbase中,有一个import的MR作业,可以专门用来将数据导入到hbase中
用法:
hbase org.apache.hadoop.hbase.mapreduce.Import 表名 hdfs数据文件路径
上传数据文件到hdfs上
导入数据
- 启动yarn
- 创建表
运行导入命令
hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /water_bill/
运行结果
查看数据
count计数
mapreduce计数
hbase org.apache.hadoop.hbase.mapreduce.RowCounter "WATER_BILL"
案例七、查询2020年6月份所有用户的用水量
需求分析
在hbase中用scan+filter实现过滤查询。2020年6月份其实就是从2020年6月1号到2020年6月30日的所有抄表数据
编写代码(源码也在上面总的)
调用方法
查看结果
输出结果代码改进
解决数值型数据显示乱码的问题
打印显示字符串数据是正常,但是如果HBase存储的是int、double、float等数值型数据时,显示就会乱码,解决的方法就是判断是否是数值型数据,如果是则进行相应的转换
显示结果
案例八:Export Job导出数据
用法:
hbase org.apache.hadoop.hbase.mapreduce.Export 表名 hdfs路径
这篇hbase的java aip实例在这里也算是告一段落了,下一章(4)是关于hbase高可用的相关介绍和实现,希望大家一起学习,一起进步。
如遇侵权,请联系删除。
版权归原作者 星欲冷hx 所有, 如有侵权,请联系我们删除。