0


Kerberos安全认证-连载11-HBase Kerberos安全配置及访问

技术连载系列,前面内容请参考前面连载10内容:​​​​​​​​​​​​​​Kerberos安全认证-连载10-Hive Kerberos 安全配置及访问_IT贫道的博客-CSDN博客

大数据组件HBase也可以通过Kerberos进行安全认证,由于HBase中需要zookeeper进行元数据管理、主节点选举、故障恢复,所以这里对HBase进行Kerberos安全认证时,建议也对Zookeeper进行安全认证。

1. Zookeeper Kerberos配置

zookeeper集群节点分布如下:

节点IP

节点名称

Zookeeper

192.168.179.4

node1

192.168.179.5

node2

192.168.179.6

node3

192.168.179.7

node4

192.168.179.8

node5

目前启动zookeeper集群后,可以通过任意Zookeeper客户端操作Zookeeper,可以通过Kerberos对Zookeeper进行认证避免非认证用户操作Zookeeper。按照如下步骤进行zookeeper kerberos认证。

1) 创建zookeeper Princial服务主体

在node1节点执行如下命令创建zookeeper Princial服务主体。

[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node3"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node4"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node5"

​​​​​​​2) 将zookeeper服务主体写入到keytab文件

在node1节点执行如下命令,将zookeeper服务主体写入到keytab文件:

[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/[email protected]"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/[email protected]"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/[email protected]"

以上命令执行完成后,可以在node1节点/home/keytabs目录下生成zookeeper.service.keytab文件。

3) 将keytab文件分发到其他节点并修改权限

这里将node1节点生成的zookeeper.service.keytab文件分发到node2~node5节点,这里也可以直接分发到zookeeper服务所在节点,为了保持集群各个节点keytab文件一致,这里将该keytab文件分发到集群各个节点中。

#在node1节点中执行如下命令进行分发
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node2:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node3:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node4:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node5:/home/keytabs/

分发到node2~node5各个节点后,执行如下命令对zookeeper.service.keytab修改权限。

#node1~node5所有节点执行如下命令
chown root:hadoop /home/keytabs/zookeeper.service.keytab
chmod 770 /home/keytabs/zookeeper.service.keytab

​​​​​​​4) 配置zoo.cfg文件

在node3~node5 zookeeper节点上配置zoo.cfg配置文件,每个节点在该文件最后追加如下配置:

kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
sessionRequireClientSASLAuth=false
skipACL=yes

以上配置项的解释如下:

  • kerberos.removeHostFromPrincipal

kerberos认证时是否从Principal中移除主机名部分。

  • kerberos.removeRealmFromPrincipal

kerberos认证时是否从Principal中移除领域(Realm)部分。

  • authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

指定ZooKeeper对客户端进行身份验证时使用的身份验证类。

  • jaasLoginRenew

Jaas登录(Java Authentication and Authorization Service)的续约时间,这里是每隔1小时需要续约Jaas登录。

  • sessionRequireClientSASLAuth

是否要求客户端进行SASL认证,SASL(Simple Authentication and Security Layer)认证是一种安全机制,用于在客户端和服务器之间进行身份验证和加密通信。当设置为要求客户端进行SASL认证时,客户端必须提供有效的凭证(如用户名和密码)来证明其身份,并与ZooKeeper服务器进行安全的通信。

这里需要将该值设置为false,否则Hadoop HA 不能正常启动。

  • skipACL=yes

跳过Zookeeper 访问控制列表(ACL)验证,允许连接zookeper后进行读取和写入。这里建议跳过,否则配置HBase 启动后不能向Zookeeper中写入数据。

这里在node3节点进行zoo.cfg文件的配置,配置完成后,将zoo.cfg文件分发到node4、node5 zookeeper节点上。

[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/zoo.cfg node4:/software/apache-zookeeper-3.6.3-bin/conf/
[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/zoo.cfg node5:/software/apache-zookeeper-3.6.3-bin/conf/

​​​​​​​​​​​​​​5) 配置jaas.conf文件

在node3~node5各个zookeeper节点中ZOOKEEPER_HOME/conf/目录下创建jaas.conf文件,该文件用于zookeeper服务端与客户端进行认证。配置内容如下:

Server {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/[email protected]";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/[email protected]";
};

以上配置文件是在node3 zookeeper节点配置,将以上文件分发到node4、node5节点后,需要将principal修改成对应节点的信息。

node4 zookeeper节点jaas.conf配置如下:

Server {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/[email protected]";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/[email protected]";
};

node5 zookeeper节点jaas.conf配置如下:

Server {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/[email protected]";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/zookeeper.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="zookeeper/[email protected]";
};

​​​​​​​6) 配置java.env文件

在zookeeper各个节点ZOOKEEPER_HOME/conf目录下创建java.env文件,写入如下内容,这里在node3~node5各个节点都需配置。

#指定jaas.conf文件路径
export JVMFLAGS="-Djava.security.auth.login.config=/software/apache-zookeeper-3.6.3-bin/conf/jaas.conf"

以上可以先在node3节点进行配置,然后分发到node4~node5节点上。如下:

[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/java.env node4:/software/apache-zookeeper-3.6.3-bin/conf/
[root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/java.env node5:/software/apache-zookeeper-3.6.3-bin/conf/

​​​​​​​​​​​​​​7) 启动zookeeper

在node3~node5各个节点执行如下命令,启动zookeeper。

#启动zookeeper
zkServer.sh start

#检查zookeeper状态
zkServer.sh status

​​​​​​​2. HBase配置Kerberos

HBase也支持Kerberos安全认证,经过测试,HBase2.5.x版本在经过Kerberos认证后与Hadoop通信认证有异常,具体报错如下:

hdfs.DataStreamer: Exception in createBlockOutputStream java.io.IOException: Invalid token in javax.security.sasl.qop: DI

这里建议使用HBase2.5.x以下版本与kerberos进行整合,但HBase版本的选择也需要参考Hadoop的版本,可以在Hbase官网找到Hadoop版本匹配的HBase版本,地址为:https://hbase.apache.org/book.html#hadoop

我们这里使用的Hadoop版本为3.3.4,HBase版本选择2.2.6版本。HBase Kerberos安全认证配置步骤如下:

1) 创建HBase服务用户并两两节点进行免密

在Hadoop体系中操作HBase时通常使用hbase用户,这里在集群各个节点创建hbase用户并设置在HBase用户下,节点之间的免密。

在node1~node5节点创建hbase用户,所属hadoop组。

#这里设置用户密码为123456
useradd hbase -g hadoop
passwd hbase

各个节点切换到hbase用户,并设置各个节点两两免密。

#所有节点切换成hbase用户
su hbase
cd ~

#node1~node5所有节点生成SSH密钥
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

#node1~node5所有节点公钥复制到node1节点上,这个过程需要输入yes和密码
ssh-copy-id node1

#将node1 authorized_keys文件分发到node1~node5各节点,这个过程需要输入密码
[hbase@node1 ~]$ cd ~/.ssh/
[hbase@node1 .ssh]$ scp authorized_keys node2:`pwd`
[hbase@node1 .ssh]$ scp authorized_keys node3:`pwd`
[hbase@node1 .ssh]$ scp authorized_keys node4:`pwd`
[hbase@node1 .ssh]$ scp authorized_keys node5:`pwd`

#两两节点进行ssh测试,这一步骤必须做,然后node1~node5节点退出当前hbase用户
exit

​​​​​​​2) 创建HBase服务Princial主体并写入到keytab文件

在kerberos服务端node1节点执行如下命令创建HBase服务主体:

[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node3"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node4"
[root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node5"

在kerberos服务端node1节点执行如下命令将hbase主体写入到keytab文件:

#node1节点执行命令,将主体写入到keytab
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/[email protected]"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/[email protected]"
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/[email protected]"

以上命令执行完成后,在node1节点/home/keytabs目录下生成hbase.service.keytab文件,将该文件分发到各个节点并赋权,这里可以只发送到node3~node5 Hbase所在节点,为了保证各个大数据集群节点的keytabs一致,这里分发到所有节点。

[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node2:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node3:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node4:/home/keytabs/
[root@node1 ~]# scp /home/keytabs/hbase.service.keytab node5:/home/keytabs/

分发完成后,在集群各个节点上执行如下命令,修改hbase.service.keytab密钥文件所属用户和组信息:

chown root:hadoop /home/keytabs/hbase.service.keytab
chmod 770 /home/keytabs/hbase.service.keytab

​​​​​​​3) 修改hbase-site.xml配置文件

在所有hbase节点中修改HBASE_HOME/conf/hbase-site.xml文件内容,配置支持kerberos,向hbase-site.xml文件中追加如下内容:

  <!-- hbase开启安全认证 -->
  <property>
    <name>hbase.security.authorization</name>
    <value>true</value>
  </property>
  <!-- hbase配置kerberos安全认证 -->
  <property>
      <name>hbase.security.authentication</name>
      <value>kerberos</value>
  </property>
  <!-- HMaster配置kerberos安全凭据认证 -->
  <property>
      <name>hbase.master.kerberos.principal</name>
      <value>hbase/[email protected]</value>
  </property>
  <!-- HMaster配置kerberos安全证书keytab文件位置 -->
  <property>
      <name>hbase.master.keytab.file</name>
      <value>/home/keytabs/hbase.service.keytab</value>
  </property>
  <!-- Regionserver配置kerberos安全凭据认证 -->
  <property>
      <name>hbase.regionserver.kerberos.principal</name>
      <value>hbase/[email protected]</value>
  </property>
  <!-- Regionserver配置kerberos安全证书keytab文件位置 -->
  <property>
      <name>hbase.regionserver.keytab.file</name>
      <value>/home/keytabs/hbase.service.keytab</value>
  </property>

这里可以先在node3节点进行配置,配置完成后分发到node4、node5节点:

[root@node3 ~]# scp /software/hbase-2.2.6/conf/hbase-site.xml node4:/software/hbase-2.2.6/conf/
[root@node3 ~]# scp /software/hbase-2.2.6/conf/hbase-site.xml node5:/software/hbase-2.2.6/conf/

​​​​​​​4) 准备hdfs-site.xml和core-site.xml文件

HBase底层存储使用HDFS,这里需要将Hadoop中hdfs-site.xml和core-site.xml文件发送到hbase各个节点HBASE_HOME/conf/目录中。在node3~node5各个HBase节点中执行如下命令:

#node3~node5各个节点执行
cp /software/hadoop-3.3.4/etc/hadoop/hdfs-site.xml /software/hbase-2.2.6/conf/
cp /software/hadoop-3.3.4/etc/hadoop/core-site.xml /software/hbase-2.2.6/conf/

​​​​​​​​​​​​​​5) 准备zk-jaas.conf文件

HBase中会使用zookeeper管理元数据和选主,这里由于Zookeeper中已经开启了Kerberos认证,所以需要准备zk-jaas.conf文件连接zookeeper时进行认证。在HBase各个节点HBASE_HOME/conf目录下创建zk-jaas.conf文件,写入如下内容,不同的节点设置的principal是对应HBase主机节点。

node3节点HBASE_HOME/conf/zk-jaas.conf文件内容:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/hbase.service.keytab"
  useTicketCache=false
  principal="hbase/[email protected]";
};

node4节点HBASE_HOME/conf/zk-jaas.conf文件内容:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/hbase.service.keytab"
  useTicketCache=false
  principal="hbase/[email protected]";
};

node5节点HBASE_HOME/conf/zk-jaas.conf文件内容:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/keytabs/hbase.service.keytab"
  useTicketCache=false
  principal="hbase/[email protected]";
};

​​​​​​​​​​​​​​6) 修改hbase-env.sh配置

在各个hbase节点上配置HBASE_HOME/conf/hbase-env.sh,将HBASE_OPTS选项修改为如下:

#node3~node5各个节点修改hbase-env.sh中的HBASE_OPTS配置
export HBASE_OPTS="$HBASE_OPTS -XX:+UseConcMarkSweepGC -Djava.security.auth.login.config=/software/hbase-2.2.6/conf/zk-jaas.conf"

​​​​​​​​​​​​​​7) 修改HBase安装目录权限

在HBase各个节点创建HBASE_HOME/logs目录,如果该目录不存在需要提前创建,存在即可忽略,该目录为HBase日志目录,需要将该目录权限修改为root:hadoop、访问权限为770,否则hbase用户在启动Hbase集群时没有向该目录写日志权限。

#在node3~node5 HBase节点执行命令创建HBASE_HOME/logs
mkdir -p /software/hbase-2.2.6/logs

将各个节点的hbase安装目录权限修改为root:hadoop,logs目录访问权限为770:

#在node3~node5 HBase节点执行命令
chown -R root:hadoop  /software/hbase-2.2.6
chmod -R 770 /software/hbase-2.2.6/logs

​​​​​​​3. HBase启动及访问验证

在启动HBase集群之前,需要将Zookeeper和HDFS集群重启,Zookeeper和HDFS启动命令如下:

#停止hadoop集群
[root@node1 ~]# stop-all.sh 

#node3~node5节点,重启zookeeper
[root@node3 ~]# zkServer.sh restart
[root@node3 ~]# zkServer.sh restart
[root@node3 ~]# zkServer.sh restart

#启动HDFS集群
[root@node1 ~]# start-all.sh 

如果没有配置Kerberos认证时HBase之前启动过,在HDFS中将会有/hbase 目录,需要使用hdfs超级管理员用户将该目录的用户和所属组修改成hbase:hadoop。

#在node5节点执行
[root@node5 ~]# su hdfs
[hdfs@node5 ~]$ kinit hdfs
Password for [email protected]: 123456

[hdfs@node5 logs]$ hdfs dfs -ls /
Found 6 items
drwxr-xr-x   - root     supergroup          0 2023-05-23 10:50 /hbase

#修改用户和所属组
[hdfs@node5 logs]$ hadoop dfs -chown -R hbase:hadoop /hbase

在HBase主节点node4上执行如下命令启动HBase集群:

#无需切换hbase用户,可以直接在node4节点执行如下命令启动hbase集群
[root@node4 ~]# sudo -i -u hbase start-hbase.sh

注意:以上启动Hbase集群命令中没有切换到hbase用户,sudo为使用超级用户权限执行命令,-i表示交互模式执行命令,-u指定切换的用户。

HBase启动后,可以通过http://node4:16010进行访问:

​​​​​​​4. HBase Shell操作HBase

在node3~node5任意节点登录hbase shell,可以看到如果节点没有进行用户认证,读取HBase数据时没有权限:

#假设在node4登录hbase客户端
[root@node4 ~]# klist
klist: No credentials cache found (filename: /tmp/krb5cc_0)

[root@node4 ~]# hbase shell
hbase(main):001:0> list
TABLE                                                                                                                                            

ERROR: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)

如果想要正常读取HBase中的数据需要进行客户端认证,这里以zhangsan用户为例,进行Kerberos主体认证,认证后操作HBase,操作如下:

#进行用户认证
[root@node4 ~]# kinit zhangsan
Password for [email protected]: 123456
[root@node4 ~]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: [email protected]

#登录hbase shell
[root@node4 ~]# hbase shell
                                                                                                                           
hbase(main):001:0> create 'person','cf1','cf2'
Created table person

hbase(main):002:0> list
person

hbase(main):003:0> put 'person','row1','cf1:id','1'
                                                                                                                        
hbase(main):004:0> put 'person','row1','cf1:name','zs'
                                                                                                                          
hbase(main):005:0> put 'person','row1','cf1:age',18
                                                                                                                       
hbase(main):006:0> scan 'person'
ROW                                   COLUMN+CELL                                                                                                
 row1                                 column=cf1:age, timestamp=1684317908851, value=18                                                          
 row1                                 column=cf1:id, timestamp=1684317879179, value=1                                                            
 row1                                 column=cf1:name, timestamp=1684317894358, value=zs 

​​​​​​​5. Java API操作HBase

java API操作经过Kerberos安全认证的HBase按照如下步骤实现即可。

1) 准备krb5.conf文件

将node1 kerberos服务端/etc/krb5.conf文件存放在IDEA项目中的resources资源目录中或者本地Window固定的某个目录中,用于编写代码时指定访问Kerberos的Realm。

2) 生成用户keytab文件

在kerberos服务端node1节点上,执行如下命令,对zhangsan用户主体生成keytab密钥文件。

#在node1 kerberos服务端执行
[root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /root/zhangsan.keytab [email protected]"

以上命令执行之后,在/root目录下会生成zhangsan.keytab文件,将该文件复制到IDEA项目中的resources资源目录中或者本地window固定的某个目录中,该文件用于编写代码时认证kerberos。

3) 准备访问HDFS需要的资源文件

将HDFS中的core-site.xml 、hdfs-site.xml 、yarn-site.xml、hbase-site.xml文件上传到项目resources资源目录中。

4) 编写代码操作****HBase

项目代码中引入如下maven依赖:

<!-- hbase依赖包 -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>2.2.6</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>2.2.6</version>
</dependency>

编写Java代码操作Kerberos认证的HBase代码如下:

/**
 * Java API 操作Kerberos 认证的HBase
 */
public class OperateAuthHBase {

    private static Configuration conf;
    private static Connection connection;
    private static Admin admin;

    /**
     * 初始化:进行kerberos认证并设置hbase configuration
     */
    static {
        try {
            String principal = "[email protected]";
            String keytabPath = "D:\\idea_space\\KerberosAuth\\KerberosAuthHBase\\src\\main\\resources\\zhangsan.keytab";

            conf = HBaseConfiguration.create();
            // 设置Kerberos 认证
            System.setProperty("java.security.krb5.conf", "D:\\idea_space\\KerberosAuth\\KerberosAuthHBase\\src\\main\\resources\\krb5.conf");
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);

            //设置zookeeper集群地址及端口号
            conf.set("hbase.zookeeper.quorum","node3,node4,node5");
            conf.set("hbase.zookeeper.property.clientPort","2181");

            //创建连接
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        try{
            //获取Admin对象
            admin = connection.getAdmin();

            //创建表
            createHBaseTable("tbl1", "cf1");
            System.out.println("===========================");

            //查看Hbase中所有表
            listHBaseTables();
            System.out.println("===========================");

            //向表中插入数据
            putTableData("tbl1","row1", "cf1", "id", "100");
            putTableData("tbl1","row1", "cf1", "name", "zs");
            putTableData("tbl1","row1", "cf1", "age", "18");

            putTableData("tbl1","row2", "cf1", "id", "200");
            putTableData("tbl1","row2", "cf1", "name", "ls");
            putTableData("tbl1","row2", "cf1", "age", "19");
            System.out.println("===========================");

            //查询表中的数据
            scanTableData("tbl1");
            System.out.println("===========================");

            //查询指定row数据
            getRowData("tbl1", "row1","cf1","name");
            System.out.println("===========================");

            //删除指定row数据
            deleteRowData("tbl1", "row1","cf1","name");
            System.out.println("===========================");

            //查询表中的数据
            scanTableData("tbl1");
            System.out.println("===========================");

            //删除表
            deleteHBaseTable("tbl1");

            //查看Hbase中所有表
            listHBaseTables();
            System.out.println("===========================");
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(admin != null) {
                admin.close();
            }
            if(connection != null) {
                connection.close();
            }
        }

    }

    //判断表是否存在
    private static boolean isTableExist(String tableName) {
        try {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    private static void createHBaseTable(String tableName, String... cfs) {
        if(isTableExist(tableName)) {
            System.out.println("表已经存在!");
            return;
        }
        if(cfs == null || cfs.length == 0){
            System.out.println("列族不能为空!");
            return;
        }
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
        List<ColumnFamilyDescriptor> cFDBList = new ArrayList<>();
        for (String cf : cfs) {
            cFDBList.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
        }
        tableDescriptorBuilder.setColumnFamilies(cFDBList);
        try {
            admin.createTable(tableDescriptorBuilder.build());
            System.out.println("创建表" + tableName + "成功!");
        } catch (IOException e) {
            System.out.println("创建失败!");
            e.printStackTrace();
        }

    }

    //查看Hbase中所有表
    private static void listHBaseTables() {
        try {
            TableName[] tableNames = admin.listTableNames();
            System.out.println("打印所有命名空间表名:");
            for (TableName tableName : tableNames) {
                System.out.println(tableName);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //向表中插入数据
    private static void putTableData(String tableName, String rowKey, String cf, String column, String value) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Put p = new Put(Bytes.toBytes(rowKey));
            p.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
            table.put(p);
            System.out.println("插入数据成功!");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeTable(table);
        }

    }

    //查询表中的数据
    private static void scanTableData(String tableName) {
        if(tableName == null || tableName.length() == 0) {
            System.out.println("请正确输入表名!");
            return;
        }
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();

            ResultScanner resultScanner = table.getScanner(scan);
            Iterator<Result> iterator = resultScanner.iterator();
            while(iterator.hasNext()) {
                Result result = iterator.next();
                Cell[] cells = result.rawCells();
                for(Cell cell : cells) {
                    //得到rowkey
                    System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
                    //得到列族
                    System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
                    System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                    System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeTable(table);
        }
    }

    //查询指定row数据
    private static void getRowData(String tableName, String rowkey,String colFamily, String col) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Get g = new Get(Bytes.toBytes(rowkey));
            // 获取指定列族数据
            if(col == null && colFamily != null) {
                g.addFamily(Bytes.toBytes(colFamily));
            } else if(col != null && colFamily != null) {
                // 获取指定列数据
                g.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
            }
            Result result = table.get(g);
            System.out.println("查询指定rowkey 数据如下");

            for(Cell cell : result.rawCells()) {
                //得到rowkey
                System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
                //得到列族
                System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeTable(table);
        }

    }

    //删除指定row数据
    private static void deleteRowData(String tableName , String rowKey, String colFamily, String ... cols) {
        if(tableName == null || tableName.length() == 0) {
            System.out.println("请正确输入表名!");
            return;
        }
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(tableName));
            Delete del = new Delete(Bytes.toBytes(rowKey));
            // 删除指定列族
            del.addFamily(Bytes.toBytes(colFamily));
            // 删除指定列
            for (String col : cols) {
                del.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
            }
            table.delete(del);
            System.out.println("删除数据成功!");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeTable(table);
        }

    }

    //删除表
    private static void deleteHBaseTable(String tableName) {
        try {
            if(!isTableExist(tableName)) {
                System.out.println("要删除的表不存在!");
                return;
            }
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //关闭表
    private static void closeTable(Table table) {
        if(table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

欢迎点赞、评论、收藏,关注IT贫道,获取IT技术知识!

标签: hbase 安全 大数据

本文转载自: https://blog.csdn.net/qq_32020645/article/details/131344728
版权归原作者 IT贫道 所有, 如有侵权,请联系我们删除。

“Kerberos安全认证-连载11-HBase Kerberos安全配置及访问”的评论:

还没有评论