0


执行flink sql连接clickhouse库

手把手教学,flink connector打通clickhouse大数据库,通过下发flink sql,来使用ck。
组件版本jdk1.8flink1.17.2clickhouse23.12.2.59
1.背景

flink官方不支持clickhouse连接器,工作中难免会用到。

2.方案

利用GitHub大佬提供的源代码,我用的是release-1.16:https://github.com/itinycheng/flink-connector-clickhouse/tree/release-1.16

3.编译

导入IDEA,maven编译即可,生成flink-connector-clickhouse-1.16.0-SNAPSHOT.jar

4.将此依赖包,导入flink工程

spring boot工程

4.1)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
<!--    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.13</version>
        <relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;
    </parent>-->

    <parent>
        <groupId>com.mit.microgrid</groupId>
        <artifactId>mit-microgrid</artifactId>
        <version>${project.build.version}</version>
    </parent>

    <artifactId>mit-microgrid-flink</artifactId>
    <name>mit-microgrid-flink</name>
    <description>flink connector clickhouse</description>

    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.17.2</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <!-- 排除SpringBoot自带的日志依赖 -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <!--flink-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--flink connector-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--flink connector clickhouse-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.16.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
<!--            <artifactId>flink-clients_2.12</artifactId>-->
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <!-- flink sql -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink JDBC Connector -->
<!--        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.14.6</version> &lt;!&ndash; 与您的Flink版本匹配 &ndash;&gt;
        </dependency>-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.2-1.17</version>
        </dependency>
        <!-- ClickHouse JDBC Driver -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.2</version> <!-- 请根据实际情况选择最新稳定版本 -->
        </dependency>
        <!-- 添加clickhouse-maven依赖-->
        <dependency>
            <groupId>ru.ivi.opensource</groupId>
            <artifactId>flink-clickhouse-sink</artifactId>
            <version>1.2.0</version>
        </dependency>

        <!--module-->
        <dependency>
            <groupId>com.mit.microgrid</groupId>
            <artifactId>mit-microgrid-common-core</artifactId>
            <version>${project.build.version}</version>
        </dependency>
        <dependency>
            <groupId>com.mit.microgrid</groupId>
            <artifactId>mit-microgrid-api-history</artifactId>
            <version>${project.build.version}</version>
        </dependency>

        <!--sql parse-->
        <dependency>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-core</artifactId>
            <version>1.37.0</version>
        </dependency>
<!--        <dependency>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-server</artifactId>
            <version>1.37.0</version>
        </dependency>-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-parser</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-uber</artifactId>
            <version>1.17.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
            <version>1.12.0</version>
        </dependency>
    </dependencies>

    <build>
        <!--<plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>-->
<!--            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>-->
<!--        </plugins>-->

        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.7.3</version>
                <configuration>
                    <mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
                    <fork>true</fork>
                    <layout>ZIP</layout>
                    <includeSystemScope>true</includeSystemScope>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                        <configuration>
                            <classifier>-with-dependencies</classifier>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <addMavenDescriptor>false</addMavenDescriptor>
                        <manifest>
                            <mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/resources/assembly/assembly.xml</descriptor>
                    </descriptors>
                    <outputDirectory>./../out</outputDirectory>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

4.2)核心方法:

    /**
     * multiple sql execute
     *
     * @param sqlList
     */
    public static JobClient flinkSqlJobClientMultiple(List<String> sqlList) {
        log.info("参数sqlList: {}", sqlList);
//        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(setting);
        if (CollectionUtil.isNullOrEmpty(sqlList)) {
            log.warn("sqlList参数为空");
            return null;
        }
        for (String s : sqlList) {
            TableResult tableResult = tEnv.executeSql(s);
            Optional<JobClient> jobClientOptional = tableResult.getJobClient();
            if (jobClientOptional.isPresent()) {
                JobClient jobClient = jobClientOptional.get();
                log.info("jobClient: " + jobClient);
                return jobClient;
            }
        }
        log.error("没有可执行的job");
        return null;
    }

5.源码地址

https://github.com/genghongsheng0/mit-microgrid-flink


本文转载自: https://blog.csdn.net/genghongsheng/article/details/143758034
版权归原作者 genghongsheng 所有, 如有侵权,请联系我们删除。

“执行flink sql连接clickhouse库”的评论:

还没有评论