一、环境准备
- 创建一个 Maven 工程并引入依赖
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.3.1</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.3.1</version></dependency>
- 创建 API 测试类并获取 HBase 客户端连接
publicclassTestAPI{privatestaticConnection conn =null;privatestaticAdmin admin =null;// DDL 操作客户端static{try{// 1.获取配置信息对象// 过时API// HBaseConfiguration conf1 = new HBaseConfiguration();// conf1.set("hbase.zookeeper.quorum","hadoop102,hadoop103,hadoop104");// 新APIConfiguration conf =HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","hadoop102,hadoop103,hadoop104");// 2.创建连接对象(新API) conn =ConnectionFactory.createConnection(conf);// 3.创建hbase管理员对象// 过时API// HBaseAdmin admin1 = new HBaseAdmin(conf1);// 新API admin = conn.getAdmin();}catch(Exception e){ e.printStackTrace();}}// 关闭资源publicstaticvoidclose(){try{if(admin !=null){ admin.close();}if(conn !=null){ conn.close();}}catch(Exception e){ e.printStackTrace();}}}
二、HBase API 实操
1. 判断表是否存在
publicstaticbooleanisTableExist(String tableName){// 过时API// boolean exist = admin1.tableExists(tableName);// 新APIboolean exist = admin.tableExists(TableName.valueOf(tableName));return exist;}
2. 创建表
publicstaticvoidcreateTable(String tableName,String... cfs){// 1.判断传入列族信息if(cfs.length()<=0){System.out.println("必须设置一个列族信息");return;}// 2.判断表是否存在if(isTableExist(tableName)){System.out.println(tableName +"表已经存在");return;}// 3.创建表描述器HTableDescriptor hTableDescriptor =newHTableDescriptor(TableName.valueOf(tableName));// 4.循环添加列族for(String cfs : cfs){// 5.创建列族描述器HColumnDescriptor hColumnDescriptor =newHColumnDescriptor(cfs);// 6.添加单个列族
hTableDescriptor.addFamily(hColumnDescriptor);}// 7.创建表
admin.createTable(hTableDescriptor);System.out.println(tableName +"表创建成功!");}
3. 删除表
publicstaticvoiddeleteTable(String tableName){// 1.判断表是否存在if(!isTableExist(tableName)){System.out.println(tableName +"表已经不存在!");return;}// 2.使表下线
admin.disableTable(TableName.valueOf(tableName));// 3.删除表
admin.deleteTable(TableName.valueOf(tableName));System.out.println(tableName +"表删除成功!");}
4. 创建命名空间
publicstaticvoidcreateNamespace(String ns){// 1.创建命名空间描述器NamespaceDescriptor namespaceDescriptor =NamespaceDescriptor.create(ns).build();try{// 2.创建命名空间
admin.createNamespace(namespaceDescriptor);System.out.println(ns +"命名空间创建成功!");}catch(NamespaceExistException e){// 生产上判断 ns 是否存在System.out.println(ns +"命名空间已经存在!");}catch(Exception e){
e.printStackTrace();}}
5. 插入数据
publicstaticvoidputData(String tableName,String rowKey,String cf,String cn,String value){// 1.创建hbase表操作对象Table table = conn.getTable(TableName.valueOf(tableName));// 2.创建 put 对象// HBase 底层存储的数据格式都是 byte[],可以通过 hbase.util 包下的 Bytes 类进行数据类型转换Put put =newPut(Bytes.toBytes(rowKey));// 3.给put对象赋值
put.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn),Bytes.toBytes(value));// 4.插入数据
table.put(put);// 5.关闭资源
table.close();}
6. 获取数据
6.1 get
publicstaticvoidgetData(String tableName,String rowKey,String cf,String cn){// 1.创建hbase表操作对象Table table = conn.getTable(TableName.valueOf(tableName));// 2.创建get对象Get get =newGet(Bytes.toBytes(rowKey));// 2.1.指定获取的列族// get.addFamily(Bytes.toBytes(cf));// 2.2.指定获取的列族和列// get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));// 2.3.指定获取数据的版本数// get.setMaxVersions(); // 相当于 scan 'tb',{RAW=>TRUE,VERSIONS=>10}// 3.获取一行的数据Result result = table.get(get);// 4.获取一行的所有 cellfor(Cell cell : result.rawCells()){// 5.打印数据信息System.out.println("family:"+Bytes.toString(CellUtil.cloneFamily(cell))+",qualifier:"+Bytes.toString(CellUtil.cloneQualifier(cell))+",value:"+Bytes.toString(CellUtil.cloneValue(cell)));}// 6.关闭资源
table.close();}
6.2 scan
publicstaticvoidscanTable(String tableName){// 1.创建hbase表操作对象Table table = conn.getTable(TableName.valueOf(tableName));// 2.构建scan对象Scan scan =newScan();// 全表// Scan scan = new Scan(Bytes.toBytes("1001"),Bytes.toBytes("1003")); // 限定 rowKey 范围,左闭右开// 3.扫描表ResultScanner resultScanner = table.getScanner(scan);// 4.解析resultScannerfor(Result result : resultScanner){// 按 rowKey 从小到大遍历获取// 5.解析resultfor(Cell cell : result.rawCells()){// 6.打印数据信息System.out.println("rk:"+Bytes.toString(CellUtil.cloneRow(cell))+",family:"+Bytes.toString(CellUtil.cloneFamily(cell))+",qualifier:"+Bytes.toString(CellUtil.cloneQualifier(cell))+",value:"+Bytes.toString(CellUtil.cloneValue(cell)));}}// 7.关闭资源
table.close();}
7. 删除数据
delete 操作最终还是 put 操作
publicstaticvoiddeleteData(String tableName,String rowKey,String cf,String cn){// 1.创建hbase表操作对象Table table = conn.getTable(TableName.valueOf(tableName));// 2.构建delete对象Delete delete =newDelete(Bytes.toBytes(rowKey));// 相当于 deleteall 命令// 2.1.设置删除列// delete.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn)); // 删除指定版本的列,不指定则为最大版本,小于该版本的列信息可以查询出来(慎用)// delete.addColumns(Bytes.toBytes(cf),Bytes.toBytes(cn)); // 删除列的所有版本// 2.2.设置删除的列族// delete.addFamily(Bytes.toBytes(cf));// 3.删除数据
table.delete(delete);// 4.关闭资源
table.close();}
- 只指定 rowKey 删除:删除指定 rowKey 的所有列族所有版本数据,标记为 deleteFamily
- 指定 rowKey + 列族 [+ 版本]:删除指定 rowKey 的指定列族所有版本 (小于等于该版本) 数据,标记为 deleteFamily
- 指定 rowKey + 列族 + 列 [+ 版本]: - addColumns():删除指定 rowKey 的指定列族的指定列的所有版本 (小于等于该版本) 数据,标记为 deleteColumn- addColumn():删除指定 rowKey 的指定列族的指定列的最新版本 (该指定版本) 数据,标记为 delete,生产上慎用
三、与 MapReduce 交互
1. 环境搭建
- 查看 MR 操作 HBase 所需的 jar 包:
cd /opt/module/hbasebin/hbase mapredcp
- 在 Hadoop 中导入环境变量- 临时生效:
# 在命令行执行exportHBASE_HOME=/opt/module/hbaseexportHADOOP_HOME=/opt/module/hadoop-2.7.2exportHADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp````- 永久生效:
# 在 /etc/profile.d/my_env.sh 配置exportHBASE_HOME=/opt/module/hbaseexportHADOOP_HOME=/opt/module/hadoop-2.7.2# 在 hadoop-env.sh 中配置,在有关 HADOOP_CLASSPATH 的 for 循环之后添加exportHADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*``` - 分发配置到其他节点
- 启动 Hadoop 集群和 HBase 集群
2. 案例实操
2.1 官方案例
- 统计 stu 表中有多少行数据
/opt/module/hadoop-2.7.2/bin/yarn jar \/opt/module/hbase/lib/hbase-server-1.3.1.jar \ rowcounter student
- 使用 MapReduce 将本地数据导入到 HBase
# 在本地创建一个 tsv 格式的文件:fruit.tsvvim fruit.tsv1001 Apple Red1002 Pear Yellow1003 Pineapple Yellow# 在 HDFS 中创建 input_fruit 文件夹并上传 fruit.tsv 文件/opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit//opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/# 创建 Hbase 表Hbase(main):001:0> create 'fruit','info'# 执行 MapReduce 到 HBase 的 fruit 表中(若表不存在则报错)/opt/module/hadoop-2.7.2/bin/yarn jar \/opt/module/hbase/lib/hbase-server-1.3.1.jar \importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \hdfs://hadoop102:9000/input_fruit/fruit.tsv# 使用 scan 命令查看导入后的结果Hbase(main):001:0> scan 'fruit'
2.2 自定义案例
- 案例 1:实现将 HDFS 中的数据写入到 Hbase 表中- 编码:
// 构建 FruitMapper 用于读取 HDFS 中的文件数据publicclassFruitMapperextendsMapper<LongWritable,Text,LongWritable,Text>{@overrideprotectedvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{ context.write(key, value);}}// 构建 FruitReducer 用于将 HDFS 中的文件数据写入 Hbase// TableReducer 默认的输出value类型是 Mutation,其子类有 put/delete 等publicclassFruitReducerextendsTableReducer<LongWritable,Text,NullWritable>{@overrideprotectedvoidreduce(LongWritable key,Iterable<Text> values,Context context)throwsIOException,InterruptedException{// 1.遍历获取每行数据for(Text value : values){// 2.切割一行数据String[] fields = value.toString().split("\t");// 3.构建put对象Put put =newPut(Bytes.toBytes(fields[0]));// 4.为put对象赋值 put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(fields[1])); put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("color"),Bytes.toBytes(fields[2]));// 5.数据写入hbase context.write(NullWritable.get(),put);}}}// 构建 FruitDriverpublicclassFruitDriverimplementsTool{privateConfiguration conf =null;@overridepublicintrun(String[] args)throwsException{// 1.获取jobJob job =Job.getInstance(conf);// 2.设置驱动类路径 job.setJarByClass(FruitDriver.class);// 3.设置Mapper及输出KV类型 job.setMapperClass(FruitMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class);// 4.设置Reducer(不能使用普通的 Reducer 设置)TableMapReduceUtil.initTableReducerJob(args[1],FruitReducer.class, job);// 5.设置输入路径FileInputFormat.setInputPaths(job,newPath(args[0]));// 6.提交jobboolean result = job.waitForCompletion(true);return result ?0:1;}@overridepublicvoidsetConf(Configuration conf){this.conf = conf;}@overridepublicConfigurationgetConf(){return conf;}publicstaticvoidmain(String[] args){try{Configuration conf =newConfiguration();int status =ToolRunner.run(conf,newFruitDriver(), args);System.exit(status);}catch(Exception e){ e.printStackTrace();}}}
- 将代码打包后上传到 hbase 集群服务器并执行# 首先创建 hbase fruit 表# 执行:/opt/module/hadoop-2.7.2/bin/yarn jar \/opt/module/hbase/Hbase-0.0.1-SNAPSHOT.jar com.xxx.FruitDriver \ /input_fruit/fruit.tsv fruit
- 案例 2:将 fruit 表中的一部分数据,通过 MR 迁入到 fruit2 表中- 编码
// 构建 Fruit2Mapper 用于读取 Hbase 中的 Fruit 表数据// TableMapper 默认的KV输入类型为 ImmutableBytesWritable, ResultpublicclassFruit2MapperextendsTableMapper<ImmutableBytesWritable,Put>{@overrideprotectedvoidmap(ImmutableBytesWritable key,Result value,Context context)throwsIOException,InterruptedException{// 1.构建PUT对象Put put =newPut(key.get());// key 是 rowKey 值// 2.解析Resultfor(Cell cell : value.rawCells()){// 3.获取列为 name 的 cellif("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){// 4.为put对象赋值 put.add(cell);}}// 5.写出 context.write(key, put);}}// 构建 Fruit2Reducer 用于将数据写入 Hbase// TableReducer 默认的输出value类型是 Mutation,其子类有 put/delete 等publicclassFruit2ReducerextendsTableReducer<ImmutableBytesWritable,Put,NullWritable>{@overrideprotectedvoidreduce(ImmutableBytesWritable key,Iterable<Put> values,Context context)throwsIOException,InterruptedException{// 遍历写出for(Put put : values){ context.write(NullWritable.get(), put);}}}// 构建 Fruit2DriverpublicclassFruit2DriverimplementsTool{privateConfiguration conf =null;@overridepublicintrun(String[] args)throwsException{// 1.获取jobJob job =Job.getInstance(conf);// 2.设置驱动类路径 job.setJarByClass(Fruit2Driver.class);// 3.设置Mapper及输出KV类型(不能使用普通的 Mapper 设置)TableMapReduceUtil.initTableMapperJob(args[0],newScan(),Fruit2Mapper.class,ImmutableBytesWritable.class,Put.class, job);// 4.设置Reducer(不能使用普通的 Reducer 设置)TableMapReduceUtil.initTableReducerJob(args[1],Fruit2Reducer.class, job);// 5.提交jobboolean result = job.waitForCompletion(true);return result ?0:1;}@overridepublicvoidsetConf(Configuration conf){this.conf = conf;}@overridepublicConfigurationgetConf(){return conf;}publicstaticvoidmain(String[] args){try{// Configuration conf = new Configuration();Configuration conf =HbaseConfiguration.create();int status =ToolRunner.run(conf,newFruit2Driver(), args);System.exit(status);}catch(Exception e){ e.printStackTrace();}}}
- 集群测试:将代码打包后上传到 hbase 集群服务器并执行# 首先创建 hbase fruit2 表# 执行:/opt/module/hadoop-2.7.2/bin/yarn jar \/opt/module/hbase/Hbase-0.0.1-SNAPSHOT.jar com.xxx.Fruit2Driver \fruit fruit2
- 本地测试:- 在 Maven 工程的 resources 目录下创建hbase-site.xml
文件- 将hbase/conf/hbase-site.xml
的内容拷贝到上面创建的文件中- 执行 Fruit2Driver 程序 main 方法
四、集成 Hive
1. 与 Hive 对比
- Hive - 数据仓库:Hive 的本质其实就相当于将 HDFS 中已经存储的文件在 Mysql 中做了一个双射关系,以方便使用 HQL 去管理查询- 用于数据分析、清洗:Hive 适用于离线的数据分析和清洗,延迟较高- 基于 HDFS、MapReduce:Hive 存储的数据依旧在 DataNode 上,编写的 HQL 语句终将是转换为 MapReduce 代码执行
- HBase - 数据库:是一种面向列族存储的非关系型数据库- 用于存储结构化和非结构化的数据:适用于单表非关系型数据的存储,不适合做关联查询,类似 JOIN 等操作- 基于 HDFS:数据持久化存储的体现形式是 HFile,存放于 DataNode 中,被 ResionServer 以 region 的形式进行管理- 延迟较低,接入在线业务使用:面对大量的企业数据, HBase 可以直线单表大量数据的存储,同时提供了高效的数据访问速度
2. 集成使用
2.1 环境搭建
- 在
/etc/profile.d/my_env.sh
中配置 Hive 和 HBase 环境变量vim /etc/profile.d/my_env.shexportHBASE_HOME=/opt/module/hbaseexportHIVE_HOME=/opt/module/hive
- 使用软链接将操作 HBase 的 Jar 包关联到 Hive
ln-s$HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jarln-s$HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbaseserver-1.3.1.jarln-s$HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jarln-s$HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jarln-s$HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jarln-s$HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jarln-s$HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jarln-s$HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
- 在
hive-site.xml
中添加 Zookeeper 连接信息<property><name>hive.zookeeper.quorum</name><value>hadoop102,hadoop103,hadoop104</value><description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description></property><property><name>hive.zookeeper.client.port</name><value>2181</value><description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description></property>
- 下载 Hive 对应版本的源码包,使用 Eclipse 工具创建一个普通 Java 工程 import 这个 hbase-handler 文件夹到工程,在工程下创建一个 lib 文件并导入
hive/lib
下所有的.jar
文件(add build path
),使用 export 重新编译hive-hbase-handler-1.2.2.jar
(取消勾选 lib 目录),将新编译的 jar 包替换掉hive/lib
下的 jar 包
2.2 案例实操
- 案例 1:建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase 表
# 在 Hive 中创建表同时关联 HBase,然后分别进入 Hive 和 HBase 查看CREATE TABLE hive_hbase_emp_table( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int)STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'WITH SERDEPROPERTIES ("hbase.columns.mapping"=":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")TBLPROPERTIES ("hbase.table.name"="hbase_emp_table");# 在 Hive 中创建临时中间表,用于 load 文件中的数据# 提示:不能将数据直接 load 进 Hive 所关联 HBase 的那张表中CREATE TABLE emp( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int)row format delimited fields terminated by '\t';# 向 Hive 中间表中 load 数据load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;# 通过 insert 命令将中间表中的数据导入到 Hive 关联 Hbase 的那张表中insert into table hive_hbase_emp_table select * from emp;# 查看 Hive 以及关联的 HBase 表中是否已经成功的同步插入了数据select * from hive_hbase_emp_table;scan 'hbase_emp_table'
- 案例 2:针对已经存在的 HBase 表
hbase_emp_table
,在 Hive 中创建一个外部表来关联这张表,并使用 HQL 操作# 在 Hive 中创建外部表关联 HBase 表(必须创建外部表)CREATE EXTERNAL TABLE relevance_hbase_emp( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int)STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'WITH SERDEPROPERTIES ("hbase.columns.mapping"=":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") TBLPROPERTIES ("hbase.table.name"="hbase_emp_table");# 使用 HQL 操作select * from relevance_hbase_emp;
本文转载自: https://blog.csdn.net/weixin_44480009/article/details/139754716
版权归原作者 文刀小桂 所有, 如有侵权,请联系我们删除。
版权归原作者 文刀小桂 所有, 如有侵权,请联系我们删除。