点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(正在更新…)
章节内容
上节我们完成了如下的内容:
- Kudu Java API
- 增删改查 编写案例测试
实现思路
将数据从 Flink 下沉到 Kudu 的基本思路如下:
- 环境准备:确保 Flink 和 Kudu 环境正常运行,并配置好相关依赖。
- 创建 Kudu 表:在 Kudu 中定义要存储的数据表,包括主键和列类型。
- 数据流设计:使用 Flink 的 DataStream API 读取输入数据流,进行必要的数据处理和转换。
- 写入 Kudu:通过 Kudu 的连接器将处理后的数据写入 Kudu 表。需要配置 Kudu 客户端和表的相关信息。
- 执行作业:启动 Flink 作业,实时将数据流中的数据写入 Kudu,便于后续查询和分析。
添加依赖
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-test</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><flink.version>1.11.1</flink.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.kudu</groupId><artifactId>kudu-client</artifactId><version>1.17.0</version></dependency></dependencies></project>
数据源
newUserInfo("001","Jack",18),newUserInfo("002","Rose",20),newUserInfo("003","Cris",22),newUserInfo("004","Lily",19),newUserInfo("005","Lucy",21),newUserInfo("006","Json",24),
自定义下沉器
packageicu.wzk.kudu;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;importorg.apache.kudu.Schema;importorg.apache.kudu.Type;importorg.apache.kudu.client.*;importorg.apache.log4j.Logger;importjava.io.ByteArrayOutputStream;importjava.io.ObjectOutputStream;importjava.util.Map;publicclassMyFlinkSinkToKuduextendsRichSinkFunction<Map<String,Object>>{privatefinalstaticLogger logger =Logger.getLogger("MyFlinkSinkToKudu");privateKuduClient kuduClient;privateKuduTable kuduTable;privateString kuduMasterAddr;privateString tableName;privateSchema schema;privateKuduSession kuduSession;privateByteArrayOutputStream out;privateObjectOutputStream os;publicMyFlinkSinkToKudu(String kuduMasterAddr,String tableName){this.kuduMasterAddr = kuduMasterAddr;this.tableName = tableName;}@Overridepublicvoidopen(Configuration parameters)throwsException{
out =newByteArrayOutputStream();
os =newObjectOutputStream(out);
kuduClient =newKuduClient.KuduClientBuilder(kuduMasterAddr).build();
kuduTable = kuduClient.openTable(tableName);
schema = kuduTable.getSchema();
kuduSession = kuduClient.newSession();
kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);}@Overridepublicvoidinvoke(Map<String,Object> map,Context context)throwsException{if(null== map){return;}try{int columnCount = schema.getColumnCount();Insert insert = kuduTable.newInsert();PartialRow row = insert.getRow();for(int i =0; i < columnCount; i ++){Object value = map.get(schema.getColumnByIndex(i).getName());insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value);OperationResponse response = kuduSession.apply(insert);if(null!= response){
logger.error(response.getRowError().toString());}}}catch(Exception e){
logger.error(e);}}@Overridepublicvoidclose()throwsException{try{
kuduSession.close();
kuduClient.close();
os.close();
out.close();}catch(Exception e){
logger.error(e);}}privatevoidinsertData(PartialRow row,Type type,String columnName,Object value){try{switch(type){case STRING:
row.addString(columnName, value.toString());return;case INT32:
row.addInt(columnName,Integer.valueOf(value.toString()));return;case INT64:
row.addLong(columnName,Long.valueOf(value.toString()));return;case DOUBLE:
row.addDouble(columnName,Double.valueOf(value.toString()));return;case BOOL:
row.addBoolean(columnName,Boolean.valueOf(value.toString()));return;case BINARY:
os.writeObject(value);
row.addBinary(columnName, out.toByteArray());return;case FLOAT:
row.addFloat(columnName,Float.valueOf(value.toString()));default:thrownewUnsupportedOperationException("Unknown Type: "+ type);}}catch(Exception e){
logger.error("插入数据异常: "+ e);}}}
编写实体
packageicu.wzk.kudu;publicclassUserInfo{privateString id;privateString name;privateInteger age;publicUserInfo(String id,String name,Integer age){this.id = id;this.name = name;this.age = age;}publicStringgetId(){return id;}publicvoidsetId(String id){this.id = id;}publicStringgetName(){return name;}publicvoidsetName(String name){this.name = name;}publicIntegergetAge(){return age;}publicvoidsetAge(Integer age){this.age = age;}}
执行建表
packageicu.wzk.kudu;importorg.apache.kudu.ColumnSchema;importorg.apache.kudu.Schema;importorg.apache.kudu.Type;importorg.apache.kudu.client.CreateTableOptions;importorg.apache.kudu.client.KuduClient;importorg.apache.kudu.client.KuduException;importjava.util.ArrayList;importjava.util.List;publicclassKuduCreateTable{publicstaticvoidmain(String[] args)throwsKuduException{String masterAddress ="localhost:7051,localhost:7151,localhost:7251";KuduClient.KuduClientBuilder kuduClientBuilder =newKuduClient.KuduClientBuilder(masterAddress);KuduClient kuduClient = kuduClientBuilder.build();String tableName ="user";List<ColumnSchema> columnSchemas =newArrayList<>();ColumnSchema id =newColumnSchema
.ColumnSchemaBuilder("id",Type.INT32).key(true).build();
columnSchemas.add(id);ColumnSchema name =newColumnSchema
.ColumnSchemaBuilder("name",Type.STRING).key(false).build();
columnSchemas.add(name);ColumnSchema age =newColumnSchema
.ColumnSchemaBuilder("age",Type.INT32).key(false).build();
columnSchemas.add(age);Schema schema =newSchema(columnSchemas);CreateTableOptions options =newCreateTableOptions();// 副本数量为1
options.setNumReplicas(1);List<String> colrule =newArrayList<>();
colrule.add("id");
options.addHashPartitions(colrule,3);
kuduClient.createTable(tableName, schema, options);
kuduClient.close();}}
主逻辑代码
packageicu.wzk.kudu;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importjava.util.HashMap;importjava.util.Map;importjava.util.stream.Stream;publicclassSinkToKuduTest{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<UserInfo> dataSource = env.fromElements(newUserInfo("001","Jack",18),newUserInfo("002","Rose",20),newUserInfo("003","Cris",22),newUserInfo("004","Lily",19),newUserInfo("005","Lucy",21),newUserInfo("006","Json",24));SingleOutputStreamOperator<Map<String,Object>> mapSource = dataSource
.map(newMapFunction<UserInfo,Map<String,Object>>(){@OverridepublicMap<String,Object>map(UserInfo value)throwsException{Map<String,Object> map =newHashMap<>();
map.put("id", value.getId());
map.put("name", value.getName());
map.put("age", value.getAge());return map;}});String kuduMasterAddr ="localhost:7051,localhost:7151,localhost:7251";String tableInfo ="user";
mapSource.addSink(newMyFlinkSinkToKudu(kuduMasterAddr, tableInfo));
env.execute("SinkToKuduTest");}}
解释分析
环境设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:初始化 Flink 的执行环境,这是 Flink 应用的入口。
数据源创建
DataStreamSource dataSource = env.fromElements(…):创建了一个包含多个 UserInfo 对象的数据源,模拟了一个输入流。
数据转换
SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(…):使用 map 函数将 UserInfo 对象转换为 Map<String, Object>,便于后续处理和写入 Kudu。每个 UserInfo 的属性都被放入一个 HashMap 中。
Kudu 配置信息
String kuduMasterAddr = “localhost:7051,localhost:7151,localhost:7251”; 和 String tableInfo = “user”;:定义 Kudu 的主节点地址和目标表的信息。
数据下沉
mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));:将转换后的数据流添加到 Kudu 的自定义 Sink 中。MyFlinkSinkToKudu 类应该实现了将数据写入 Kudu 的逻辑。
执行作业
env.execute(“SinkToKuduTest”);:启动 Flink 作业,执行整个数据流处理流程。
测试运行
- 先运行建表
- 再运行主逻辑
我们建表之后,确认user表存在。然后我们运行Flink程序,将数据写入Kudu。
确认有表后,执行 Flink 程序:
注意事项
- 并发性:根据 Kudu 集群的规模和配置,可以调整 Flink 作业的并发性,以提高写入性能。
- 批量写入:Kudu 支持批量插入,可以通过适当配置 Flink 的 sink 来提高性能。
- 故障处理:确保在作业中处理异常和重试逻辑,以确保数据不会丢失。
- 监控与调试:使用 Flink 的监控工具和 Kudu 的工具(如 Kudu UI)来监控数据流和性能。
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。