总览
1.生成运行时env
2.生成表环境
3.接上数据流,数据流数据生成表
4.把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册
5.查询表,可以根据注册表名查询
6.插入表,可以根据生成的flink表进行数据插入
完整案例:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors._
object SqlReadMysql {
def main(args: Array[String]): Unit = {
// creat env
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//parallelism
bsEnv.setParallelism(1)
//set env
val bsSetting = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
//create table env
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSetting)
//create ds
val dataStream = bsEnv.fromElements(Tuple2("01","lisi" ))
val table1 = bsTableEnv.fromDataStream(dataStream)
//create table
val sinkDDL =
"""
|create table student2_flink (
|code varchar(20) null,
|name varchar(20) null
|)with(
|'connector.type'='jdbc',
|'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
|'connector.table'='student2',
|'connector.driver'='com.mysql.jdbc.Driver',
|'connector.username'='root',
|'connector.password'='root'
|)
|""".stripMargin
println(sinkDDL)
// execute the create table sql
bsTableEnv.executeSql(sinkDDL)
//register table
val myStudent = bsTableEnv.from("student2_flink")
//execute query
val result = bsTableEnv.sqlQuery(s"select * from $myStudent")
result.toRetractStream[(String, String)].print()
//insert data
table1.executeInsert("student2_flink")
//execute
bsEnv.execute()
}
}
POM文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.sinopharm.gksk</groupId>
<artifactId>gksk-bigdata</artifactId>
<version>1.0-SNAPSHOT</version>
<name>gksk-bigdata</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.25</slf4j.version>
<slf4j.api.version>1.7.25</slf4j.api.version>
</properties>
<!--Flink项目核心依赖-->
<dependencies>
<!--Flink Java 项目核心依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<!--Flink scala项目核心依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<!--Flink Table API 核心依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.12.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.14.4</version>
</dependency>
<!-- csv-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.14.4</version>
</dependency>
<!--以下用到什么引用什么-->
<!--Flink Kafka依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<!--Flink rocksdb状态后依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<!--本地测试核心依赖-->
<!--Flink 本地测试客户端依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<!--Flink 本地测试wei ui依赖 http://127.0.0.1:8081/ -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.14.4</version>
<scope>runtime</scope>
</dependency>
<!--junit测试-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!--日志输出-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
</resource>
<resource>
<directory>src/main/scala</directory>
</resource>
</resources>
<plugins>
<!--这里没引打包插件 需要的自己引用-->
<!--Java compiler-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!--Java Compiler-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Could not instantiate the executor. Make sure a planner module is on the classpath
原因:pom文件中缺少 planner
解决办法:添加
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.14.4</version>
</dependency>
ps:注意有时候 配置两个planner也会报错
flinksql 连接mysql报错 JDBC-Class not found. - com.mysql.jdbc.Driver
原因:缺少mysql的jar包
解决:pom文件添加:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
open() failed.The server time zone value '�й���ʱ��' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
原因:URL没有指定时区,jdbc 6.0以上都有这个问题
解决:在URL后边加时区
'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
- useUnicode=true 表示使用Unicode字符,因此可以使用中文
- characterEncoding=utf8 设置编码方式
- useSSL=true 设置安全连接
- serverTimezone=UTC 设置全球标准时间
open() failed.Cannot load connection class because of underlying exception: com.mysql.cj.exceptions.WrongArgumentException: Malformed database URL, failed to parse the main URL sections.
原因:连接的URL写错了
解决:好好看看,字符 、格式
版权归原作者 hzp666 所有, 如有侵权,请联系我们删除。