Hi everyone, I'm 代码搬运工. I recently ran into a few problems while using FlinkSQL to connect to a Hive database, so I'm sharing them here to save you from the same pitfalls.
In this project I mainly used FlinkSQL to connect to Hive and run dynamic INSERT statements to join in device information. Without further ado, let's get started.
The Maven dependencies are as follows:
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.15</version>
</dependency>
<!-- FlinkSQL dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Hive connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Hive Dependency -->
<!--<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.2</version>
</dependency>-->
<!--hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.1</version>
</dependency>
<!--Hive-->
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.2.11</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-core</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-slf4j-impl</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-core</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-slf4j-impl</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-slf4j-impl</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-web</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-core</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-1.2-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>netty-all</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>hive-common</artifactId>
<groupId>org.apache.hive</groupId>
</exclusion>
<exclusion>
<artifactId>parquet-hadoop-bundle</artifactId>
<groupId>org.apache.parquet</groupId>
</exclusion>
<exclusion>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
</exclusion>
<exclusion>
<artifactId>hbase-client</artifactId>
<groupId>org.apache.hbase</groupId>
</exclusion>
<exclusion>
<artifactId>curator-framework</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>commons-cli</artifactId>
<groupId>commons-cli</groupId>
</exclusion>
<exclusion>
<artifactId>commons-compress</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
<exclusion>
<artifactId>commons-lang</artifactId>
<groupId>commons-lang</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
</exclusion>
<exclusion>
<artifactId>avro</artifactId>
<groupId>org.apache.avro</groupId>
</exclusion>
<exclusion>
<artifactId>hbase-common</artifactId>
<groupId>org.apache.hbase</groupId>
</exclusion>
<exclusion>
<artifactId>hbase-hadoop2-compat</artifactId>
<groupId>org.apache.hbase</groupId>
</exclusion>
<exclusion>
<artifactId>hbase-server</artifactId>
<groupId>org.apache.hbase</groupId>
</exclusion>
<exclusion>
<artifactId>tephra-hbase-compat-1.0</artifactId>
<groupId>co.cask.tephra</groupId>
</exclusion>
<exclusion>
<artifactId>hbase-hadoop-compat</artifactId>
<groupId>org.apache.hbase</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
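Note that several entries above reference ${flink.version} and ${hive.version}. Those properties are not shown in the snippet; judging from the hardcoded 1.12.2 and 3.1.2 entries, a matching <properties> block would look like this (my assumption, adjust to your cluster):
<properties>
    <flink.version>1.12.2</flink.version>
    <hive.version>3.1.2</hive.version>
</properties>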
1. First, connect to Hive with FlinkSQL
Note: you must use Alibaba's Blink planner here. I fell into a huge pit on this one; dynamic INSERT statements only work with the Blink planner.
Flink is a stream processor, so if we build the table environment in streaming mode, data keeps flowing in continuously, as shown below:
// Build the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Build the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
In that setup, dynamic statements such as INSERT INTO xxxx SELECT * FROM xxxx will fail, because the Hive table sink does not support them.
We can see why in the source code: INSERT into a Hive table only accepts insert-only ("+I") rows; update rows ("-U" and "+U"), i.e. data that may still change, are not allowed.
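To make the "+I" / "-U" / "+U" notation concrete: these are Flink's changelog row kinds, defined in org.apache.flink.types.RowKind. A minimal sketch that prints them (assuming Flink 1.12 on the classpath):
import org.apache.flink.types.RowKind;

public class RowKindDemo {
    public static void main(String[] args) {
        // Prints each kind's short form and name:
        // +I = INSERT, -U = UPDATE_BEFORE, +U = UPDATE_AFTER, -D = DELETE
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind.shortString() + " = " + kind.name());
        }
    }
}
The Hive table sink accepts only INSERT (+I) rows, which is why a changelog-producing streaming query cannot write into it.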
So how do we solve this? Surely it can't be impossible to write a simple dynamic SQL statement... ugh.
Fortunately, Alibaba has optimized FlinkSQL for exactly this case: the solution is to build the table environment with Alibaba's Blink planner, which already handles this internally for you.
// Use Alibaba's Blink planner
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
// Build the table environment
TableEnvironment tableEnv = TableEnvironment.create(settings);
// TODO: set checkpointing and other environment options
// Set the SQL dialect; statement syntax differs between databases
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
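One detail worth knowing: the dialect is a session-level setting, so if the same job also runs standard Flink SQL, you can switch back afterwards (a small sketch, not from the original post):
// Hive dialect for Hive-specific statements...
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// ...and back to the default dialect for regular Flink SQL
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);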
2. Connect to Hive with HiveCatalog
With the table environment in place, we use HiveCatalog to connect to the Hive database. (The values below are only an example; in a real project you would keep them in a constants class and reference them from there. Hardcode them like this at work and your reviewers will never let you hear the end of it.)
// Build the Hive catalog (in practice, pull these values from a constants class)
// Catalog name: a unique identifier for this catalog
String NAME = "myhive";
// Default Hive database name
String DEFAULTDATABASE = "lwtest";
// Directory containing hive-site.xml on the Linux host running Flink
String HIVECONFDIRPATH = "/etc/hive/conf";
// Hive version
String VERSION = "3.1.2";
HiveCatalog myHive = new HiveCatalog(NAME, DEFAULTDATABASE, HIVECONFDIRPATH, VERSION);
// Register the catalog under the given name
tableEnv.registerCatalog("myhive", myHive);
// Switch to the catalog registered above
tableEnv.useCatalog("myhive");
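Before running real queries, a quick sanity check can confirm the catalog connection works. A small sketch (useDatabase and listTables are standard TableEnvironment methods; the database name is the example one above):
import java.util.Arrays;

// Switch to the example database and list the tables the catalog can see
tableEnv.useDatabase("lwtest");
String[] tables = tableEnv.listTables();
System.out.println(Arrays.toString(tables));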
3. Write the SQL
Now we can write our own SQL:
// Run a query
Table tableResult = tableEnv.sqlQuery(sql);
// Print the result directly
tableResult.execute().print();
// Or get an iterator over the result and loop through it
CloseableIterator<Row> rows = tableResult.execute().collect();
// Use executeSql to run inserts and updates,
// e.g. insert into table xxxx select * from xxxx;
TableResult tableResult1 = tableEnv.executeSql(sql);
Note that executeSql submits the job by itself, so you do not need to call execute(); execute() is only needed when the job also uses the DataStream API.
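A related gotcha: for INSERT statements, executeSql submits the job asynchronously and returns right away. If your program might exit before the insert finishes, you can block on the returned TableResult (await() exists as of Flink 1.12; a minimal sketch):
TableResult insertResult = tableEnv.executeSql("insert into table xxxx select * from xxxx");
// Block until the insert job completes (throws InterruptedException/ExecutionException)
insertResult.await();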
The complete code is as follows:
// Use Alibaba's Blink planner
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
// Build the table environment
TableEnvironment tableEnv = TableEnvironment.create(settings);
// TODO: set checkpointing and other environment options
// Set the SQL dialect; statement syntax differs between databases
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// Build the Hive catalog (in practice, pull these values from a constants class)
String NAME = "myhive";                    // catalog name, a unique identifier
String DEFAULTDATABASE = "lwtest";         // default Hive database name
String HIVECONFDIRPATH = "/etc/hive/conf"; // hive-site.xml directory on the Flink host
String VERSION = "3.1.2";                  // Hive version
HiveCatalog myHive = new HiveCatalog(NAME, DEFAULTDATABASE, HIVECONFDIRPATH, VERSION);
// Register the catalog under the given name
tableEnv.registerCatalog("myhive", myHive);
// Use the catalog registered above
tableEnv.useCatalog("myhive");
// Run a query
String sql = "select * from xxxx";
Table tableResult1 = tableEnv.sqlQuery(sql);
tableResult1.execute().print();
// Get an iterator over the result and loop through it
CloseableIterator<Row> rows = tableResult1.execute().collect();
// Use executeSql to insert or update
String executeSql = "insert into table xxxx select * from xxxx";
TableResult tableResult6 = tableEnv.executeSql(executeSql);
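For reference, the imports the complete snippet relies on (package paths as of Flink 1.12 plus the Hive connector; double-check against your exact versions):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;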