0


SpringBoot使用Hbase

SpringBoot使用Hbase

文章目录

一,引入依赖

<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.3.2</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency>

二,配置文件添加自己的属性

hbase:zookeeper:quorum: 10.xxx.xx.153,10.xxx.xx.154,10.xxx.xx.155
    property:clientPort:2181master:port:9001

三,配置类注入HBASE配置

packagecom.hbase.config;importorg.apache.hadoop.hbase.HBaseConfiguration;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.cloud.context.config.annotation.RefreshScope;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * @ClassName: HBaseConfig
 * @author: Leemon
 * @Description: TODO
 * @date: 2023/4/12 18:06
 * @version: 1.0
 */@Configuration@RefreshScopepublicclassHBaseConfig{@Value("${hbase.zookeeper.quorum}")privateString zookeeperQuorum;@Value("${hbase.zookeeper.property.clientPort}")privateString clientPort;@Value("${hbase.master.port}")privateString masterPort;@Beanpublicorg.apache.hadoop.conf.ConfigurationhbaseConfiguration(){org.apache.hadoop.conf.Configuration conf =HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
        conf.set("hbase.zookeeper.property.clientPort", clientPort);// 如果hbase是集群,这个必须加上// 这个ip和端口是在hadoop/mapred-site.xml配置文件配置的
        conf.set("hbase.master", zookeeperQuorum +":"+ masterPort);

        conf.set("hbase.client.keyvalue.maxsize","20971520");
        conf =HBaseConfiguration.create(conf);return conf;}}

四,配置Hbase连接池

这里没有使用懒加载模式,减少启动后第一次访问时访问时间过长

packagecom.hbase.config;importlombok.extern.slf4j.Slf4j;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.hbase.client.Connection;importorg.apache.hadoop.hbase.client.ConnectionFactory;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjavax.annotation.Resource;importjava.io.IOException;importjava.util.Enumeration;importjava.util.Vector;/**
 * @ClassName: HbaseConnectionPool
 * @author: Leemon
 * @Description: TODO
 * @date: 2023/4/13 9:45
 * @version: 1.0
 */@Component@Slf4jpublicclassHbaseConnectionPool{/**
     * 连接池最大的大小
     */privateint nMaxConnections =20;/**
     * 连接池自动增加的大小
     */privateint nIncrConnectionAmount =3;/**
     * 连接池的初始大小
     */privateint nInitConnectionAmount =3;/**
     * 存放连接池中数据库连接的向量,初始时为null
     */privateVector vcConnections =null;@ResourceprivateConfiguration hbaseConfiguration;@PostConstructpublicvoidinit(){try{
            vcConnections =newVector();createConnections(nInitConnectionAmount);}catch(Exception e){
            e.printStackTrace();}}publicsynchronizedConnectiongetConnection(){// 确保连接池己被创建if(vcConnections ==null){// 连接池还没创建,则返回nullreturnnull;}// 获得一个可用的数据库连接Connection conn =getFreeConnection();// 如果目前没有可以使用的连接,即所有的连接都在使用中while(conn ==null){// 等一会再试try{wait(250);}catch(InterruptedException e){
                e.printStackTrace();}// 重新再试,直到获得可用的连接,如果getFreeConnection()返回的为null,则表明创建一批连接后也不可获得可用连接
            conn =getFreeConnection();}// 返回获得的可用的连接return conn;}/**
     * 本函数从连接池向量 connections 中返回一个可用的的数据库连接,如果
     * 当前没有可用的数据库连接,本函数则根据 incrementalConnections 设置
     * 的值创建几个数据库连接,并放入连接池中。
     * 如果创建后,所有的连接仍都在使用中,则返回 null
     * @return
     *         返回一个可用的数据库连接
     */privateConnectiongetFreeConnection(){// 从连接池中获得一个可用的数据库连接Connection conn =findFreeConnection();if(conn ==null){// 如果目前连接池中没有可用的连接// 创建一些连接try{createConnections(nIncrConnectionAmount);}catch(Exception e){// TODO Auto-generated catch block
                e.printStackTrace();
                log.error("create new connection fail.", e);}// 重新从池中查找是否有可用连接
            conn =findFreeConnection();if(conn ==null){// 如果创建连接后仍获得不到可用的连接,则返回 nullreturnnull;}}return conn;}/**
     * 创建由 numConnections 指定数目的数据库连接 , 并把这些连接
     * 放入 connections 向量中
     * @param _nNumConnections 要创建的数据库连接的数目
     * @throws Exception
     */privatevoidcreateConnections(int _nNumConnections)throwsException{// 循环创建指定数目的数据库连接for(int x =0; x < _nNumConnections; x++){// 是否连接池中的数据库连接的数量己经达到最大?最大值由类成员 maxConnections// 指出,如果 maxConnections 为 0 或负数,表示连接数量没有限制。// 如果连接数己经达到最大,即退出。if(this.nMaxConnections >0&&this.vcConnections.size()>=this.nMaxConnections){
                log.warn("已达到最大连接数,不能再增加连接");thrownewException("已达到最大连接数"+ nMaxConnections+",不能再增加连接");}// 增加一个连接到连接池中(向量 connections 中)
            vcConnections.addElement(newConnectionWrapper(newConnection()));

            log.info("HBase数据库连接己创建 ...... "+ x);}}/**
     * 查找池中所有的連接,查找一个可用的數據庫連接,
     * 如果没有可用的連結,返回null
     * @return
     *         返回一個可用的數據庫連接
     */privateConnectionfindFreeConnection(){Connection conn =null;ConnectionWrapper connWrapper =null;//獲得連接池向量中所有的對象Enumeration enumerate = vcConnections.elements();//遍歷所有的对象,看是否有可用的連接while(enumerate.hasMoreElements()){
            connWrapper =(ConnectionWrapper) enumerate.nextElement();if(!connWrapper.isBusy()){//如果此對象不忙,則獲得它的數據庫連接并把它設為忙
                conn = connWrapper.getConnection();
                connWrapper.setBusy(true);// 己经找到一个可用的連接,退出break;}}// 返回找到的可用連接return conn;}/**
     *创建一个新的数据库连接并返回它
     * @return
     *         返回一个新创建的数据库连接
     */privateConnectionnewConnection(){/** hbase 连接 */Connection conn =null;// 创建一个数据库连接try{
            conn =ConnectionFactory.createConnection(hbaseConfiguration);}catch(IOException e){
            log.error("创建HBase数据库连接失败!");
            e.printStackTrace();}// 返回创建的新的数据库连接return conn;}publicsynchronizedvoidreleaseConnection(Connection conn){if(this.vcConnections ==null){
            log.info("连接池不存在,无法返回此连接到连接池中!!");}else{ConnectionWrapper connWrapper =null;Enumeration enumerate =this.vcConnections.elements();while(enumerate.hasMoreElements()){
                connWrapper =(ConnectionWrapper) enumerate.nextElement();if(conn == connWrapper.getConnection()){
                    connWrapper.setBusy(false);break;}}}}classConnectionWrapper{/**
         * 数据库连接
         */privateConnection connection =null;/**
         * 此连接是否正在使用的标志,默认没有正在使用
         */privateboolean busy =false;/**
         * 构造函数,根据一个 Connection 构告一个 PooledConnection 对象
         */publicConnectionWrapper(Connection connection){this.connection = connection;}/**
         * 返回此对象中的连接
         */publicConnectiongetConnection(){return connection;}/**
         * 设置此对象的连接
         */publicvoidsetConnection(Connection connection){this.connection = connection;}/**
         * 获得对象连接是否忙
         */publicbooleanisBusy(){return busy;}/**
         * 设置对象的连接正在忙
         */publicvoidsetBusy(boolean busy){this.busy = busy;}}}

init()方法实现在初始化连接池的时候创建默认数值的连接。

五,配置操作服务类

操作类接口 HbaseService.java

packagecom.hbase.service;importorg.apache.hadoop.hbase.client.Scan;importjava.util.Map;/**
 * @InterfaceName: HbaseService
 * @author: Leemon
 * @Description: TODO
 * @date: 2023/4/12 18:11
 * @version: 1.0
 */publicinterfaceHbaseService{Map<String,Map<String,String>>getResultScanner(String tableName,String startRowKey,String stopRowKey);Map<String,String>getRowData(String tableName,String rowKey);Map<String,String>getFamilyValue(String tableName,String rowKey,String familyName);StringgetColumnValue(String tableName,String rowKey,String familyName,String columnName);Map<String,Map<String,String>>queryData(String tableName,Scan scan);}

接口实现类 HbaseServiceImpl.java

packagecom.hbase.service.impl;importcom.hbase.config.HbaseConnectionPool;importcom.hbase.service.HbaseService;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.StringUtils;importorg.apache.hadoop.hbase.Cell;importorg.apache.hadoop.hbase.TableName;importorg.apache.hadoop.hbase.client.*;importorg.apache.hadoop.hbase.filter.Filter;importorg.apache.hadoop.hbase.filter.PrefixFilter;importorg.apache.hadoop.hbase.util.Bytes;importorg.springframework.stereotype.Service;importjavax.annotation.Resource;importjava.io.IOException;importjava.text.MessageFormat;importjava.util.*;/**
 * @ClassName: HbaseServiceImpl
 * @author: Leemon
 * @Description: TODO
 * @date: 2023/4/12 18:13
 * @version: 1.0
 */@Slf4j@ServicepublicclassHbaseServiceImplimplementsHbaseService{@ResourceprivateHbaseConnectionPool pool;@OverridepublicMap<String,Map<String,String>>getResultScanner(String tableName,String startRowKey,String stopRowKey){Scan scan =newScan();if(StringUtils.isNotBlank(startRowKey)&&StringUtils.isNotBlank(stopRowKey)){
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(stopRowKey));}returnthis.queryData(tableName,scan);}publicMap<String,Map<String,String>>getResultScannerPrefixFilter(String tableName,String prefix){Scan scan =newScan();if(StringUtils.isNotBlank(prefix)){Filter filter =newPrefixFilter(Bytes.toBytes(prefix));
            scan.setFilter(filter);}returnthis.queryData(tableName,scan);}@OverridepublicMap<String,Map<String,String>>queryData(String tableName,Scan scan){Map<String,Map<String,String>> result =newHashMap<>();ResultScanner rs =null;// 获取表Table table=null;Connection connection =null;try{
            connection = pool.getConnection();
            table =getTable(connection, tableName);
            rs = table.getScanner(scan);for(Result r : rs){//每一行数据Map<String,String> columnMap =newHashMap<>();String rowKey =null;for(Cell cell : r.listCells()){if(rowKey ==null){
                        rowKey =Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());}
                    columnMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));}if(rowKey !=null){
                    result.put(rowKey,columnMap);}}}catch(IOException e){
            log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}",tableName),e);}finally{close(null, rs, table, connection);}return result;}@OverridepublicMap<String,String>getRowData(String tableName,String rowKey){//返回的键值对Map<String,String> result =newHashMap<>();Get get =newGet(Bytes.toBytes(rowKey));// 获取表Table table=null;Connection connection =null;try{
            connection = pool.getConnection();
            table =getTable(connection, tableName);Result hTableResult = table.get(get);if(hTableResult !=null&&!hTableResult.isEmpty()){for(Cell cell : hTableResult.listCells()){
                    result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));}// 某些应用场景需要插入到数据库的时间if(hTableResult.listCells().size()>0){
                    result.put("Timestamp", hTableResult.listCells().get(0).getTimestamp()+"");}}}catch(IOException e){
            log.error(MessageFormat.format("查询一行的数据失败,tableName:{0},rowKey:{1}",tableName,rowKey),e);}finally{close(null,null, table, connection);}return result;}@OverridepublicMap<String,String>getFamilyValue(String tableName,String rowKey,String familyName){//返回的键值对Map<String,String> result =newHashMap<>(2);Get get =newGet(Bytes.toBytes(rowKey));
        get.addFamily(Bytes.toBytes(familyName));// 获取表Table table=null;Connection connection =null;try{
            connection = pool.getConnection();
            table =getTable(connection, tableName);Result getResult = table.get(get);if(getResult !=null&&!getResult.isEmpty()){for(Cell cell : getResult.listCells()){
                    result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));}}}catch(IOException e){
            log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2}", tableName, rowKey, familyName), e);}finally{close(null,null, table, connection);}return result;}@OverridepublicStringgetColumnValue(String tableName,String rowKey,String familyName,String columnName){String str =null;Get get =newGet(Bytes.toBytes(rowKey));// 获取表Table table=null;Connection connection =null;try{
            connection = pool.getConnection();
            table =getTable(connection, tableName);Result result = table.get(get);if(result !=null&&!result.isEmpty()){Cell cell = result.getColumnLatestCell(Bytes.toBytes(familyName),Bytes.toBytes(columnName));if(cell !=null){
                    str =Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());}}}catch(IOException e){
            log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}",tableName,rowKey,familyName,columnName),e);}finally{close(null,null, table, connection);}return str;}privateTablegetTable(Connection connection,String tableName)throwsIOException{Table table = connection.getTable(TableName.valueOf(tableName));return table;}privatevoidclose(Admin admin,ResultScanner rs,Table table,Connection connection){if(admin !=null){try{
                admin.close();}catch(IOException e){
                log.error("关闭Admin失败",e);}}if(rs !=null){
            rs.close();}if(table !=null){try{
                table.close();}catch(IOException e){
                log.error("关闭Table失败",e);}}// 释放连接if(Objects.nonNull(connection)){
            pool.releaseConnection(connection);}}}

ok,现在就可以操作使用了。

以前都是在非Spring环境下使用Hbase的,一开始会出现:当服务使用时间过久,某些会使用hbase的接口调用次数过多的时候,会报【已超过最大的连接数】,只能每一次调用接口后最后一行加上释放连接。(以前的做法每次调用都要在代码里手动获取一个连接)

这次将释放连接都集成在操作服务类的实现方法中,避免了开发接口可能遗漏的错误,可能不会再出现这个问题。

标签: hbase spring boot java

本文转载自: https://blog.csdn.net/lmchhh/article/details/130165988
版权归原作者 李奈 - Leemon 所有, 如有侵权,请联系我们删除。

“SpringBoot使用Hbase”的评论:

还没有评论