手把手教学,flink connector打通clickhouse大数据库,通过下发flink sql,来使用ck。
组件版本jdk1.8flink1.17.2clickhouse23.12.2.59
1.背景
flink官方不支持clickhouse连接器,工作中难免会用到。
2.方案
利用GitHub大佬提供的源代码,我用的是release-1.16:https://github.com/itinycheng/flink-connector-clickhouse/tree/release-1.16
3.编译
导入IDEA,maven编译即可,生成flink-connector-clickhouse-1.16.0-SNAPSHOT.jar
4.将此依赖包,导入flink工程
spring boot工程
4.1)pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.13</version>
<relativePath/> <!– lookup parent from repository –>
</parent>-->
<parent>
<groupId>com.mit.microgrid</groupId>
<artifactId>mit-microgrid</artifactId>
<version>${project.build.version}</version>
</parent>
<artifactId>mit-microgrid-flink</artifactId>
<name>mit-microgrid-flink</name>
<description>flink connector clickhouse</description>
<properties>
<java.version>1.8</java.version>
<flink.version>1.17.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<!-- 排除SpringBoot自带的日志依赖 -->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--flink-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--flink connector-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink connector clickhouse-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-clickhouse</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<!-- <artifactId>flink-clients_2.12</artifactId>-->
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- flink sql -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink JDBC Connector -->
<!-- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.6</version> <!– 与您的Flink版本匹配 –>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.2-1.17</version>
</dependency>
<!-- ClickHouse JDBC Driver -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2</version> <!-- 请根据实际情况选择最新稳定版本 -->
</dependency>
<!-- 添加clickhouse-maven依赖-->
<dependency>
<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.2.0</version>
</dependency>
<!--module-->
<dependency>
<groupId>com.mit.microgrid</groupId>
<artifactId>mit-microgrid-common-core</artifactId>
<version>${project.build.version}</version>
</dependency>
<dependency>
<groupId>com.mit.microgrid</groupId>
<artifactId>mit-microgrid-api-history</artifactId>
<version>${project.build.version}</version>
</dependency>
<!--sql parse-->
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.37.0</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-server</artifactId>
<version>1.37.0</version>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-parser</artifactId>
<version>${flink.version}</version>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>1.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.12.0</version>
</dependency>
</dependencies>
<build>
<!--<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>-->
<!-- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>-->
<!-- </plugins>-->
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.7.3</version>
<configuration>
<mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
<fork>true</fork>
<layout>ZIP</layout>
<includeSystemScope>true</includeSystemScope>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
<configuration>
<classifier>-with-dependencies</classifier>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<addMavenDescriptor>false</addMavenDescriptor>
<manifest>
<mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptors>
<descriptor>src/main/resources/assembly/assembly.xml</descriptor>
</descriptors>
<outputDirectory>./../out</outputDirectory>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4.2)核心方法:
/**
* multiple sql execute
*
* @param sqlList
*/
public static JobClient flinkSqlJobClientMultiple(List<String> sqlList) {
log.info("参数sqlList: {}", sqlList);
// StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
EnvironmentSettings setting = EnvironmentSettings.newInstance()
.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(setting);
if (CollectionUtil.isNullOrEmpty(sqlList)) {
log.warn("sqlList参数为空");
return null;
}
for (String s : sqlList) {
TableResult tableResult = tEnv.executeSql(s);
Optional<JobClient> jobClientOptional = tableResult.getJobClient();
if (jobClientOptional.isPresent()) {
JobClient jobClient = jobClientOptional.get();
log.info("jobClient: " + jobClient);
return jobClient;
}
}
log.error("没有可执行的job");
return null;
}
5.源码地址
版权归原作者 genghongsheng 所有, 如有侵权,请联系我们删除。