

【Flink-Kafka-To-ClickHouse】Writing Kafka Data to ClickHouse with Flink

Requirements:

1. Data is read from Kafka and written into ClickHouse.

2. All job configuration is stored in MySQL and loaded from MySQL dynamically at startup.

3. The Kafka cluster in this example uses Kerberos authentication; remove that part of the configuration if you do not need it.

4. The target table is created in ClickHouse first, and its schema is then read dynamically at runtime.

5. The Kafka messages are JSON; a FlatMap function flattens each message and packs the fields into a Row according to the table schema.

6. For the write itself, the Row stream is registered as a temporary view and the insert is executed with Flink SQL (see the sketch after this list).

7. For local testing you can edit resources/flink_backup_local.yml and load it with ConfigTools.initConf.
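A minimal sketch of the write path from points 5 and 6. The class and variable names here are illustrative only; the full version, including the ClickHouse catalog registration, is in Kafka2chApp further below.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TempViewWriteSketch {
    // `rows` is assumed to already carry a RowTypeInfo matching the ClickHouse table,
    // and `sinkTable` to exist in the catalog that is currently in use.
    public static void write(StreamExecutionEnvironment env, DataStream<Row> rows, String sinkTable) {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // register the stream as a temporary view ...
        tableEnv.createTemporaryView("kafkaTempView", rows);
        // ... and let Flink SQL perform the actual insert into the ClickHouse-backed table
        tableEnv.executeSql("insert into `" + sinkTable + "` select * from kafkaTempView");
    }
}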

1) Import the required dependencies

The dependency list here is fairly redundant; trim or keep entries according to your own needs.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>gaei.cn.x5l</groupId>
  <artifactId>kafka2ch</artifactId>
  <version>1.0.0</version>

  <properties>
    <hbase.version>2.3.3</hbase.version>
    <hadoop.version>3.1.1</hadoop.version>
    <spark.version>3.0.2</spark.version>
    <scala.version>2.12.10</scala.version>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.14.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <target.java.version>1.8</target.java.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.17.2</log4j.version>
    <hadoop.version>3.1.2</hadoop.version>
    <hive.version>3.1.2</hive.version>
  </properties>

  <dependencies>
    <!-- basic Flink dependencies -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!-- Table API -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>1.14.0</version><scope>provided</scope></dependency>
    <!-- comment the planner out when using Hive SQL, keep it otherwise -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <!-- SQL formats -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!-- kept commented out in the original: flink-sql-connector-kafka (sql), statefun-sdk-java 3.0.0,
         flink-connector-kafka, statefun-flink-datastream, curator-client 5.3.0 -->
    <!-- state processor API -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version><scope>compile</scope></dependency>
    <!-- local web UI for debugging -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!-- logging -->
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <!-- HDFS -->
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version></dependency>
    <!-- important and easy to miss -->
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>${hadoop.version}</version></dependency>
    <!-- RocksDB state backend -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!-- misc -->
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
    <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency>
    <dependency>
      <groupId>gaei.cn.x5l.bigdata.common</groupId><artifactId>x5l-bigdata-common</artifactId><version>1.3-SNAPSHOT</version>
      <exclusions>
        <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion>
        <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion>
        <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion>
      </exclusions>
    </dependency>
    <!-- use flink-sql-connector-kafka_2.12 instead of flink-connector-kafka_2.12 -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-clickhouse</artifactId><version>1.14.3-SNAPSHOT</version></dependency>
    <dependency><groupId>gaei.cn.x5l</groupId><artifactId>tsp-gb-decode</artifactId><version>1.0.0</version></dependency>
    <dependency><groupId>org.jyaml</groupId><artifactId>jyaml</artifactId><version>1.3</version></dependency>
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version><scope>runtime</scope></dependency>
    <dependency><groupId>gaei.cn.x5l.flink.common</groupId><artifactId>x5l-flink-common</artifactId><version>1.4-SNAPSHOT</version></dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version>
        <configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                  <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                </transformer>
                <!-- needed for Flink SQL: the service transformer merges META-INF/services files -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.0.0,)</versionRange>
                    <goals><goal>shade</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals><goal>testCompile</goal><goal>compile</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

2) Code implementation

2.1.resources

2.1.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&connectTimeout=60000&socketTimeout=60000"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"

2.1.2.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.1.3.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
    <Properties>
        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
        <property name="LOG_LEVEL" value="ERROR" />
    </Properties>
    <appenders>
        <console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
        </console>
        <File name="log" fileName="tmp/log/job.log" append="false">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
        </File>
    </appenders>
    <loggers>
        <root level="${LOG_LEVEL}">
            <appender-ref ref="console"/>
            <appender-ref ref="log"/>
        </root>
    </loggers>
</configuration>

2.1.4.flink_backup_local.yml

clickhouse:
  connector: 'clickhouse'
  database-name: 'dwd'
  driver: 'ru.yandex.clickhouse.ClickHouseDriver'
  jdbcurl: 'jdbc:clickhouse://10.1.1.1:8123/dwd?socket_timeout=480000'
  password: 'X8v@123456!%$'
  reissueInterval: 3
  sink.batch-size: '200000'
  sink.flush-interval: '3000000'
  sink.ignore-delete: 'true'
  sink.max-retries: '3'
  sink.partition-key: 'toYYYYMMDD(sample_date_time)'
  sink.partition-strategy: 'balanced'
  table-name: 'test_local'
  url: 'clickhouse://10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123'
  username: 'test'
hdfs:
  checkPointPath: 'hdfs://nameserver/user/flink/rocksdbcheckpoint'
  checkpointTimeout: 360000
  checkpointing: 300000
  maxConcurrentCheckpoints: 1
  minPauseBetweenCheckpoints: 10000
  restartInterval: 60
  restartStrategy: 3
kafka-consumer:
  prop:
    auto.offset.reset: 'earliest'
    bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
    enable.auto.commit: 'false'
    fetch.max.bytes: '52428700'
    group.id: 'test'
    isKerberized: '1'
    keytab: 'D:/keytab/test.keytab'
    krb5Conf: 'D:/keytab/krb5.conf'
    max.poll.interval.ms: '300000'
    max.poll.records: '1000'
    principal: '[email protected]'
    security_protocol: 'SASL_PLAINTEXT'
    serviceName: 'kafka'
    session.timeout.ms: '600000'
    useTicketCache: 'false'
  topics: 'topicA,topicB'
kafka-producer:
  defaultTopic: 'kafka2hive_error'
  prop:
    acks: 'all'
    batch.size: '1048576'
    bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
    compression.type: 'lz4'
    key.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
    retries: '3'
    value.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
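For local testing, requirement 7 loads this file through ConfigTools.initConf, which comes from the internal x5l-flink-common library and is not shown in this post. As a rough idea of what that step does, here is a minimal sketch that parses such a YAML file into a nested Map using the jyaml dependency declared in the pom; the class and method names are illustrative, not the actual ConfigTools API.

import java.io.File;
import java.io.FileNotFoundException;
import java.util.Map;

import org.ho.yaml.Yaml;

// Illustrative only: the real project calls ConfigTools.initConf("local"), whose source is not shown here.
public class LocalYamlConfigSketch {

    @SuppressWarnings("unchecked")
    public static Map<String, Object> load(String path) throws FileNotFoundException {
        // jyaml parses the YAML document into nested Maps, Lists and scalars
        return (Map<String, Object>) Yaml.load(new File(path));
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws FileNotFoundException {
        Map<String, Object> conf = load("src/main/resources/flink_backup_local.yml");
        Map<String, Object> clickhouse = (Map<String, Object>) conf.get("clickhouse");
        System.out.println(clickhouse.get("table-name")); // test_local
    }
}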

2.2.utils

2.2.1.DBConn

import java.sql.*;

public class DBConn {
    private static final String driver = "com.mysql.jdbc.Driver";   // MySQL JDBC driver
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static ResultSet rs = null;
    private static final CallableStatement cs = null;

    /**
     * Open a database connection.
     */
    public static Connection conn(String url, String username, String password) {
        Connection conn = null;
        try {
            Class.forName(driver);                                            // load the JDBC driver
            try {
                conn = DriverManager.getConnection(url, username, password); // connect to the database
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Close the database connection.
     */
    public static void close() {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
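A quick usage sketch of DBConn; the connection details are placeholders, in this project they come from appconfig.yml.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class DBConnUsage {
    public static void main(String[] args) throws Exception {
        // placeholder credentials for illustration only
        Connection conn = DBConn.conn(
                "jdbc:mysql://1.1.1.1:3306/test?useSSL=false", "test", "123456");
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("select 1")) {
            while (rs.next()) {
                System.out.println(rs.getInt(1));
            }
        } finally {
            conn.close();
        }
    }
}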

2.2.2.CommonUtils

@Slf4j
public class CommonUtils {

    public static StreamExecutionEnvironment setCheckpoint(StreamExecutionEnvironment env) throws IOException {
//        ConfigTools.initConf("local");
        Map hdfsMap = (Map) ConfigTools.mapConf.get("hdfs");
        // offsets are committed on checkpoints, so this delays offset commits
        env.enableCheckpointing(((Integer) hdfsMap.get("checkpointing")).longValue(), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(((Integer) hdfsMap.get("minPauseBetweenCheckpoints")).longValue());
        env.getCheckpointConfig().setCheckpointTimeout(((Integer) hdfsMap.get("checkpointTimeout")).longValue());
        env.getCheckpointConfig().setMaxConcurrentCheckpoints((Integer) hdfsMap.get("maxConcurrentCheckpoints"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                (Integer) hdfsMap.get("restartStrategy"),   // number of restart attempts; do not set it too low, distributed jobs fail occasionally even under normal conditions, 3-5 is recommended
                Time.of(((Integer) hdfsMap.get("restartInterval")).longValue(), TimeUnit.SECONDS)  // delay between attempts
        ));
        // tolerated checkpoint failures; the default of 0 means no checkpoint failure is tolerated
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        // state backend
        env.setStateBackend(new RocksDBStateBackend((String) hdfsMap.get("checkPointPath"), true));
//        env.setStateBackend(new FsStateBackend((String) hdfsMap.get("checkPointPath"), true));
//        env.setStateBackend(new HashMapStateBackend());
        return env;
    }

    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(Map<String, Object> kafkaConf) throws IOException {
        String[] topics = ((String) kafkaConf.get("topics")).split(",");
        log.info("subscribed topics: {}", topics);
        Properties properties = new Properties();
        Map<String, String> kafkaProp = (Map<String, String>) kafkaConf.get("prop");
        for (String key : kafkaProp.keySet()) {
            properties.setProperty(key, kafkaProp.get(key).toString());
        }
        if (!StringUtils.isBlank((String) kafkaProp.get("isKerberized")) && "1".equals(kafkaProp.get("isKerberized"))) {
            System.setProperty("java.security.krb5.conf", kafkaProp.get("krb5Conf"));
            properties.put("security.protocol", kafkaProp.get("security_protocol"));
            properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useTicketCache=" + kafkaProp.get("useTicketCache") + " "
                    + "serviceName=\"" + kafkaProp.get("serviceName") + "\" "
                    + "useKeyTab=true "
                    + "keyTab=\"" + kafkaProp.get("keytab").toString() + "\" "
                    + "principal=\"" + kafkaProp.get("principal").toString() + "\";");
        }

        properties.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");

        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumerRecordFlinkKafkaConsumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(
                Arrays.asList(topics),
                new KafkaDeserializationSchema<ConsumerRecord<String, String>>() {
                    @Override
                    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
                        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
                        });
                    }

                    @Override
                    public boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {
                        return false;
                    }

                    @Override
                    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        return new ConsumerRecord<String, String>(
                                record.topic(),
                                record.partition(),
                                record.offset(),
                                record.timestamp(),
                                record.timestampType(),
                                record.checksum(),
                                record.serializedKeySize(),
                                record.serializedValueSize(),
                                new String(record.key() == null ? "".getBytes(StandardCharsets.UTF_8) : record.key(), StandardCharsets.UTF_8),
                                new String(record.value() == null ? "{}".getBytes(StandardCharsets.UTF_8) : record.value(), StandardCharsets.UTF_8));
                    }
                }, properties);
        return consumerRecordFlinkKafkaConsumer;
    }
}
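A sketch of wiring the two helpers into a job. The `conf` map is the nested configuration loaded from MySQL (or the local YAML); note that setCheckpoint assumes ConfigTools.mapConf has already been initialised.

import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CommonUtilsUsage {
    @SuppressWarnings("unchecked")
    public static DataStreamSource<ConsumerRecord<String, String>> buildSource(
            StreamExecutionEnvironment env, Map<String, Object> conf) throws Exception {
        CommonUtils.setCheckpoint(env);  // checkpointing, restart strategy, RocksDB state backend
        Map<String, Object> kafkaConsumerConf = (Map<String, Object>) conf.get("kafka-consumer");
        return env.addSource(CommonUtils.getKafkaConsumer(kafkaConsumerConf));
    }
}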

2.2.3.RemoteConfigUtil

public class RemoteConfigUtil {

    private static final Logger log = LoggerFactory.getLogger(RemoteConfigUtil.class);
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static ResultSet rs = null;
    public static Map<String, Object> mapConf;

    public RemoteConfigUtil() {
    }

    public static Map<String, Object> getByAppNameAndConfigName(String appName, String ConfigName) throws SQLException {
        if (mapConf != null && mapConf.size() > 0) {
            return mapConf;
        } else {
            Map<String, String> ymlMap = LocalConfigUtil.getYmlMap("/appconfig");
            String username = (String) ymlMap.get("mysql.username");
            String password = (String) ymlMap.get("mysql.password");
            String url = (String) ymlMap.get("mysql.url");
            String driver = (String) ymlMap.get("mysql.driver");
            Connection conn = JdbcUtil.getConnection(url, username, password, driver);
            PreparedStatement preparedStatement = null;
            Map var14;
            try {
                String sql = "select config_context from base_app_config where app_name = '%s' and config_name = '%s'";
                preparedStatement = conn.prepareStatement(String.format(sql, appName, ConfigName));
                ResultSet rs = preparedStatement.executeQuery();
                String config_context;
                for (config_context = ""; rs.next(); config_context = rs.getString("config_context")) {
                }
                rs.close();
                log.info("config_context: {}", config_context);
                if (StringUtils.isNotBlank(config_context)) {
                    System.out.println(JSONObject.toJSONString(JSONObject.parseObject(config_context), new SerializerFeature[]{SerializerFeature.PrettyFormat}));
                }
                mapConf = (Map) JSON.parseObject(config_context, Map.class);
                var14 = mapConf;
            } finally {
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
                if (conn != null) {
                    conn.close();
                }
            }
            return var14;
        }
    }
}
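The helper assumes a MySQL table that stores one JSON configuration blob per (app_name, config_name) pair. The exact DDL of base_app_config is not given in the post; the column layout in the comments below is inferred from the query above, and the argument values are illustrative only.

import java.util.Map;

public class RemoteConfigUsage {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        // base_app_config is assumed to look roughly like:
        //   app_name        varchar  -- e.g. the job class name
        //   config_name     varchar  -- e.g. "test" / "prod"
        //   config_context  text     -- the whole JSON configuration (clickhouse / hdfs / kafka-consumer ...)
        Map<String, Object> conf =
                RemoteConfigUtil.getByAppNameAndConfigName("com.example.Kafka2CkOds", "test");
        Map<String, Object> clickhouse = (Map<String, Object>) conf.get("clickhouse");
        System.out.println(clickhouse.get("table-name"));
    }
}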

2.2.4.ClickhouseUtil

public class ClickhouseUtil {

    public ClickhouseUtil() {
    }

    public static List<SchemaPo> getSchemaPoList(Map<String, Object> chMapConf) throws SQLException {
        List schemaPos = new ArrayList();
        Connection connection = null;
        try {
            String jdbcurl = (String) chMapConf.get("jdbcurl");
            String driver = (String) chMapConf.get("driver");
            String userName = (String) chMapConf.get("username");
            String password = (String) chMapConf.get("password");
            String databaseName = (String) chMapConf.get("database-name");
            String tableName = (String) chMapConf.get("table-name");
            connection = JdbcUtil.getConnection(jdbcurl, userName, password, driver);
            DatabaseMetaData metaData = connection.getMetaData();
            ResultSet colRet = metaData.getColumns((String) null, databaseName, tableName, "%");
            System.out.println("table columns:");
            while (colRet.next()) {
                String columnName = colRet.getString("COLUMN_NAME");
                String columnType = colRet.getString("TYPE_NAME");
                schemaPos.add(new SchemaPo(columnName, columnType));
                System.out.println(columnName + "   " + columnType);
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (SQLException var18) {
                var18.printStackTrace();
            }
        }
        return schemaPos;
    }

    public static String getCreateSinkTableSql(Map<String, Object> clickhouse, String sinkTableName, List<SchemaPo> schemaPos) {
        StringBuilder sinkTableSql = new StringBuilder();
        String userName = (String) clickhouse.get("username");
        String password = (String) clickhouse.get("password");
        String connector = (String) clickhouse.get("connector");
        String databaseName = (String) clickhouse.get("database-name");
        String url = (String) clickhouse.get("url");
        String tableName = (String) clickhouse.get("table-name");
        String sinkBatchSize = (String) clickhouse.get("sink.batch-size");
        String sinkFlushInterval = (String) clickhouse.get("sink.flush-interval");
        String sinkMaxRetries = (String) clickhouse.get("sink.max-retries");
        String sinkPartitionStrategy = (String) clickhouse.get("sink.partition-strategy");
        String sinkPartitionKey = (String) clickhouse.get("sink.partition-key");
        String sinkIgnoreDelete = (String) clickhouse.get("sink.ignore-delete");

        sinkTableSql.append(String.format("CREATE TABLE %s (\n", sinkTableName));
        int i = 0;
        Iterator var17 = schemaPos.iterator();
        while (var17.hasNext()) {
            SchemaPo schemaPo = (SchemaPo) var17.next();
            ++i;
            String signal = schemaPo.getSignal();
            String type = schemaPo.getType();
            // map ClickHouse column types onto Flink SQL types
            if ("UInt64".equalsIgnoreCase(type)) {
                type = "BIGINT";
            } else if ("Map(String,String)".equalsIgnoreCase(type)) {
                type = "Map<String,String>";
            } else if ("Datetime".equalsIgnoreCase(type)) {
                type = "Timestamp(0)";
            } else {
                type = "String";
            }
            sinkTableSql.append(String.format("    `%s` %s", signal, type));
            sinkTableSql.append(i == schemaPos.size() ? ")" : ",\n");
        }

        sinkTableSql.append("WITH(\n");
        sinkTableSql.append(String.format("'connector' = '%s',\n", connector));
        sinkTableSql.append(String.format("'url' = '%s',\n", url));
        sinkTableSql.append(String.format("'username' = '%s',\n", userName));
        sinkTableSql.append(String.format("'password' = '%s',\n", password));
        sinkTableSql.append(String.format("'database-name' = '%s',\n", databaseName));
        sinkTableSql.append(String.format("'table-name' = '%s',\n", tableName));
        sinkTableSql.append(String.format("'sink.batch-size' = '%s',\n", sinkBatchSize));
        sinkTableSql.append(String.format("'sink.flush-interval' = '%s',\n", sinkFlushInterval));
        sinkTableSql.append(String.format("'sink.max-retries' = '%s',\n", sinkMaxRetries));
        sinkTableSql.append(String.format("'sink.partition-strategy' = '%s',\n", sinkPartitionStrategy));
        sinkTableSql.append(String.format("'sink.partition-key' = '%s',\n", sinkPartitionKey));
        sinkTableSql.append(String.format("'sink.ignore-delete' = '%s'\n", sinkIgnoreDelete));
        sinkTableSql.append(" )");
        return sinkTableSql.toString();
    }

    // convert a flattened record into the Row layout expected by the ClickHouse table
    public static Row convertRow(Map<String, String> resultMap, List<SchemaPo> schemaPos) {
        Row row = new Row(schemaPos.size());
        for (int i = 0; i < schemaPos.size(); i++) {
            SchemaPo schemaPo = schemaPos.get(i);
            String valueStr = resultMap.get(schemaPo.getSignal());
            if (StringUtils.isBlank(valueStr)) {
                row.setField(i, null);
                continue;
            }
            if ("UInt64".equalsIgnoreCase(schemaPo.getType())) {
                Long svalue = Long.valueOf(valueStr);
                row.setField(i, Math.abs(svalue));
            } else if ("Int64".equalsIgnoreCase(schemaPo.getType())) {
                Long svalue = Long.valueOf(valueStr);
                row.setField(i, Math.abs(svalue));
            } else if ("Int32".equalsIgnoreCase(schemaPo.getType())) {
                Integer svalue = Integer.valueOf(valueStr);
                row.setField(i, svalue);
            } else if ("datetime".equalsIgnoreCase(schemaPo.getType())) {
                try {
                    Date svalue = (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).parse(valueStr);
                    Timestamp timestamp = new Timestamp(svalue.getTime());
                    row.setField(i, timestamp);
                } catch (Exception ex) {
                    System.out.println(ex.getMessage());
                    System.out.println(Arrays.toString(ex.getStackTrace()));
                }
            } else {
                row.setField(i, valueStr);
            }
        }
        return row;
    }
}
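A small illustration of convertRow, using hypothetical column names and values and the two-argument SchemaPo constructor shown in getSchemaPoList above.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.types.Row;

public class ConvertRowExample {
    public static void main(String[] args) {
        // hypothetical schema, mirroring what getSchemaPoList would read from ClickHouse
        List<SchemaPo> schemaPos = Arrays.asList(
                new SchemaPo("vin", "String"),
                new SchemaPo("mileage", "UInt64"),
                new SchemaPo("sample_date_time", "DateTime"));

        Map<String, String> resultMap = new HashMap<>();
        resultMap.put("vin", "TESTVIN0000000001");
        resultMap.put("mileage", "12345");
        resultMap.put("sample_date_time", "2023-12-01 12:00:00");

        Row row = ClickhouseUtil.convertRow(resultMap, schemaPos);
        System.out.println(row);  // prints the converted Row: String, Long and Timestamp fields
    }
}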

2.3.flatmap

2.3.1.FlatMapFunction

public interface FlatMapFunction {
    // factory method: builds a Flink FlatMapFunction bound to the ClickHouse table schema
    // (fully qualified here because this project interface shares the simple name with Flink's)
    org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row> newInstance(List<SchemaPo> schemaPos);
}

2.4.sink

2.4.1.ClickHouseCatalog

public class ClickHouseCatalog extends AbstractCatalog {

    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseCatalog.class);
    public static final String DEFAULT_DATABASE = "default";

    private final String baseUrl;
    private final String username;
    private final String password;
    private final boolean ignorePrimaryKey;
    private final Map<String, String> properties;
    private ClickHouseConnection connection;

    public ClickHouseCatalog(String catalogName, Map<String, String> properties) {
        this(catalogName, properties.get("database-name"), properties.get("url"),
                properties.get("username"), properties.get("password"), properties);
    }

    public ClickHouseCatalog(String catalogName, @Nullable String defaultDatabase, String baseUrl, String username, String password) {
        this(catalogName, defaultDatabase, baseUrl, username, password, Collections.emptyMap());
    }

    public ClickHouseCatalog(String catalogName, @Nullable String defaultDatabase, String baseUrl, String username, String password, Map<String, String> properties) {
        super(catalogName, defaultDatabase == null ? "default" : defaultDatabase);
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl), "baseUrl cannot be null or empty");
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(username), "username cannot be null or empty");
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(password), "password cannot be null or empty");
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        this.username = username;
        this.password = password;
        this.ignorePrimaryKey = properties.get("catalog.ignore-primary-key") == null
                || Boolean.parseBoolean(properties.get("catalog.ignore-primary-key"));
        this.properties = Collections.unmodifiableMap(properties);
    }

    public void open() throws CatalogException {
        try {
            Properties configuration = new Properties();
            configuration.putAll(this.properties);
            configuration.setProperty(ClickHouseQueryParam.USER.getKey(), this.username);
            configuration.setProperty(ClickHouseQueryParam.PASSWORD.getKey(), this.password);
            configuration.setProperty("socket_timeout", "600000");
            String jdbcUrl = ClickHouseUtil.getJdbcUrl(this.baseUrl, this.getDefaultDatabase());
            BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource(jdbcUrl, configuration);
            dataSource.actualize();
            this.connection = dataSource.getConnection();
            LOG.info("Created catalog {}, established connection to {}", this.getName(), jdbcUrl);
        } catch (Exception e) {
            throw new CatalogException(String.format("Opening catalog %s failed.", this.getName()), e);
        }
    }

    public synchronized void close() throws CatalogException {
        try {
            this.connection.close();
            LOG.info("Closed catalog {} ", this.getName());
        } catch (Exception e) {
            throw new CatalogException(String.format("Closing catalog %s failed.", this.getName()), e);
        }
    }

    public Optional<Factory> getFactory() {
        return Optional.of(new ClickHouseDynamicTableFactory());
    }

    public synchronized List<String> listDatabases() throws CatalogException {
        try (PreparedStatement stmt = this.connection.prepareStatement("SELECT name from `system`.databases");
             ResultSet rs = stmt.executeQuery()) {
            List<String> databases = new ArrayList<>();
            while (rs.next()) {
                databases.add(rs.getString(1));
            }
            return databases;
        } catch (Exception e) {
            throw new CatalogException(String.format("Failed listing database in catalog %s", this.getName()), e);
        }
    }

    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
        if (this.listDatabases().contains(databaseName)) {
            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
        }
        throw new DatabaseNotExistException(this.getName(), databaseName);
    }

    public boolean databaseExists(String databaseName) throws CatalogException {
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
        return this.listDatabases().contains(databaseName);
    }

    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException { throw new UnsupportedOperationException(); }

    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotEmptyException, CatalogException { throw new UnsupportedOperationException(); }

    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public synchronized List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
        if (!this.databaseExists(databaseName)) {
            throw new DatabaseNotExistException(this.getName(), databaseName);
        }
        try (PreparedStatement stmt = this.connection.prepareStatement(
                String.format("SELECT name from `system`.tables where database = '%s'", databaseName));
             ResultSet rs = stmt.executeQuery()) {
            List<String> tables = new ArrayList<>();
            while (rs.next()) {
                tables.add(rs.getString(1));
            }
            return tables;
        } catch (Exception e) {
            throw new CatalogException(String.format("Failed listing tables in catalog %s database %s", this.getName(), databaseName), e);
        }
    }

    public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        if (!this.tableExists(tablePath)) {
            throw new TableNotExistException(this.getName(), tablePath);
        }
        Map<String, String> configuration = new HashMap<>(this.properties);
        configuration.put("url", this.baseUrl);
        configuration.put("database-name", tablePath.getDatabaseName());
        configuration.put("table-name", tablePath.getObjectName());
        configuration.put("username", this.username);
        configuration.put("password", this.password);

        String databaseName = tablePath.getDatabaseName();
        String tableName = tablePath.getObjectName();
        try {
            // for Distributed tables, resolve the underlying local database/table
            DistributedEngineFullSchema engineFullSchema = ClickHouseUtil.getAndParseDistributedEngineSchema(
                    this.connection, tablePath.getDatabaseName(), tablePath.getObjectName());
            if (engineFullSchema != null) {
                databaseName = engineFullSchema.getDatabase();
                tableName = engineFullSchema.getTable();
            }
        } catch (Exception e) {
            throw new CatalogException(
                    String.format("Failed getting engine full of %s.%s.%s", this.getName(), databaseName, tableName), e);
        }
        return new CatalogTableImpl(
                this.createTableSchema(databaseName, tableName),
                this.getPartitionKeys(databaseName, tableName),
                configuration,
                "");
    }

    private synchronized TableSchema createTableSchema(String databaseName, String tableName) {
        // issue a "limit 0" query and read the column metadata from the result set
        try (PreparedStatement stmt = this.connection.prepareStatement(
                String.format("SELECT * from `%s`.`%s` limit 0", databaseName, tableName))) {
            ClickHouseResultSetMetaData metaData = stmt.getMetaData().unwrap(ClickHouseResultSetMetaData.class);
            Method getColMethod = metaData.getClass().getDeclaredMethod("getCol", Integer.TYPE);
            getColMethod.setAccessible(true);
            List<String> primaryKeys = this.getPrimaryKeys(databaseName, tableName);
            TableSchema.Builder builder = TableSchema.builder();
            for (int idx = 1; idx <= metaData.getColumnCount(); ++idx) {
                ClickHouseColumnInfo columnInfo = (ClickHouseColumnInfo) getColMethod.invoke(metaData, idx);
                String columnName = columnInfo.getColumnName();
                DataType columnType = ClickHouseTypeUtil.toFlinkType(columnInfo);
                if (primaryKeys.contains(columnName)) {
                    columnType = columnType.notNull();
                }
                builder.field(columnName, columnType);
            }
            if (!primaryKeys.isEmpty()) {
                builder.primaryKey(primaryKeys.toArray(new String[0]));
            }
            return builder.build();
        } catch (Exception e) {
            throw new CatalogException(
                    String.format("Failed getting columns in catalog %s database %s table %s", this.getName(), databaseName, tableName), e);
        }
    }

    private List<String> getPrimaryKeys(String databaseName, String tableName) {
        if (this.ignorePrimaryKey) {
            return Collections.emptyList();
        }
        try (PreparedStatement stmt = this.connection.prepareStatement(
                String.format("SELECT name from `system`.columns where `database` = '%s' and `table` = '%s' and is_in_primary_key = 1", databaseName, tableName));
             ResultSet rs = stmt.executeQuery()) {
            List<String> primaryKeys = new ArrayList<>();
            while (rs.next()) {
                primaryKeys.add(rs.getString(1));
            }
            return primaryKeys;
        } catch (Exception e) {
            throw new CatalogException(
                    String.format("Failed getting primary keys in catalog %s database %s table %s", this.getName(), databaseName, tableName), e);
        }
    }

    private List<String> getPartitionKeys(String databaseName, String tableName) {
        try (PreparedStatement stmt = this.connection.prepareStatement(
                String.format("SELECT name from `system`.columns where `database` = '%s' and `table` = '%s' and is_in_partition_key = 1", databaseName, tableName));
             ResultSet rs = stmt.executeQuery()) {
            List<String> partitionKeys = new ArrayList<>();
            while (rs.next()) {
                partitionKeys.add(rs.getString(1));
            }
            return partitionKeys;
        } catch (Exception e) {
            throw new CatalogException(
                    String.format("Failed getting partition keys of %s.%s.%s", this.getName(), databaseName, tableName), e);
        }
    }

    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
        try {
            return this.databaseExists(tablePath.getDatabaseName())
                    && this.listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName());
        } catch (DatabaseNotExistException e) {
            return false;
        }
    }

    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException { throw new UnsupportedOperationException(); }

    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException { return Collections.emptyList(); }

    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException { return Collections.emptyList(); }

    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException { return Collections.emptyList(); }

    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException { throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec); }

    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException { throw new UnsupportedOperationException(); }

    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { throw new UnsupportedOperationException(); }

    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException { return Collections.emptyList(); }

    public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException { throw new FunctionNotExistException(this.getName(), functionPath); }

    public boolean functionExists(ObjectPath functionPath) throws CatalogException { return false; }

    public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException { return CatalogTableStatistics.UNKNOWN; }

    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException { return CatalogColumnStatistics.UNKNOWN; }

    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException { return CatalogTableStatistics.UNKNOWN; }

    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException { return CatalogColumnStatistics.UNKNOWN; }

    public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException { throw new UnsupportedOperationException(); }

    public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { throw new UnsupportedOperationException(); }

    public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { throw new UnsupportedOperationException(); }
}

2.5.Kafka2ClickHouse

2.5.1.Kafka2chApp

public class Kafka2chApp {

    private static final Logger log = LoggerFactory.getLogger(Kafka2chApp.class);
    private static String SINK_TABLE = "sinkTable";
    private static String KAFKA_TEMP_VIEW = "kafkaTempView";

    /**
     * @param appName    value of the app_name column in the MySQL config table
     * @param configName value of the config_name column in the MySQL config table
     * @throws Exception
     */
    public static void run(String appName, String configName, FlatMapFunction x5lFlatMapFunction) throws Exception {
        log.info("Kafka2chApp.run appName: {}, configName: {}", appName, configName);

        // load the configuration from MySQL
        Map<String, Object> mapConf = RemoteConfigUtil.getByAppNameAndConfigName(appName, configName);
        if (mapConf == null || mapConf.size() == 0) return;
        Map<String, Object> clickhouseConf = (Map<String, Object>) mapConf.get("clickhouse");
        Map<String, Object> kafkaConsumerConf = (Map<String, Object>) mapConf.get("kafka-consumer");
        Map<String, Object> hdfsConf = (Map<String, Object>) mapConf.get("hdfs");

        // initialise the stream and table environments
        StreamExecutionEnvironment streamEnv = StreamEnv.getStreamEnv(hdfsConf);
        // streamEnv.setParallelism(ckP);  // the original references an undefined `ckP`; set the desired parallelism here if needed
        StreamTableEnvironment tableEnv = TableEnv.getTableEnv();

        // read the ClickHouse table schema and derive the corresponding Flink types and field names
        List<SchemaPo> schemaPos = ClickhouseUtil.getSchemaPoList(clickhouseConf);
        TypeInformation[] types = getTypeInformationArray(schemaPos);
        String[] fieldNames = SchemaPoUtil.getFieldLists(schemaPos);
        FlatMapFunction<ConsumerRecord<String, String>, Row> flatMapFunction = x5lFlatMapFunction.newInstance(schemaPos);

        // Kafka source
        DataStreamSource<ConsumerRecord<String, String>> stream = streamEnv.addSource(CommonUtils.getKafkaConsumer(kafkaConsumerConf));
        System.out.println("source parallelism: " + streamEnv.getParallelism());

        // flatten the JSON records into Rows matching the ClickHouse schema
        SingleOutputStreamOperator<Row> infos = stream.flatMap(flatMapFunction);
        infos = infos.map(e -> e, new RowTypeInfo(types, fieldNames));
        System.out.println("map parallelism: " + streamEnv.getParallelism());

        // register the Kafka data as a temporary view
        tableEnv.createTemporaryView(KAFKA_TEMP_VIEW, infos);

        // register the ClickHouse catalog that exposes the sink table
        // (alternative: create the sink table with ClickhouseUtil.getCreateSinkTableSql(...) instead of using the catalog)
        Map<String, String> props = new HashMap<>();
        props.put(ClickHouseConfig.DATABASE_NAME, (String) clickhouseConf.get("database-name"));
        props.put(ClickHouseConfig.URL, (String) clickhouseConf.get("url"));
        props.put(ClickHouseConfig.USERNAME, (String) clickhouseConf.get("username"));
        props.put(ClickHouseConfig.PASSWORD, (String) clickhouseConf.get("password"));
        props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, (String) clickhouseConf.get("sink.flush-interval"));
        props.put(ClickHouseConfig.SINK_BATCH_SIZE, (String) clickhouseConf.get("sink.batch-size"));
        Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
        tableEnv.registerCatalog("clickhouse", cHcatalog);
        tableEnv.useCatalog("clickhouse");

        // build and submit the insert statement
        String insertSql = "insert into `" + clickhouseConf.get("table-name")
                + "` select * from default_catalog.default_database." + KAFKA_TEMP_VIEW;
        tableEnv.executeSql(insertSql);

        // for local debugging:
        // infos.print();
        // streamEnv.executeAsync();
    }

    public static TypeInformation[] getTypeInformationArray(List<SchemaPo> schemaPos) {
        TypeInformation[] types = new TypeInformation[schemaPos.size()];
        int i = 0;
        for (SchemaPo po : schemaPos) {
            if ("String".equalsIgnoreCase(po.getType())) {
                types[i] = Types.STRING;
            } else if ("Int64".equalsIgnoreCase(po.getType())) {
                types[i] = Types.LONG;
            } else if ("UInt64".equalsIgnoreCase(po.getType())) {
                types[i] = Types.LONG;
            } else if ("Int32".equalsIgnoreCase(po.getType())) {
                types[i] = Types.INT;
            } else if ("Int8".equalsIgnoreCase(po.getType())) {
                types[i] = Types.INT;
            } else if ("datetime".equalsIgnoreCase(po.getType())) {
                types[i] = Types.SQL_TIMESTAMP;
            } else if ("Map(String,String)".equalsIgnoreCase(po.getType())) {
                types[i] = Types.MAP(Types.STRING, Types.STRING);
            } else {
                types[i] = Types.STRING;
            }
            i++;
        }
        return types;
    }
}

2.5.2.Kafka2Ck-ODS

// note: the original post names this class "Kafka2Ck-ODS"; a hyphen is not a valid Java identifier,
// so it is written here as Kafka2CkOds
public class Kafka2CkOds implements FlatMapFunction {

    private static Logger logger = Logger.getLogger(Kafka2CkOds.class);

    public static void main(String[] args) throws Exception {
        Kafka2chApp.run(Kafka2CkOds.class.getName(), args[0], new Kafka2CkOds());
    }

    @Override
    public org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row> newInstance(List<SchemaPo> schemaPos) {
        return new org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row>() {
            @Override
            public void flatMap(ConsumerRecord<String, String> record, Collector<Row> out) throws Exception {
                // System.out.println("record ===> " + record); // for testing
                String value = record.value();
                try {
                    HashMap<String, Object> infoMap = JSON.parseObject(value, HashMap.class);
                    // The original post iterates a `dataListMap` extracted from the JSON payload, but that
                    // extraction step is not shown. As an assumption, the top-level JSON fields are used
                    // directly here, lower-cased so they match the ClickHouse column names.
                    Map<String, String> resultMap = new HashMap<>();
                    for (Map.Entry<String, Object> entry : infoMap.entrySet()) {
                        resultMap.put(entry.getKey().toLowerCase(), String.valueOf(entry.getValue()));
                    }
                    Row row = TableEnv.getRowBySchemaPo1(resultMap, schemaPos);
                    out.collect(row);
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.printf("bad record, reason: %s, topic: %s, key: %s, value: %s%n",
                            e.getMessage(), record.topic(), record.key(), record.value());
                }
            }
        };
    }
}

Reposted from: https://blog.csdn.net/weixin_53543905/article/details/135130618
Copyright belongs to the original author bmyyyyyy; please contact us for removal in case of infringement.
