点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(正在更新…)
章节内容
上节我们完成了如下的内容:
- Apache Kudu 的 Dockerfile
- Dockerfile 详解
- Kudu 启动访问
新建工程
由于重复了太多次,这里直接跳过了。
导入依赖
<dependency><groupId>org.apache.kudu</groupId><artifactId>kudu-client</artifactId><version>1.4.0</version></dependency>
创建新表
- 必须指定表连接到的Master节点主机名
- 必须定义Schema
- 必须指定副本数量、分区策略、数量
编写代码
packageicu.wzk.kudu;importorg.apache.kudu.ColumnSchema;importorg.apache.kudu.Schema;importorg.apache.kudu.Type;importorg.apache.kudu.client.CreateTableOptions;importorg.apache.kudu.client.KuduClient;importorg.apache.kudu.client.KuduException;importjava.util.ArrayList;importjava.util.List;publicclassKuduCreateTable{publicstaticvoidmain(String[] args)throwsKuduException{String masterAddress ="localhost:7051,localhost:7151,localhost:7251";KuduClient.KuduClientBuilder kuduClientBuilder =newKuduClient.KuduClientBuilder(masterAddress);KuduClient kuduClient = kuduClientBuilder.build();String tableName ="student";List<ColumnSchema> columnSchemas =newArrayList<>();ColumnSchema id =newColumnSchema
.ColumnSchemaBuilder("id",Type.INT32).key(true).build();
columnSchemas.add(id);ColumnSchema name =newColumnSchema
.ColumnSchemaBuilder("name",Type.STRING).key(false).build();
columnSchemas.add(name);Schema schema =newSchema(columnSchemas);CreateTableOptions options =newCreateTableOptions();// 副本数量为1
options.setNumReplicas(1);List<String> colrule =newArrayList<>();
colrule.add("id");
options.addHashPartitions(colrule,3);
kuduClient.createTable(tableName, schema, options);
kuduClient.close();}}
测试运行
控制台未输出内容
运行结果如下图所示:
查看Kudu
我们查看Kudu的Tables,可以看到刚才创建的表如下:
删除表
编写代码
packageicu.wzk.kudu;importorg.apache.kudu.client.KuduClient;importorg.apache.kudu.client.KuduException;publicclassKuduDeleteTable{publicstaticvoidmain(String[] args)throwsKuduException{String masterAddress ="localhost:7051,localhost:7151,localhost:7251,";KuduClient client =newKuduClient.KuduClientBuilder(masterAddress).defaultAdminOperationTimeoutMs(5000).build();
client.deleteTable("student");
client.close();}}
测试运行
控制台没有输出内容,这里运行截图如下:
查看Kudu
查看Kudu服务的 Table 页,里边的数据表已经删除了。
插入数据
- 获取客户端
- 打开一张表
- 创建会话
- 设置刷新模式
- 获取插入实例
- 声明带插入的数据
- 刷入数据
- 应用插入实例
- 关闭会话
创建新表
我们运行刚才的创建新表代码,把student表先生成出来,具体运行这里跳过了。
编写代码
packageicu.wzk.kudu;importorg.apache.kudu.client.*;publicclassKuduInsert{publicstaticvoidmain(String[] args)throwsKuduException{String masterAddr ="localhost:7051,localhost:7151,localhost:7251";KuduClient client =newKuduClient
.KuduClientBuilder(masterAddr).defaultAdminOperationTimeoutMs(5000).build();KuduTable stuTable = client.openTable("student");KuduSession kuduSession = client.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);Insert insert = stuTable.newInsert();
insert.getRow().addInt("id",1);
insert.getRow().addString("name","wzk");
kuduSession.flush();
kuduSession.apply(insert);
kuduSession.close();
client.close();}}
在代码中,有一个叫:kuduSession.setFlushMode:
- AUTO_FLUSH_SYNC(默认):意思是调用KuduSession apply方法后,客户端会在当前刷新到服务器后再返回,这种情况不能够批量插入数据,调用 flush 方法不会起作用,应为此时缓冲区已经被刷新到了服务器。
- AUTO_FLUSH_BACKGROUD:意思是调用apply方法后,客户端会立即返回,但是写入将在后台发送,可能与来自同一会话的其他写入一起进行批处理。如果没有足够的缓冲空间,KuduSession apply会阻塞,缓冲空间不可用。因为写入操作是在后台进行的,因此任何一个错误都将存储在一个会话本地缓冲区中。注意:这个模式可能会导致插入是乱序的,这是因为在这种模式下,多个写操作可以并发的发送到服务器。且这是一个Kudu的BUG,详细请看:https://issues.apache.org/jira/browse/KUDU-1767https://issues.apache.org/jira/browse/KUDU-1767
- MANUAL_FLUSH:调用apply后,会非常快的返回,但是写操作不会发送,直到用户使用flush函数,如果缓冲区超过了限制大小,apply就会返回一个错误。
测试运行
控制台无输出内容,运行的截图如下图所示:
查询数据
编写代码
Kudu的查询数据用Scanner
packageicu.wzk.kudu;importorg.apache.kudu.client.*;publicclassKuduSelect{publicstaticvoidmain(String[] args)throwsKuduException{String masterAddr ="localhost:7051,localhost:7151,localhost:7251";KuduClient client =newKuduClient
.KuduClientBuilder(masterAddr).build();KuduTable kuduTable = client.openTable("user");KuduScanner kuduScanner = client.newScannerBuilder(kuduTable).build();while(kuduScanner.hasMoreRows()){for(RowResult result : kuduScanner.nextRows()){int id = result.getInt("id");String name = result.getString("name");int age = result.getInt("age");System.out.println("id: "+ id +", name: "+ name +", age: "+ age);}}
client.close();}}
测试运行
id: 1, name: wzk
Process finished with exit code 0
运行结果如下图所示:
更改数据
编写代码
packageicu.wzk.kudu;importorg.apache.kudu.client.*;publicclassKuduUpdate{publicstaticvoidmain(String[] args)throwsKuduException{String masterAddress ="localhost:7051,localhost:7151,localhost:7251";KuduClient client =newKuduClient
.KuduClientBuilder(masterAddress).build();KuduTable stuTable = client.openTable("student");KuduSession kuduSession = client.newSession();
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);Update update = stuTable.newUpdate();
update.getRow().addInt("id",1);
update.getRow().addString("name","wzk_icu");
kuduSession.apply(update);
kuduSession.close();
client.close();}}
删除指定行
编写代码
packageicu.wzk.kudu;importorg.apache.kudu.client.*;publicclassKuduDelete{publicstaticvoidmain(String[] args)throwsKuduException{String masterAddress ="localhost:7051,localhost:7151,localhost:7251";KuduClient client =newKuduClient
.KuduClientBuilder(masterAddress).build();KuduSession kuduSession = client.newSession();KuduTable stuTable = client.openTable("student");
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);Delete delete = stuTable.newDelete();PartialRow row = delete.getRow();
row.addInt("id",1);
kuduSession.flush();
kuduSession.apply(delete);
kuduSession.close();
client.close();}}
测试运行
控制台没有输出任何内容,运行过程截图如下:
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。