0


大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(正在更新…)

章节内容

上节我们完成了如下的内容:

  • Kudu Java API
  • 增删改查 编写案例测试

在这里插入图片描述

实现思路

将数据从 Flink 下沉到 Kudu 的基本思路如下:

  • 环境准备:确保 Flink 和 Kudu 环境正常运行,并配置好相关依赖。
  • 创建 Kudu 表:在 Kudu 中定义要存储的数据表,包括主键和列类型。
  • 数据流设计:使用 Flink 的 DataStream API 读取输入数据流,进行必要的数据处理和转换。
  • 写入 Kudu:通过 Kudu 的连接器将处理后的数据写入 Kudu 表。需要配置 Kudu 客户端和表的相关信息。
  • 执行作业:启动 Flink 作业,实时将数据流中的数据写入 Kudu,便于后续查询和分析。

添加依赖

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-test</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><flink.version>1.11.1</flink.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.kudu</groupId><artifactId>kudu-client</artifactId><version>1.17.0</version></dependency></dependencies></project>

数据源

newUserInfo("001","Jack",18),newUserInfo("002","Rose",20),newUserInfo("003","Cris",22),newUserInfo("004","Lily",19),newUserInfo("005","Lucy",21),newUserInfo("006","Json",24),

自定义下沉器

packageicu.wzk.kudu;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;importorg.apache.kudu.Schema;importorg.apache.kudu.Type;importorg.apache.kudu.client.*;importorg.apache.log4j.Logger;importjava.io.ByteArrayOutputStream;importjava.io.ObjectOutputStream;importjava.util.Map;publicclassMyFlinkSinkToKuduextendsRichSinkFunction<Map<String,Object>>{privatefinalstaticLogger logger =Logger.getLogger("MyFlinkSinkToKudu");privateKuduClient kuduClient;privateKuduTable kuduTable;privateString kuduMasterAddr;privateString tableName;privateSchema schema;privateKuduSession kuduSession;privateByteArrayOutputStream out;privateObjectOutputStream os;publicMyFlinkSinkToKudu(String kuduMasterAddr,String tableName){this.kuduMasterAddr = kuduMasterAddr;this.tableName = tableName;}@Overridepublicvoidopen(Configuration parameters)throwsException{
        out =newByteArrayOutputStream();
        os =newObjectOutputStream(out);
        kuduClient =newKuduClient.KuduClientBuilder(kuduMasterAddr).build();
        kuduTable = kuduClient.openTable(tableName);
        schema = kuduTable.getSchema();
        kuduSession = kuduClient.newSession();
        kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);}@Overridepublicvoidinvoke(Map<String,Object> map,Context context)throwsException{if(null== map){return;}try{int columnCount = schema.getColumnCount();Insert insert = kuduTable.newInsert();PartialRow row = insert.getRow();for(int i =0; i < columnCount; i ++){Object value = map.get(schema.getColumnByIndex(i).getName());insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value);OperationResponse response = kuduSession.apply(insert);if(null!= response){
                    logger.error(response.getRowError().toString());}}}catch(Exception e){
            logger.error(e);}}@Overridepublicvoidclose()throwsException{try{
            kuduSession.close();
            kuduClient.close();
            os.close();
            out.close();}catch(Exception e){
            logger.error(e);}}privatevoidinsertData(PartialRow row,Type type,String columnName,Object value){try{switch(type){case STRING:
                    row.addString(columnName, value.toString());return;case INT32:
                    row.addInt(columnName,Integer.valueOf(value.toString()));return;case INT64:
                    row.addLong(columnName,Long.valueOf(value.toString()));return;case DOUBLE:
                    row.addDouble(columnName,Double.valueOf(value.toString()));return;case BOOL:
                    row.addBoolean(columnName,Boolean.valueOf(value.toString()));return;case BINARY:
                    os.writeObject(value);
                    row.addBinary(columnName, out.toByteArray());return;case FLOAT:
                    row.addFloat(columnName,Float.valueOf(value.toString()));default:thrownewUnsupportedOperationException("Unknown Type: "+ type);}}catch(Exception e){
            logger.error("插入数据异常: "+ e);}}}

编写实体

packageicu.wzk.kudu;publicclassUserInfo{privateString id;privateString name;privateInteger age;publicUserInfo(String id,String name,Integer age){this.id = id;this.name = name;this.age = age;}publicStringgetId(){return id;}publicvoidsetId(String id){this.id = id;}publicStringgetName(){return name;}publicvoidsetName(String name){this.name = name;}publicIntegergetAge(){return age;}publicvoidsetAge(Integer age){this.age = age;}}

执行建表

packageicu.wzk.kudu;importorg.apache.kudu.ColumnSchema;importorg.apache.kudu.Schema;importorg.apache.kudu.Type;importorg.apache.kudu.client.CreateTableOptions;importorg.apache.kudu.client.KuduClient;importorg.apache.kudu.client.KuduException;importjava.util.ArrayList;importjava.util.List;publicclassKuduCreateTable{publicstaticvoidmain(String[] args)throwsKuduException{String masterAddress ="localhost:7051,localhost:7151,localhost:7251";KuduClient.KuduClientBuilder kuduClientBuilder =newKuduClient.KuduClientBuilder(masterAddress);KuduClient kuduClient = kuduClientBuilder.build();String tableName ="user";List<ColumnSchema> columnSchemas =newArrayList<>();ColumnSchema id =newColumnSchema
                .ColumnSchemaBuilder("id",Type.INT32).key(true).build();
        columnSchemas.add(id);ColumnSchema name =newColumnSchema
                .ColumnSchemaBuilder("name",Type.STRING).key(false).build();
        columnSchemas.add(name);ColumnSchema age =newColumnSchema
                .ColumnSchemaBuilder("age",Type.INT32).key(false).build();
        columnSchemas.add(age);Schema schema =newSchema(columnSchemas);CreateTableOptions options =newCreateTableOptions();// 副本数量为1
        options.setNumReplicas(1);List<String> colrule =newArrayList<>();
        colrule.add("id");
        options.addHashPartitions(colrule,3);

        kuduClient.createTable(tableName, schema, options);
        kuduClient.close();}}

主逻辑代码

packageicu.wzk.kudu;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importjava.util.HashMap;importjava.util.Map;importjava.util.stream.Stream;publicclassSinkToKuduTest{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<UserInfo> dataSource = env.fromElements(newUserInfo("001","Jack",18),newUserInfo("002","Rose",20),newUserInfo("003","Cris",22),newUserInfo("004","Lily",19),newUserInfo("005","Lucy",21),newUserInfo("006","Json",24));SingleOutputStreamOperator<Map<String,Object>> mapSource = dataSource
                .map(newMapFunction<UserInfo,Map<String,Object>>(){@OverridepublicMap<String,Object>map(UserInfo value)throwsException{Map<String,Object> map =newHashMap<>();
                        map.put("id", value.getId());
                        map.put("name", value.getName());
                        map.put("age", value.getAge());return map;}});String kuduMasterAddr ="localhost:7051,localhost:7151,localhost:7251";String tableInfo ="user";
        mapSource.addSink(newMyFlinkSinkToKudu(kuduMasterAddr, tableInfo));

        env.execute("SinkToKuduTest");}}

解释分析

环境设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:初始化 Flink 的执行环境,这是 Flink 应用的入口。

数据源创建

DataStreamSource dataSource = env.fromElements(…):创建了一个包含多个 UserInfo 对象的数据源,模拟了一个输入流。

数据转换

SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(…):使用 map 函数将 UserInfo 对象转换为 Map<String, Object>,便于后续处理和写入 Kudu。每个 UserInfo 的属性都被放入一个 HashMap 中。

Kudu 配置信息

String kuduMasterAddr = “localhost:7051,localhost:7151,localhost:7251”; 和 String tableInfo = “user”;:定义 Kudu 的主节点地址和目标表的信息。

数据下沉

mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));:将转换后的数据流添加到 Kudu 的自定义 Sink 中。MyFlinkSinkToKudu 类应该实现了将数据写入 Kudu 的逻辑。

执行作业

env.execute(“SinkToKuduTest”);:启动 Flink 作业,执行整个数据流处理流程。

测试运行

  • 先运行建表
  • 再运行主逻辑

我们建表之后,确认user表存在。然后我们运行Flink程序,将数据写入Kudu。
在这里插入图片描述
确认有表后,执行 Flink 程序:
在这里插入图片描述

注意事项

  • 并发性:根据 Kudu 集群的规模和配置,可以调整 Flink 作业的并发性,以提高写入性能。
  • 批量写入:Kudu 支持批量插入,可以通过适当配置 Flink 的 sink 来提高性能。
  • 故障处理:确保在作业中处理异常和重试逻辑,以确保数据不会丢失。
  • 监控与调试:使用 Flink 的监控工具和 Kudu 的工具(如 Kudu UI)来监控数据流和性能。
标签: 大数据 apache flink

本文转载自: https://blog.csdn.net/w776341482/article/details/142514031
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。

“大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu”的评论:

还没有评论