1. Scenario
2. Versions
mysql      | flink        | clickhouse
5.7.20-log | flink-1.13.1 | 20.11.4.13
5.7.20-log | flink-1.13.2 | 20.11.4.13
5.7.20-log | flink-1.13.5 | 20.11.4.13
The jar used to connect Flink to ClickHouse:
3. Structure of the custom connector code
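The original structure diagram is a screenshot; the package layout below is reconstructed from the imports and class names that appear in the code in sections 4.2 to 4.4 (the internal classes are part of the uploaded source in section 4.5):

com.glab.flink.connector.clickhouse.table
├── ClickHouseDynamicTableFactory.java
├── ClickHouseDynamicTableSink.java
├── ClickHouseDynamicTableSource.java
└── internal
    ├── AbstractClickHouseSinkFunction.java
    ├── ClickHouseRowDataLookupFunction.java
    ├── dialect
    │   └── ClickHouseDialect.java
    └── options
        └── ClickHouseOptions.java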
4. The project's pom file
4.1 pom file
<?xml version="1.0"encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.glab</groupId><artifactId>flink-connector-clickhouse</artifactId><version>13.1</version><name>flink-connector-clickhouse</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.13.1</flink.version><scala.binary.version>2.11</scala.binary.version><clickhouse-jdbc-version>0.3.0</clickhouse-jdbc-version></properties><packaging>jar</packaging><dependencies><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>${clickhouse-jdbc-version}</version><scope>provided</scope><exclusions><exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion></exclusions></dependency><!-- https://mvnrepository.com/artifact/com.google.guava/guava --><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.1.1-jre</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.2</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpmime</artifactId><version>4.5.2</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpcore</artifactId><version>4.4.4</version></dependency><dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.2</version><scope>provided</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>1.2.3</version><scope>provided</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version><scope>provided</scope></dependency><!--kafak connector 测试用--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>test</scope></dependency><!-- Table ecosystem --><!-- Projects depending on this project won't depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<!--<optional>true</optional>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<!--<type>test-jar</type>-->
<scope>provided</scope>
</dependency>
<!-- A planner dependency won't be necessary once FLIP-32 has been completed. --><!-- <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.2</version><configuration><shadedArtifactAttached>true</shadedArtifactAttached><outputFile>out/flink-connector-clickhouse-${pom.version}.jar</outputFile><artifactSet><includes><include>*:*</include></includes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>
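With the shade and assembly plugins configured above, the connector jar can be built and dropped into the Flink lib directory. A minimal sketch, assuming Maven is installed and $FLINK_HOME points at the Flink 1.13.1 installation shown in section 4.5.1:

mvn clean package -DskipTests
# the assembly plugin writes target/flink-connector-clickhouse-13.1-jar-with-dependencies.jar,
# the shade plugin writes out/flink-connector-clickhouse-13.1.jar
cp target/flink-connector-clickhouse-13.1-jar-with-dependencies.jar $FLINK_HOME/lib/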
4.2 ClickHouseDynamicTableFactory.java
package com.glab.flink.connector.clickhouse.table;

import com.glab.flink.connector.clickhouse.table.internal.dialect.ClickHouseDialect;
import com.glab.flink.connector.clickhouse.table.internal.options.ClickHouseOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.*;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
public static final String IDENTIFIER = "clickhouse";
private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";
public static final ConfigOption<String> URL = ConfigOptions.key("url")
.stringType()
.noDefaultValue()
.withDescription("the ClickHouse url in format `clickhouse://<host>:<port>`.");
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription("the ClickHouse username.");
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription("the ClickHouse password.");
public static final ConfigOption<String> DATABASE_NAME = ConfigOptions.key("database-name")
.stringType()
.defaultValue("default")
.withDescription("the ClickHouse database name. Default to `default`.");
public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("the ClickHouse table name.");
public static final ConfigOption<Integer> SINK_BATCH_SIZE = ConfigOptions.key("sink.batch-size")
.intType()
.defaultValue(Integer.valueOf(1000))
.withDescription("the flush max size, over this number of records, will flush data. The default value is 1000.");
public static final ConfigOption<Duration> SINK_FLUSH_INTERVAL = ConfigOptions.key("sink.flush-interval")
.durationType()
.defaultValue(Duration.ofSeconds(1L))
.withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The default value is 1s.");
public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries")
.intType()
.defaultValue(Integer.valueOf(3))
.withDescription("the max retry times if writing records to database failed.");
public static final ConfigOption<Boolean> SINK_WRITE_LOCAL = ConfigOptions.key("sink.write-local")
.booleanType()
.defaultValue(Boolean.valueOf(false))
.withDescription("directly write to local tables in case of Distributed table.");
public static final ConfigOption<String> SINK_PARTITION_STRATEGY = ConfigOptions.key("sink.partition-strategy")
.stringType()
.defaultValue("balanced")
.withDescription("partition strategy. available: balanced, hash, shuffle.");
public static final ConfigOption<String> SINK_PARTITION_KEY = ConfigOptions.key("sink.partition-key")
.stringType()
.noDefaultValue()
.withDescription("partition key used for hash strategy.");
public static final ConfigOption<Boolean> SINK_IGNORE_DELETE = ConfigOptions.key("sink.ignore-delete")
.booleanType()
.defaultValue(Boolean.valueOf(true))
.withDescription("whether to treat update statements as insert statements and ignore deletes. defaults to true.");
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows")
.longType()
.defaultValue(-1L)
.withDescription("the max number of rows of lookup cache, over this value, the oldest rows will be eliminated." +
"cache.max-rows and cache ttl options must all be specified id any of them is specified. cache is not enabled as default.");
public static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions.key("lookup.cache.ttl")
.durationType()
.defaultValue(Duration.ofSeconds(10))
.withDescription("the cache time to live");
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if lookup database failed.");
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    ReadableConfig config = helper.getOptions();
    helper.validate();
    try {
        validateConfigOptions(config);
    } catch (Exception e) {
        e.printStackTrace();
    }
    // Classes whose names contain "New" use the 1.13 API; the ones without use the 1.12 API.
    ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
    return new ClickHouseDynamicTableSource(resolvedSchema, getOptions(config), getJdbcLookupOptions(config));
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    ReadableConfig config = helper.getOptions();
    helper.validate();
    try {
        validateConfigOptions(config);
    } catch (Exception e) {
        e.printStackTrace();
    }
    // Classes whose names contain "New" use the 1.13 API; the ones without use the 1.12 API.
    ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
    return new ClickHouseDynamicTableSink(resolvedSchema, getOptions(config));
}
@Override
public String factoryIdentifier() {
    return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
    Set<ConfigOption<?>> requiredOptions = new HashSet<>();
    requiredOptions.add(URL);
    requiredOptions.add(TABLE_NAME);
    return requiredOptions;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> optionalOptions = new HashSet<>();
optionalOptions.add(USERNAME);
optionalOptions.add(PASSWORD);
optionalOptions.add(DATABASE_NAME);
optionalOptions.add(SINK_BATCH_SIZE);
optionalOptions.add(SINK_FLUSH_INTERVAL);
optionalOptions.add(SINK_MAX_RETRIES);
optionalOptions.add(SINK_WRITE_LOCAL);
optionalOptions.add(SINK_PARTITION_STRATEGY);
optionalOptions.add(SINK_PARTITION_KEY);
optionalOptions.add(SINK_IGNORE_DELETE);
optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
optionalOptions.add(LOOKUP_CACHE_TTL);
optionalOptions.add(LOOKUP_MAX_RETRIES);
return optionalOptions;
}
private void validateConfigOptions(ReadableConfig config) throws Exception {
    String partitionStrategy = config.get(SINK_PARTITION_STRATEGY);
    if (!Arrays.asList("hash", "balanced", "shuffle").contains(partitionStrategy)) {
        throw new IllegalArgumentException("Unknown sink.partition-strategy `" + partitionStrategy + "`");
    }
    if (partitionStrategy.equals("hash") && !config.getOptional(SINK_PARTITION_KEY).isPresent()) {
        throw new IllegalArgumentException("A partition key must be provided for hash partition strategy");
    }
    if (config.getOptional(USERNAME).isPresent() ^ config.getOptional(PASSWORD).isPresent()) {
        throw new IllegalArgumentException("Either all or none of username and password should be provided");
    }
}
private ClickHouseOptions getOptions(ReadableConfig config) {
    return new ClickHouseOptions.Builder()
            .withUrl(config.get(URL))
            .withUsername(config.get(USERNAME))
            .withPassword(config.get(PASSWORD))
            .withDatabaseName(config.get(DATABASE_NAME))
            .withTableName(config.get(TABLE_NAME))
            .withBatchSize(config.get(SINK_BATCH_SIZE))
            .withFlushInterval(config.get(SINK_FLUSH_INTERVAL))
            .withMaxRetries(config.get(SINK_MAX_RETRIES))
            .withWriteLocal(config.get(SINK_WRITE_LOCAL))
            .withPartitionStrategy(config.get(SINK_PARTITION_STRATEGY))
            .withPartitionKey(config.get(SINK_PARTITION_KEY))
            .withIgnoreDelete(config.get(SINK_IGNORE_DELETE))
            .setDialect(new ClickHouseDialect())
            .build();
}
/* private JdbcOptions getJdbcOptions(ReadableConfig config){return JdbcOptions.builder()
.setDriverName(DRIVER_NAME)
.setDBUrl(config.get(URL))
.setTableName(config.get(TABLE_NAME))
.setDialect(new ClickHouseDialect())
.build();}*/
private JdbcLookupOptions getJdbcLookupOptions(ReadableConfig config) {
    return JdbcLookupOptions.builder()
            .setCacheExpireMs(config.get(LOOKUP_CACHE_TTL).toMillis())
            .setMaxRetryTimes(config.get(LOOKUP_MAX_RETRIES))
            .setCacheMaxSize(config.get(LOOKUP_CACHE_MAX_ROWS))
            .build();
}
}
4.3 ClickHouseDynamicTableSink.java
package com.glab.flink.connector.clickhouse.table;

import com.glab.flink.connector.clickhouse.table.internal.AbstractClickHouseSinkFunction;
import com.glab.flink.connector.clickhouse.table.internal.options.ClickHouseOptions;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
public class ClickHouseDynamicTableSink implements DynamicTableSink {
private final ResolvedSchema resolvedSchema;
private final ClickHouseOptions options;
public ClickHouseDynamicTableSink(ResolvedSchema resolvedSchema, ClickHouseOptions options) {
    this.resolvedSchema = resolvedSchema;
    this.options = options;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    validatePrimaryKey(requestedMode);
    return ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .addContainedKind(RowKind.UPDATE_AFTER)
            .addContainedKind(RowKind.DELETE)
            .build();
}
private void validatePrimaryKey(ChangelogMode requestedMode) {
    Preconditions.checkState(ChangelogMode.insertOnly().equals(requestedMode) || this.resolvedSchema.getPrimaryKey().isPresent(),
            "please declare primary key for sink table when query contains update/delete record.");
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    AbstractClickHouseSinkFunction sinkFunction = new AbstractClickHouseSinkFunction.Builder()
            .withOptions(this.options)
            .withFieldNames(this.resolvedSchema.getColumnNames())
            .withFieldDataTypes(this.resolvedSchema.getColumnDataTypes())
            .withPrimaryKey(this.resolvedSchema.getPrimaryKey())
            .withRowDataTypeInfo(context.createTypeInformation(this.resolvedSchema.toSinkRowDataType()))
            .build();
    return SinkFunctionProvider.of(sinkFunction);
}
@Override
public ClickHouseDynamicTableSink copy() {
    return new ClickHouseDynamicTableSink(this.resolvedSchema, this.options);
}

@Override
public String asSummaryString() {
    return "ClickHouse sink";
}
}
4.4 ClickHouseDynamicTableSource.java
package com.glab.flink.connector.clickhouse.table;

import com.glab.flink.connector.clickhouse.table.internal.ClickHouseRowDataLookupFunction;
import com.glab.flink.connector.clickhouse.table.internal.dialect.ClickHouseDialect;
import com.glab.flink.connector.clickhouse.table.internal.options.ClickHouseOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.*;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.http.client.utils.URIBuilder;
public class ClickHouseDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsLimitPushDown {
private final ResolvedSchema resolvedSchema;
private final ClickHouseOptions options;
private final JdbcLookupOptions lookupOptions;
private long limit = -1;
public ClickHouseDynamicTableSource(ResolvedSchema resolvedSchema, ClickHouseOptions options, JdbcLookupOptions lookupOptions) {
    this.resolvedSchema = resolvedSchema;
    this.options = options;
    this.lookupOptions = lookupOptions;
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
    String[] keyNames = new String[lookupContext.getKeys().length];
    for (int i = 0; i < keyNames.length; i++) {
        int[] innerKeyArr = lookupContext.getKeys()[i];
        Preconditions.checkArgument(innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
        keyNames[i] = resolvedSchema.getColumnNames().get(innerKeyArr[0]);
    }
    final RowType rowType = (RowType) resolvedSchema.toSourceRowDataType().getLogicalType();
    ClickHouseRowDataLookupFunction lookupFunction =
            new ClickHouseRowDataLookupFunction(options, lookupOptions,
                    resolvedSchema.getColumnNames().stream().toArray(String[]::new),
                    resolvedSchema.getColumnDataTypes().stream().toArray(DataType[]::new),
                    keyNames, rowType);
    return TableFunctionProvider.of(lookupFunction);
}
@Override
public ChangelogMode getChangelogMode() {
    return ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .build();
}
// For data exploration/preview only
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    ClickHouseDialect dialect = (ClickHouseDialect) options.getDialect();
    String query = dialect.getSelectFromStatement(options.getTableName(), resolvedSchema.getColumnNames().stream().toArray(String[]::new), new String[0]);
    // Flink 1.13 supports SupportsLimitPushDown; without it, scanning a large table would simply hang the query.
    if (limit >= 0) {
        query = String.format("%s %s", query, dialect.getLimitClause(limit));
    }
    RowType rowType = (RowType) resolvedSchema.toSourceRowDataType().getLogicalType();
    JdbcRowDataInputFormat build = JdbcRowDataInputFormat.builder()
            .setDrivername(options.getDialect().defaultDriverName().get())
            .setDBUrl(getJdbcUrl(options.getUrl(), options.getDatabaseName()))
            .setUsername(options.getUsername().orElse(null))
            .setPassword(options.getPassword().orElse(null))
            .setQuery(query)
            .setRowConverter(dialect.getRowConverter(rowType))
            .setRowDataTypeInfo(scanContext.createTypeInformation(resolvedSchema.toSourceRowDataType()))
            .build();
    return InputFormatProvider.of(build);
}
@Override
public DynamicTableSource copy() {
    return new ClickHouseDynamicTableSource(resolvedSchema, options, lookupOptions);
}

@Override
public String asSummaryString() {
    return "clickhouse source";
}

private String getJdbcUrl(String url, String dbName) {
    try {
        return "jdbc:" + new URIBuilder(url).setPath("/" + dbName).build().toString();
    } catch (Exception e) {
        throw new RuntimeException("get JDBC url failed.", e);
    }
}

@Override
public void applyLimit(long limit) {
    this.limit = limit;
}
}
4.5 The remaining classes (uploaded source)
Code download address:
Custom ClickHouse source and sink connector for Flink 1.13: https://download.csdn.net/download/wudonglianga/86501949
4.5.1 Jars in the Flink lib directory
[root@node01 flink-1.13.1]# cd lib/
[root@node01 lib]# ll
total 384180
-rw-r--r-- 1 root root        358385 Aug 27 18:39 clickhouse-jdbc-0.3.0.jar
-rw-r--r-- 1 root root       4585064 Aug 28 22:51 flink-connector-clickhouse-13.1-jar-with-dependencies.jar
-rw-r--r-- 1 root root        248980 Aug 28 22:12 flink-connector-jdbc_2.11-1.13.1.jar
-rw-r--r-- 1 root root      30087268 Aug 28 18:10 flink-connector-mysql-cdc-2.0.2.jar
-rw-r--r-- 1 zookeeper hadoop   92311 May 25 2021 flink-csv-1.13.1.jar
-rw-r--r-- 1 zookeeper hadoop 115530972 May 25 2021 flink-dist_2.11-1.13.1.jar
-rw-r--r-- 1 root root         81363 Oct  5 2021 flink-hadoop-compatibility_2.12-1.12.0.jar
-rw-r--r-- 1 zookeeper hadoop  148131 May 25 2021 flink-json-1.13.1.jar
-rw-r--r-- 1 root root      43317025 Oct  5 2021 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-rw-r-- 1 zookeeper hadoop 7709740 Apr  8 2021 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root      38101480 Oct  5 2021 flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar
-rw-r--r-- 1 zookeeper hadoop 36417228 May 25 2021 flink-table_2.11-1.13.1.jar
-rw-r--r-- 1 zookeeper hadoop 40965908 May 25 2021 flink-table-blink_2.11-1.13.1.jar
-rw-r--r-- 1 root root       1654821 Oct  5 2021 hadoop-mapreduce-client-core-3.1.1.3.1.4.0-315.jar
-rw-r--r-- 1 root root      52191593 Oct  5 2021 hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar
-rw-r--r-- 1 root root      17427063 Oct  5 2021 hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar
-rw-rw-r-- 1 zookeeper hadoop   67114 Oct 10 2019 log4j-1.2-api-2.12.1.jar
-rw-rw-r-- 1 zookeeper hadoop  276771 Oct 10 2019 log4j-api-2.12.1.jar
-rw-rw-r-- 1 zookeeper hadoop 1674433 Oct 10 2019 log4j-core-2.12.1.jar
-rw-rw-r-- 1 zookeeper hadoop   23518 Oct 10 2019 log4j-slf4j-impl-2.12.1.jar
-rw-r--r-- 1 root root       2397321 Aug 28 22:13 mysql-connector-java-8.0.21.jar
[root@node01 lib]# pwd
/opt/module/flink/flink-1.13.1/flink-1.13.1/lib
[root@node01 lib]#
4.6 Table schemas
4.6.1 MySQL table schema
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS =0;
-- ----------------------------
-- Table structure for Flink_cdc
-- ----------------------------
DROP TABLE IF EXISTS `Flink_cdc`;
CREATE TABLE `Flink_cdc`(`id` bigint(64) NOT NULL AUTO_INCREMENT,
`name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`age` int(20) NULL DEFAULT NULL,
`birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT =10225 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of Flink_cdc
-- ----------------------------
INSERT INTO `Flink_cdc` VALUES (190, '乜荷爽', 5, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (191, '嵇露影', 4, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (192, '富胜', 18, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (193, '孟言', 7, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (194, '漆维光', 16, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (195, '澹巧', 7, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (196, '司玉', 23, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (197, '唐栋豪', 5, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (198, '姚以', 22, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (199, '仲亨', 15, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (200, '凌燕翠', 11, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (201, '琴荷亚', 13, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
SET FOREIGN_KEY_CHECKS =1;
4.6.2 ClickHouse table schema
create table clickhosuetable (
    id UInt64,
    name String,
    age UInt64,
    birthday DateTime
) engine = MergeTree
partition by toYYYYMMDD(birthday)
primary key (id);
insert into clickhosuetable values (10001,'flink',25,'2022-08-28 12:00:00');
-- ******************* source *********************************
CREATE TABLE source_mysql2 (id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
age INT,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)) WITH ('connector'='jdbc',
'url'='jdbc:mysql://192.168.1.162:3306/wudldb',
'table-name'='Flink_cdc',
'username'='root',
'password'='123456');

-- ************************* sink table ***************************
CREATE TABLE if not exists wutable2 (id BIGINT,
name STRING,
age BIGINT,
birthday TIMESTAMP,
PRIMARY KEY(id) NOT ENFORCED
) WITH ('connector'='clickhouse',
'url'='clickhouse://192.168.1.161:8123',
'username'='default',
'password'='',
'database-name'='wudldb',
'table-name'='clickhosuetable',
'lookup.cache.max-rows'='100',
'lookup.cache.ttl'='10',
'lookup.max-retries'='3');

-- *************************** insert *************************
insert into wutable2 select id, name, age, birthday from source_mysql2;
4.7 Flink CDC to ClickHouse
CREATE TABLE source_mysql (id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
age INT,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)) WITH ('connector'='mysql-cdc',
'hostname'='192.168.1.162',
'port'='3306',
'username'='root',
'password'='123456',
'server-time-zone'='Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name'='wudldb',
'table-name'='Flink_cdc');

-- ****************************
CREATE TABLE if not exists wutable2 (id BIGINT,
name STRING,
age BIGINT,
birthday TIMESTAMP,
PRIMARY KEY(id) NOT ENFORCED
) WITH ('connector'='clickhouse',
'url'='clickhouse://192.168.1.161:8123',
'username'='default',
'password'='',
'database-name'='wudldb',
'table-name'='clickhosuetable',
'lookup.cache.max-rows'='100',
'lookup.cache.ttl'='10',
'lookup.max-retries'='3');

-- *******************************************************************
Flink SQL> insert into wutable2 select id, name, age, birthday from source_mysql;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1712c4e583d900b5523c08150ad9dd70
Flink SQL>
ClickHouse result:
SELECT count(*)
FROM clickhosuetable
Query id: 93ee83d4-7092-46e4-9954-736af4e09548
┌─count()─┐
│ 20449 │
└─────────┘
1 rows in set. Elapsed: 0.005 sec.
node01.com :)
The Flink job
5. Flink to ClickHouse Data Type Mapping
Flink Type          | ClickHouse Type
CHAR                | String
VARCHAR             | String / IP / UUID
STRING              | String / Enum
BOOLEAN             | UInt8
BYTES               | FixedString
DECIMAL             | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256
TINYINT             | Int8
SMALLINT            | Int16 / UInt8
INTEGER             | Int32 / UInt16 / Interval
BIGINT              | Int64 / UInt32
FLOAT               | Float32
DOUBLE              | Float64
DATE                | Date
TIME                | DateTime
TIMESTAMP           | DateTime
TIMESTAMP_LTZ       | DateTime
INTERVAL_YEAR_MONTH | Int32
INTERVAL_DAY_TIME   | Int64
ARRAY               | Array
MAP                 | Map
ROW                 | Not supported
MULTISET            | Not supported
RAW                 | Not supported
Example:
CREATE TABLE if not exists clickhouseTable (
ts BIGINT,
id STRING,
geohash12 STRING,
loc_type STRING,
wifimac STRING,
id_type STRING,
.....
address STRING,
PRIMARY KEY(ts, id) NOT ENFORCED
) WITH ('connector'='clickhouse', -- use the ClickHouse connector
'url'='clickhouse://xxxxx:8123', -- any node in the cluster
'username'='',
'password'='',
'database-name'='test',
'table-name'='lbs',
----- sink options below ------
'sink.batch-size'='1000000', -- number of records per batch insert
'sink.flush-interval'='5000', -- flush interval, default 1s
'sink.max-retries'='3', -- max retry times
'sink.partition-strategy'='hash', -- partition strategy: hash / balanced / shuffle
'sink.partition-key'='id', -- partition key used by the hash strategy
'sink.write-local'='true', -- whether to write to the local tables
'sink.ignore-delete'='true',
----- source (lookup) options below -----
'lookup.cache.max-rows'='100',
'lookup.cache.ttl'='10',
'lookup.max-retries'='3');
-- 1. When sink.partition-strategy is hash, sink.partition-key must be configured and sink.write-local=true so that data is written to the local tables;
--    the hash function is murmur3_32, consistent with the official murmurHash3_32() sharding strategy of ClickHouse Distributed tables.
-- 2. When sink.write-local=false, data is written to the Distributed (cluster) table; sink.partition-strategy has no effect and the sharding strategy follows the ClickHouse Distributed table's own configuration.
6. Flink 1.13.5 needs the MySQL 8.x driver jar, and Flink checkpointing requires HDFS to be up
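For reference, a minimal sketch of the checkpoint-related entries in flink-conf.yaml; the HDFS namenode address and paths below are assumptions and must be adjusted to the actual cluster:

# flink-conf.yaml (sketch; adjust the namenode address and paths to your cluster)
state.backend: filesystem
state.checkpoints.dir: hdfs://node01:8020/flink/checkpoints
state.savepoints.dir: hdfs://node01:8020/flink/savepoints
execution.checkpointing.interval: 10s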