0


FlinkSQL连接Hive并动态插入进Hive数据库中

大家好,我是代码搬运工。最近在利用FlinkSQL进行开发连接Hive数据库的时候遇到了一些小问题,接下来分享给大家以免以后踩坑。

在一个项目中我主要利用FlinkSQL来连接Hive数据库并执行Insert动态插入语句来关联设备信息,话不多说我们直接开始。

maven的依赖如下

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.15</version>
    </dependency>

    <!--FLinkSQL依赖-->

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.12.2</version>

    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>

    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>

    </dependency>

    <!-- flink table包 -->

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Hive Dependency -->
    <!--<dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.3.2</version>

    </dependency>-->

    <!--hadoop -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.1</version>
    </dependency>

    <!--Hive-->
    <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>2.2.11</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
        <exclusions>
            <exclusion>
                <artifactId>log4j-api</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j</artifactId>
                <groupId>log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-core</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-slf4j-impl</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-serde</artifactId>
        <version>${hive.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>log4j-api</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-core</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-slf4j-impl</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>${hive.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-slf4j-impl</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-web</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-core</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-api</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-1.2-api</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>netty-all</artifactId>
                <groupId>io.netty</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hive-common</artifactId>
                <groupId>org.apache.hive</groupId>
            </exclusion>
            <exclusion>
                <artifactId>parquet-hadoop-bundle</artifactId>
                <groupId>org.apache.parquet</groupId>
            </exclusion>
            <exclusion>
                <groupId>xerces</groupId>
                <artifactId>xercesImpl</artifactId>
            </exclusion>
            <exclusion>
                <artifactId>hbase-client</artifactId>
                <groupId>org.apache.hbase</groupId>
            </exclusion>
            <exclusion>
                <artifactId>curator-framework</artifactId>
                <groupId>org.apache.curator</groupId>
            </exclusion>
            <exclusion>
                <artifactId>zookeeper</artifactId>
                <groupId>org.apache.zookeeper</groupId>
            </exclusion>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>httpcore</artifactId>
                <groupId>org.apache.httpcomponents</groupId>
            </exclusion>
            <exclusion>
                <artifactId>httpclient</artifactId>
                <groupId>org.apache.httpcomponents</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-cli</artifactId>
                <groupId>commons-cli</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-compress</artifactId>
                <groupId>org.apache.commons</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-lang</artifactId>
                <groupId>commons-lang</groupId>
            </exclusion>
            <exclusion>
                <artifactId>guava</artifactId>
                <groupId>com.google.guava</groupId>
            </exclusion>
            <exclusion>
                <artifactId>gson</artifactId>
                <groupId>com.google.code.gson</groupId>
            </exclusion>
            <exclusion>
                <artifactId>avro</artifactId>
                <groupId>org.apache.avro</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hbase-common</artifactId>
                <groupId>org.apache.hbase</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hbase-hadoop2-compat</artifactId>
                <groupId>org.apache.hbase</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hbase-server</artifactId>
                <groupId>org.apache.hbase</groupId>
            </exclusion>
            <exclusion>
                <artifactId>tephra-hbase-compat-1.0</artifactId>
                <groupId>co.cask.tephra</groupId>
            </exclusion>
            <exclusion>
                <artifactId>hbase-hadoop-compat</artifactId>
                <groupId>org.apache.hbase</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j</artifactId>
                <groupId>log4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>

</dependencies>

1.首先我们先用FlinkSQL连接Hive

!注意,这里我们要使用阿里的Blanner 我在这里踩了巨坑,一定要用阿里的Blanner才可以执行动态insert

因为Flink是流式处理,

如果我们构建table的环境是流式环境的话,数据是源源不断得输入进来。如下所示

// 构建运行流处理的运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 构建table环境StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);

这样的话我们执行动态操作的时候,例如 Insert intoXXXX Select * from XXX等语句是会报错误的,Hive Table SInk是不支持这样的。

我们查看源码可知。
在这里插入图片描述

Hive Table的INSERT只支持带有"+I"的数据,不允许带有-U和+U就是会改变的数据。

那怎么解决呢,不可能我想写个动态sql都不行吧,晕。。。。。

还好国内阿里进行了FlinkSQL优化,解决方法是利用阿里的BlinkPlanner来构建表环境,阿里已经内部帮你优化好了。

//使用阿里的PlannerEnvironmentSettings settings =EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();// 构建table环境TableEnvironment tableEnv =TableEnvironment.create(settings);//TODO 设置env的checkpoint等其他信息//设置方言 不同数据库的语句有差别
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

2.HiveCatalog连接Hive

构建好表环境后我们直接利用HiveCatalog来连接Hive数据库(里面我只是举例子,公司里一般写在常量里,直接调用就行,在公司这样写代码不被骂死我都服你)

//构造hive catalog 直接调用hiveconstans就可以// Catalog名称,定义一个唯一的名称表示String NAME="myhive";// 默认Hive数据库名称String DEFAULTDATABASE="lwtest";//hive-site.xml路径  运行Flink的Linux目录下String HIVECONFDIRPATH="/etc/hive/conf";//hive版本String VERSION="3.1.2";HiveCatalog myHive=newHiveCatalog(NAME, DEFAULTDATABASE,HIVECONFDIRPATH, VERSION);//注册指定名字的catalog
        tableEnv.registerCatalog("myhive",myHive);//使用上面注册的catalog
        tableEnv.useCatalog("myhive");

3.编写sql

然后我们就可以写自己的sql代码啦

//执行逻辑Table tableResult = tableEnv.sqlQuery(sql);//获取的结果直接打印
        tableResult1.execute().print();//获取结果的迭代器,可以循环迭代器获取结果CloseableIterator<Row> rows = tableResult.execute().collect();//利用executeSql执行插入更新代码//例如insert into table xxxx select * from xxxx; TableResult tableResult1 = tableEnv.executeSql(sql);

调用executeSql不需要调用execute,如果里面有datastream api就需要execute

完整代码如下

//使用阿里的PlannerEnvironmentSettings settings =EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();// 构建table环境TableEnvironment tableEnv =TableEnvironment.create(settings);//TODO 设置env的checkpoint等其他信息//设置方言 不同数据库的语句有差别
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);//构造hive catalog 直接调用hiveconstans就可以// Catalog名称,定义一个唯一的名称表示String NAME="myhive";// 默认Hive数据库名称String DEFAULTDATABASE="lwtest";//hive-site.xml路径  运行Flink的Linux目录下String HIVECONFDIRPATH="/etc/hive/conf";//hive版本String VERSION="3.1.2";HiveCatalog myHive=newHiveCatalog(NAME, DEFAULTDATABASE,HIVECONFDIRPATH, VERSION);//注册指定名字的catalog
        tableEnv.registerCatalog("myhive",myHive);//使用上面注册的catalog
        tableEnv.useCatalog("myhive");// 执行逻辑String sql="select * from xxxx";Table tableResult1 = tableEnv.sqlQuery(sql);
        tableResult1.execute().print();//获取结果的迭代器,可以循环迭代器获取结果CloseableIterator<Row> rows = tableResult1.execute().collect();//执行executeSql 插入或更新数据库String executeSql="insert into table xxxx select * from xxxx";TableResult tableResult6 = tableEnv.executeSql(executeSql);
标签: hive 数据库 flink

本文转载自: https://blog.csdn.net/weixin_43911155/article/details/122977512
版权归原作者 撸码撸猫达人 所有, 如有侵权,请联系我们删除。

“FlinkSQL连接Hive并动态插入进Hive数据库中”的评论:

还没有评论