0


大数据开源框架之HBase编程实践

HBase的安装部署请看:

(30条消息) 大数据开源框架环境搭建(五)——Hbase完全分布式集群的安装部署_木子一个Lee的博客-CSDN博客

任务1:用HBase提供的HBase Shell命令实现以下指定功能:

1.列出HBase所有的表的相关信息

初始状态:

新建一个Person表,再查看:

属性有性别sex,年龄age,姓名name

2.在终端打印出指定的表的所有记录数据;

Person表

添加数据后再查看:

3.向已经创建好的表添加和删除指定的列族;

添加姓名zhang,行名555:

添加性别男,年龄18,行名555:

查看行名为555的信息:

删除555的sex列:

查看信息,发现sex没了

4.清空指定的表的所有记录数据;

利用truncate命令删除Person信息

5.统计表的行数。

第四步清除数据后行数为0:

添加数据后,再统计:

添加行p1,姓名为lee,年龄18

添加行p2,姓名为wang,年龄19

统计后,行数为2:

任务2:转换为适合于HBase存储的表并插入数据

实验要求:

现有以下关系型数据库中的表和数据,要求将其转换为适合于HBase存储的表并插入数据:

学生表(Student)

学号(S_No)

姓名(S_Name)

性别(S_Sex)

年龄(S_Age)

2015001

Zhangsan

male

23

2015002

Mary

female

22

2015003

Lisi

male

24

课程表(Course)

课程号(C_No)

课程名(C_Name)

学分(C_Credit)

123001

Math

2.0

123002

Computer Science

5.0

123003

English

3.0

选课表(SC)

学号(SC_Sno)

课程号(SC_Cno)

成绩(SC_Score)

2015001

123001

86

2015001

123003

69

2015002

123002

77

2015002

123003

99

2015003

123001

98

2015003

123002

95

实验步骤:

Student表:

创建:

插入数据:

查看:

Course表:

创建:

插入数据:

查看:

SC表:

创建:

插入数据:

查看:

任务3:使用Java API编程完成指定功能

实验要求:

基于任务2,使用Java API编程完成以下指定功能:

① createTable(String tableName, String[] fields)

创建表,参数tableName为表的名称,字符串数组fields为存储记录各个域名称的数组。要求当HBase已经存在名为tableName的表的时候,先删除原有的表,然后再创建新的表。

② addRecord(String tableName, String row, String[] fields, String[] values)

向表tableName、行row(用S_Name表示)和字符串数组files指定的单元格中添加对应的数据values。其中fields中每个元素如果对应的列族下还有相应的列限定符的话,用“columnFamily:column”表示。例如,同时向“Math”、“Computer Science”、“English”三列添加成绩时,字符串数组fields为{“Score:Math”,”Score;Computer Science”,”Score:English”},数组values存储这三门课的成绩。

③ scanColumn(String tableName, String column)

浏览表tableName某一列的数据,如果某一行记录中该列数据不存在,则返回null。要求当参数column为某一列族名称时,如果其中有若干个列限定符,则要列出每个列限定符代表的列的数据;当参数column为某一列具体名称(例如“Score:Math”)时,只需要列出该列的数据。

④ modifyData(String tableName, String row, String column)

修改表tableName,行row(可以用学生姓名S_Name表示),列column指定的单元格的数据。

⑤ deleteRow(String tableName, String row)

删除表tableName中row指定的行的记录。

基本思路:

在编写程序之前需要导jar包

首先编写建立连接函数,创建一个HBase的配置对象,进行配置hbase存储路径和zookeeper服务,然后使用ConnectionFactory建立连接:

再编写关闭连接代码,包括admin关闭和连接关闭:

对于createTable(String tableName, String[] fields),先调用init()开启连接,然后把tableName参数转换为TableName类型,使用tableExists判断表是否已存在,如果已存在,那就先调用disableTable(tablename)使其不可用,然后调用deleteTable(tablename)删除原来的表,最后createTable()创建表,最后关闭连接:

对于addRecord(String tableName,String row,String[] fields,String[] values),主要思路是使用split(“:”)分隔列族和列限定符,建立一个Table对象,使用put添加数据:

对于scanColumn(String tableName,String column),首先先获取表对象,再利用contains(“:”)判断输入的是具体的列(如Score:Math)还是列族(Score),然后创建Result对象获取数据。如果数据为空,则输出null,否则进行格式化输出showCell():

格式化输出:

对于modifyData(String tableName,String row,String column,String val),首先创建一个Table,然后用split(“:”)分隔列族和列限定符,使用put函数修改数据:

对于deleteRow(String tableName,String row),首先创建一个Table对象和Delete对象,然后调用函数delete(row.getBytes)进行删除

部分代码及运行结果:

(1) createTable(String tableName, String[] fields)

代码:

public static void createTable(String tableName,String[] fields) throws IOException {

    init();
    TableName tablename = TableName.valueOf(tableName);

    if(admin.tableExists(tablename)){
        System.out.println("该表已存在,删除后重新创建");
        admin.disableTable(tablename);
        admin.deleteTable(tablename);//删除原来的表
    }

    TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tablename);
    for(String str : fields){
        tableDescriptor.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build());
        admin.createTable(tableDescriptor.build());
    }
    close();
    System.out.println("创建成功!");
}

运行结果:

第一次运行前:

第一次运行后:

第二次运行:

(2) addRecord(String tableName, String row, String[] fields, String[] values)

代码:

public static void addRecord(String tableName,String row,String[] fields,String[] values) throws IOException {
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    for(int i = 0;i != fields.length;i++){
        Put put = new Put(row.getBytes());
        String[] cols = fields[i].split(":");
        put.addColumn(cols[0].getBytes(), cols[1].getBytes(), values[i].getBytes());
        table.put(put);
    }
    table.close();
    close();
    System.out.println("添加成功!");
}

运行结果:

(3) scanColumn(String tableName, String column)

代码:

public static void scanColumn(String tableName,String column)throws  IOException{
      init();
      Table table = connection.getTable(TableName.valueOf(tableName));
      Scan scan = new Scan();
      if (column.contains(":")) {
       String[] cols = column.split(":");
       scan.addColumn(cols[0].getBytes(),cols[1].getBytes());
}
      else
       scan.addFamily(Bytes.toBytes(column));
      ResultScanner scanner = table.getScanner(scan);
      Result result = scanner.next();
      if (result==null) {
   System.out.println("null");
}
      for (; result != null; result = scanner.next()){
          showCell(result);
      }
      table.close();
      close();
  }
  //格式化输出
  public static void showCell(Result result){
      Cell[] cells = result.rawCells();
      for(Cell cell:cells){
          System.out.println("行名:"+new String(Bytes.toString(cell.getRowArray(),cell.getRowOffset(), cell.getRowLength()))+" ");
          System.out.println("时间戳:"+cell.getTimestamp()+" ");
          System.out.println("列族:"+new String(Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(), cell.getFamilyLength()))+" ");
          System.out.println("列限定符:"+new String(Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(), cell.getQualifierLength()))+" ");
          System.out.println("值:"+new String(Bytes.toString(cell.getValueArray(),cell.getValueOffset(), cell.getValueLength()))+" ");           
      }
  }

运行结果:

查询整个列族Score:

查询某一列(这里为Score:Math)

执行完deleteRow(String tableName, String row)后,再查看:

(4) modifyData(String tableName, String row, String column)

代码:

public static void modifyData(String tableName,String row,String column,String val)throws IOException{
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Put put = new Put(row.getBytes());
    String[] cols = column.split(":");
    put.addColumn(cols[0].getBytes(),cols[1].getBytes(),val.getBytes());
    table.put(put);
    table.close();
    close();
    System.out.println("修改成功!");
}

运行结果:

把Lee的英语成绩改为100:

修改前:

修改后:

(5) deleteRow(String tableName, String row)

代码:

public static void deleteRow(String tableName,String row)throws IOException{
    init();
    Table table = connection.getTable(TableName.valueOf(tableName));
    Delete delete = new Delete(row.getBytes());        
    table.delete(delete);
    table.close();
    close();
    System.out.println("删除成功!");
}

运行结果:

完整代码:

package lab2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService.AsyncProcessor.closeScanner;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HbaseLab {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;

    public static void main(String[] args)throws IOException{
       //创建表
       //String[] fields1= {"Score"};
       //createTable("StuHbase", fields1);
       //添加数据
       //String[] fields2= {"Score:Math","Score:Computer Science","Score:English"};
       //String[] values= {"90","95","99"};
       //addRecord("StuHbase","Lee",fields2,values);
       //查询
        //scanColumn("StuHbase","Score");//查询整个列族
        scanColumn("StuHbase","Score:Math");//查询某一列
       //修改数据
       //modifyData("StuHbase", "Lee", "Score:English", "100");
       //删除数据
       //deleteRow("StuHbase","Lee");
    }
    //建立连接
    public static void init(){
        configuration  = HBaseConfiguration.create();
        configuration.set("hbase.rootdir","hdfs://master:9000/hbase");
        configuration.set("hbase.zookeeper.quorum","master,slave1,slave2");
        try{
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    //关闭连接
    public static void close(){
        try{
            if(admin != null){
                admin.close();
            }
            if(null != connection){
                connection.close();
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }

   //创建表
    public static void createTable(String tableName,String[] fields) throws IOException {

        init();
        TableName tablename = TableName.valueOf(tableName);

        if(admin.tableExists(tablename)){
            System.out.println("该表已存在,删除后重新创建");
            admin.disableTable(tablename);
            admin.deleteTable(tablename);//删除原来的表
        }

        TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tablename);
        for(String str : fields){
            tableDescriptor.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build());
            admin.createTable(tableDescriptor.build());
        }
        close();
        System.out.println("创建成功!");
    }

    //添加数据
    public static void addRecord(String tableName,String row,String[] fields,String[] values) throws IOException {
        init();
        Table table = connection.getTable(TableName.valueOf(tableName));
        for(int i = 0;i != fields.length;i++){
            Put put = new Put(row.getBytes());
            String[] cols = fields[i].split(":");
            put.addColumn(cols[0].getBytes(), cols[1].getBytes(), values[i].getBytes());
            table.put(put);
        }
        table.close();
        close();
        System.out.println("添加成功!");
    }
    
    //浏览
    public static void scanColumn(String tableName,String column)throws  IOException{
        init();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        if (column.contains(":")) {
           String[] cols = column.split(":");
           scan.addColumn(cols[0].getBytes(),cols[1].getBytes());
      }
        else
           scan.addFamily(Bytes.toBytes(column));
        ResultScanner scanner = table.getScanner(scan);
        Result result = scanner.next();
        if (result==null) {
         System.out.println("null");
      }
        for (; result != null; result = scanner.next()){
            showCell(result);
        }
        table.close();
        close();
    }
    //格式化输出
    public static void showCell(Result result){
        Cell[] cells = result.rawCells();
        for(Cell cell:cells){
            System.out.println("行名:"+new String(Bytes.toString(cell.getRowArray(),cell.getRowOffset(), cell.getRowLength()))+" ");
            System.out.println("时间戳:"+cell.getTimestamp()+" ");
            System.out.println("列族:"+new String(Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(), cell.getFamilyLength()))+" ");
            System.out.println("列限定符:"+new String(Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(), cell.getQualifierLength()))+" ");
            System.out.println("值:"+new String(Bytes.toString(cell.getValueArray(),cell.getValueOffset(), cell.getValueLength()))+" ");           
        }
    }
    //修改数据
    public static void modifyData(String tableName,String row,String column,String val)throws IOException{
        init();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(row.getBytes());
        String[] cols = column.split(":");
        put.addColumn(cols[0].getBytes(),cols[1].getBytes(),val.getBytes());
        table.put(put);
        table.close();
        close();
        System.out.println("修改成功!");
    }
    //删除数据
    public static void deleteRow(String tableName,String row)throws IOException{
        init();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(row.getBytes());        
        table.delete(delete);
        table.close();
        close();
        System.out.println("删除成功!");
    }
}
标签: hbase 大数据 hadoop

本文转载自: https://blog.csdn.net/qq_51246603/article/details/128519161
版权归原作者 木子一个Lee 所有, 如有侵权,请联系我们删除。

“大数据开源框架之HBase编程实践”的评论:

还没有评论