

Iceberg Flink Operations


1. Configuring parameters and the jar package

1) Starting with Flink 1.11, the flink-shaded-hadoop-2-uber jar is no longer provided. So if Flink is to work with Hadoop, the HADOOP_CLASSPATH environment variable has to be configured:

[root@hadoop103 flink-1.11.0]# vim bin/config.sh 
export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_HDFS_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_YARN_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_CLASSPATH=`hadoop classpath`
export PATH=$PATH:$HADOOP_CLASSPATH

2) At the time of writing, Iceberg only supports Flink 1.11.x, so Flink 1.11.0 is used here. Copy the built Iceberg jar into Flink's lib directory:

/opt/module/iceberg-apache-iceberg-0.11.1/flink-runtime/build/libs/
[root@hadoop103 libs]# cp *.jar /opt/module/flink-1.11.0/lib/

2. Flink SQL Client

1) With the Hadoop environment in place, start a standalone Flink cluster:

[root@hadoop103 flink-1.11.0]# bin/start-cluster.sh

2) Start the SQL Client:

./sql-client.sh embedded \
  -j /home/softs/flink-1.11.1/lib/iceberg-flink-runtime-0.11.1.jar \
  -j /home/softs/flink-1.11.1/lib/flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar \
  shell

3. Creating and using catalogs

-- Create a hadoop_catalog:
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://master:9000/user/hive/warehouse'
);

-- Create a hive_catalog:
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://master:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://master:9000/user/hive/warehouse'
);

-- Create a custom catalog:
CREATE CATALOG my_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);
  • uri: the thrift URI of the Hive metastore. (Required)
  • clients: the Hive metastore client pool size; the default is 2. (Optional)
  • warehouse: the Hive warehouse location. Users should specify this path if they neither set hive-conf-dir to a directory containing a hive-site.xml configuration file nor put a correct hive-site.xml on the classpath.
  • hive-conf-dir: path to a directory containing a hive-site.xml file that supplies custom Hive configuration values. If both hive-conf-dir and warehouse are set when the Iceberg catalog is created, the warehouse value overrides the hive.metastore.warehouse.dir value from <hive-conf-dir>/hive-site.xml (or from the Hive configuration file on the classpath).
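
If the client machine already has a hive-site.xml, the catalog can also point at it via hive-conf-dir instead of spelling out uri and warehouse explicitly. A minimal sketch, assuming the Hive configuration lives under /opt/module/hive/conf (the path is illustrative):

-- hive-conf-dir is a hypothetical path; point it at the directory holding hive-site.xml
CREATE CATALOG hive_catalog2 WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'hive-conf-dir'='/opt/module/hive/conf'
);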

Use the catalog you just created:

Flink SQL> USE CATALOG hadoop_catalog;

Create a database:

Flink SQL> CREATE DATABASE iceberg_sjc;

Create a table:

Flink SQL> CREATE TABLE testA (
>   id BIGINT,
>   name STRING,
>   age INT,
>   dt STRING
> ) PARTITIONED BY (dt);

Insert data:

INSERT INTO testA VALUES (1001, '张三', 18, '2021-07-01'), (1001, '李四', 19, '2021-07-02');

Overwrite data (for a partitioned table, only the partitions touched by the new rows are replaced):

INSERT OVERWRITE testA VALUES (1010, 'zz', 18, '2021-07-01'), (201, '马六', 192, '2021-07-01');
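
To check the results from the SQL client, a plain SELECT runs as a batch scan of the current snapshot. Iceberg's Flink connector can also scan incrementally via a streaming table hint; the sketch below follows the Iceberg 0.11 documentation, the monitor-interval value is only an example, and the client must be in streaming execution mode for the hint to take effect:

-- batch scan of the current table snapshot
SELECT * FROM testA;

-- incremental scan that keeps polling for newly committed snapshots
SELECT * FROM testA /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;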

4. Flink API operations

1) Configure pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.1</flink.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.22</slf4j.version>
        <iceberg.version>0.11.1</iceberg.version>
        <hadoop.version>2.6.5</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>0.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>flink_demo</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2) Reading table data

1. Batch read

package cn.dp.icberg.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * @author: 商俊超
 * @date: 2022/5/26
 * @description: batch read from an Iceberg table
 */
/* Submit command:
   /home/softs/flink-1.11.1/bin/flink run -c cn.dp.icberg.flink.TableRead flink_demo-jar-with-dependencies.jar
 */
public class TableRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testA");
        batchRead(env, tableLoader);

        env.execute();
    }

    /**
     * Batch read
     * @param env
     * @param tableLoader
     */
    public static void batchRead(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        batch.map(item -> item.getLong(0) + "\t" + item.getString(1) + "\t" + item.getInt(2) + "\t" + item.getString(3))
             .print();
    }
}

2. Streaming read

package cn.dp.icberg.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * @author: 商俊超
 * @date: 2022/5/26
 * @description: streaming read from an Iceberg table
 */
/* Submit command:
   /home/softs/flink-1.11.1/bin/flink run -c cn.dp.icberg.flink.TableRead flink_demo-jar-with-dependencies.jar
 */
public class TableRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testA");
        streamingRead(env, tableLoader);

        env.execute();
    }

    /**
     * Batch read
     * @param env
     * @param tableLoader
     */
    public static void batchRead(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        batch.map(item -> item.getLong(0) + "\t" + item.getString(1) + "\t" + item.getInt(2) + "\t" + item.getString(3))
             .print();
    }

    /**
     * Streaming read: keeps monitoring the table for newly committed snapshots
     * @param env
     * @param tableLoader
     */
    public static void streamingRead(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> stream = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(true)
                .build();
        stream.print();
    }
}
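
By default the streaming source starts from the table's current snapshot and then follows new commits. If the job should begin from a specific snapshot instead, the Iceberg 0.11 source builder also accepts a start snapshot id; a sketch (the snapshot id below is a made-up placeholder, in practice taken from the table's snapshot metadata):

        DataStream<RowData> stream = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(true)
                .startSnapshotId(3821550127947089987L)  // hypothetical snapshot id; look it up in the table's snapshot history
                .build();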

3) Writing data

1. Appending data

package cn.dp.icberg.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * @author: 商俊超
 * @date: 2022/5/26
 * @description: append rows from testA into testB
 */
/* Submit command:
   /home/softs/flink-1.11.1/bin/flink run -c cn.dp.icberg.flink.TableRead flink_demo-jar-with-dependencies.jar
 */
public class TableRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testA");
        appendingData(env, tableLoader);

        env.execute();
    }

    /**
     * Appending data: read testA and append its rows to testB
     * @param env
     * @param tableLoader
     */
    public static void appendingData(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        TableLoader tableB = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testB");
        FlinkSink.forRowData(batch)
                .tableLoader(tableB)
                .build();
    }
}
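
A practical note on commits: the Iceberg Flink sink writes data files continuously but only commits them as a new table snapshot when a checkpoint completes (a bounded job also commits when it finishes). For an unbounded streaming write job, checkpointing therefore needs to be enabled, for example:

        // the 60s interval is only an example; choose it based on desired commit latency vs. file sizes
        env.enableCheckpointing(60 * 1000L);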

2. Overwriting data

package cn.dp.icberg.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * @author: 商俊超
 * @date: 2022/5/26
 * @description: overwrite testB with the rows read from testA
 */
/* Submit command:
   /home/softs/flink-1.11.1/bin/flink run -c cn.dp.icberg.flink.TableRead flink_demo-jar-with-dependencies.jar
 */
public class TableRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testA");
        overtData(env, tableLoader);

        env.execute();
    }

    /**
     * Overwrite: read testA and overwrite testB with the result
     * @param env
     * @param tableLoader
     */
    public static void overtData(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        TableLoader tableB = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB");
        FlinkSink.forRowData(batch)
                .tableLoader(tableB)
                .overwrite(true)
                .build();
    }
}

4) Inserting 10,000 test rows

package cn.dp.icberg.flink;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

import java.util.ArrayList;

/**
 * @author: 商俊超
 * @date: 2022/5/26
 * @description: generate 10,000 test rows and write them into the table
 */
/* Submit command:
   /home/softs/flink-1.11.1/bin/flink run -c cn.dp.icberg.flink.TableRead flink_demo-jar-with-dependencies.jar
 */
public class TableRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testA");
        dataTest(env, tableLoader);

        env.execute();
    }

    /**
     * Insert 10,000 test rows
     * @param env
     * @param tableLoader
     */
    public static void dataTest(StreamExecutionEnvironment env, TableLoader tableLoader) {
        ArrayList<String> list = new ArrayList<>();
        for (int i = 1; i <= 10000; i++) {
            // the parentheses around (i + 1) matter: without them "age" becomes the string concatenation of i and 1
            list.add("{\"id\":\"" + i + "\",\"name\":\"liusi" + i + "\",\"age\":\"" + (i + 1) + "\",\"dt\":\"2022-05-26\"}");
        }
        DataStream<String> data = env.fromCollection(list);
        DataStream<RowData> input = data.map(item -> {
            JSONObject jsonData = JSONObject.parseObject(item);
            // the constructor argument is the number of fields in the row
            GenericRowData rowData = new GenericRowData(4);
            rowData.setField(0, jsonData.getLongValue("id"));
            rowData.setField(1, StringData.fromString(jsonData.getString("name")));
            rowData.setField(2, jsonData.getIntValue("age"));
            rowData.setField(3, StringData.fromString(jsonData.getString("dt")));
            return rowData;
        });
        FlinkSink.forRowData(input)
                .tableLoader(tableLoader)
                .overwrite(true)
                .build();
    }
}



Reposted from: https://blog.csdn.net/m0_46538284/article/details/125001561
Copyright belongs to the original author 披着狼皮的小红帽_. If there is any infringement, please contact us and it will be removed.
