

Flink CDC Full-Database Synchronization

Background

The project needs to capture data changes from an external data source and synchronize them to a target database in real time, automatically applying the updates so that every table stays in sync between the source and target databases. This article records an implementation for the MySQL -> Greenplum scenario.

Implementation
1. Add dependencies
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.4.1</version>
</dependency>
 
<!--        mysql-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.5.0</version>
</dependency>
2. Create the Flink CDC source

Create the Flink CDC connector: configure the source connection information and the starting point for change capture, then read and cache the source database's table metadata. Note that the MySqlCatalog used below comes from Flink's JDBC connector (flink-connector-jdbc), which must also be on the classpath.

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
 
import java.util.*;
import java.util.stream.Collectors;
 
/**
 * Open items: automatic creation of target tables,
 * handling of table schema changes.
 */
@Slf4j
public class FlinkCdcMultiSyncJdbc {
 
    public static CdcPro cdcPro;
 
    public static MySqlCatalog mysqlCatalog;
 
    public static List<String> tables;
 
    public static void main(String[] args) throws Exception {
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        init();
 
        // Monitor the MySQL binlog
        // Create the Flink-MySQL-CDC source
        // initial (default): take an initial snapshot of the monitored tables on first start, then keep reading the latest binlog (with checkpointing enabled, consumption resumes from the last offset)
        // latest-offset: never snapshot on first start; read only from the end of the binlog, i.e. only changes made after the connector started
        // timestamp: never snapshot on first start; read the binlog from the specified timestamp. The reader scans the binlog from the beginning and ignores change events earlier than the given timestamp
        // specific-offset: never snapshot on first start; read the binlog from the specified offset
        Properties jdbcProperties = new Properties();
        jdbcProperties.setProperty("time_zone", "+8:00");
        jdbcProperties.setProperty("serverTimeZone", "Asia/Shanghai");
        MySqlSource<Tuple2<String, Row>> mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()
                .hostname(cdcPro.getHost())
                .port(cdcPro.getPort())
                .databaseList(cdcPro.getDb())
                .tableList(tables.toArray(new String[0]))
//                .scanNewlyAddedTableEnabled(true) // enable discovery of newly added tables
                .username(cdcPro.getUserName())
                .password(cdcPro.getPassWord())
                .jdbcProperties(jdbcProperties)
                .deserializer(new MySQLDebeziumDeserializer(buildTableRowTypeMap(cdcPro, tables)))
                .startupOptions(StartupOptions.latest())
                .includeSchemaChanges(false)
                .build();
 
        SingleOutputStreamOperator<Tuple2<String, Row>> dataStreamSource = env.fromSource(mySqlSource,
                WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();
        dataStreamSource.addSink(new DwSink(cdcPro));
        env.execute("flinkcdc_mysql_dw");
    }
 
    private static void init() throws Exception {
        cdcPro = SourceUtil.createCdcPro();
        String source_url = String.format("jdbc:mysql://%s:%d", cdcPro.getHost(), cdcPro.getPort());
        // Register a catalog for the source database being synchronized
        mysqlCatalog = new MySqlCatalog(new ClassLoader() {
            @Override
            public Class<?> loadClass(String name) throws ClassNotFoundException {
                return super.loadClass(name);
            }
        }, "mysql-catalog", cdcPro.getDb(), cdcPro.getUserName(), cdcPro.getPassWord(), source_url);
        //        List<String> dbs = mysqlCatalog.listDatabases();
        tables = buildTables(cdcPro);
        System.setProperty("HADOOP_USER_NAME", "root");
    }
 
    private static List<String> buildTables(CdcPro cdcPro) throws Exception {
        // For whole-database sync, take all tables from the catalog; otherwise use the explicitly configured table names
        List<String> tables = new ArrayList<>();
        if (".*".equals(cdcPro.getTableList())) {
            tables = mysqlCatalog.listTables(cdcPro.getDb());
        } else {
            String[] tableArray = cdcPro.getTableList().split(",");
            for (String table : tableArray) {
                tables.add(table);
            }
        }
 
        // Drop excluded tables and prefix each remaining table name with the database name
        tables = tables.stream().filter(tableName->!cdcPro.getExcludeTables().contains(tableName)).map(t->cdcPro.getDb() + "." + t).collect(Collectors.toList());
        log.info("cdc tables: \n" + tables);
        return tables;
    }
 
    private static Map<String, RowType> buildTableRowTypeMap(CdcPro cdcPro, List<String> tables) throws Exception {
        // Synchronized because the map is populated from a parallel stream below
        Map<String, RowType> tableRowTypeMap = Collections.synchronizedMap(new HashMap<>());
//        Map<String, RowTypeInfo> tableTypeInformationMap = Maps.newConcurrentMap();
        tables.parallelStream().forEach(table ->{
            String tableName = StringUtils.replace(table, cdcPro.getDb() + ".", "");
            // Look up the table registered in the MySQL catalog
            ObjectPath objectPath = new ObjectPath(cdcPro.getDb(), tableName);
            DefaultCatalogTable catalogBaseTable = null;
            try {
                catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);
            } catch (TableNotExistException e) {
                throw new RuntimeException(e);
            }
            // Get the table schema
            Schema schema = catalogBaseTable.getUnresolvedSchema();
            // Collect the table's field names and field types
            String[] fieldNames = new String[schema.getColumns().size()];
            LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];
            for (int i = 0; i < schema.getColumns().size(); i++) {
                Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);
                fieldNames[i] = column.getName();
                logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();
 
            }
            tableRowTypeMap.put(tableName, RowType.of(logicalTypes, fieldNames));
        });
        return tableRowTypeMap;
    }
 
}
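The CdcPro configuration object and the SourceUtil.createCdcPro() call above are not included in the original post. Below is a minimal sketch of what such a configuration holder might look like, assuming it simply mirrors the properties listed in section 5; the class and field layout are assumptions inferred from how cdcPro is used above.

import lombok.Data;

import java.util.List;

/**
 * Hypothetical configuration holder for the CDC job.
 * Lombok @Data generates the getters/setters used elsewhere in the post.
 */
@Data
public class CdcPro {
    private String host;                 // MySQL host
    private int port;                    // MySQL port
    private String db;                   // database to synchronize
    private String userName;             // MySQL user
    private String passWord;             // MySQL password
    private String tableList;            // ".*" for all tables, or a comma-separated list
    private List<String> excludeTables;  // tables to skip
    private String columnConver;         // "uppercase" / "lowercase" column-name conversion for the sink
}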
3. Create the deserializer

Create a custom deserializer that extracts the changed table, the row data, and the operation type from the change events captured by Flink CDC.

Custom converters translate the parsed Debezium values into Flink data types.

Each row is tagged with its changelog RowKind, paired with its table name into a Tuple2, and emitted to the Flink collector.

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.Collector;
 
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
 
@Slf4j
public class MySQLDebeziumDeserializer implements DebeziumDeserializationSchema<Tuple2<String, Row>> {
 
    private final Map<String, RowType> tableRowTypeMap;
    private final Map<String, DeserializationRuntimeConverter> physicalConverterMap = Maps.newConcurrentMap();
 
    MySQLDebeziumDeserializer(Map<String, RowType> tableRowTypeMap) {
        this.tableRowTypeMap = tableRowTypeMap;
        for (String tableName : this.tableRowTypeMap.keySet()) {
            RowType rowType = this.tableRowTypeMap.get(tableName);
            DeserializationRuntimeConverter physicalConverter = createNotNullConverter(rowType);
            this.physicalConverterMap.put(tableName, physicalConverter);
        }
    }
 
    @Override
    public void deserialize(SourceRecord record, Collector out) throws Exception {
        Struct value = (Struct) record.value();
//        log.info("value:{}", value);
        String tableName = value.getStruct("source").get("table").toString();
 
        // Skip events for tables that were not registered during initialization
        if(!physicalConverterMap.containsKey(tableName)){
            return;
        }
 
        DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tableName);
        Envelope.Operation op = Envelope.operationFor(record);
 
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            Row insert = extractAfterRow(record, physicalConverter);
            insert.setKind(RowKind.INSERT);
            out.collect(Tuple2.of(tableName, insert));
        } else if (op == Envelope.Operation.DELETE) {
            Row delete = extractBeforeRow(record, physicalConverter);
            delete.setKind(RowKind.DELETE);
            out.collect(Tuple2.of(tableName, delete));
        } else if (op == Envelope.Operation.UPDATE) {
            Row before = extractBeforeRow(record, physicalConverter);
            before.setKind(RowKind.UPDATE_BEFORE);
            out.collect(Tuple2.of(tableName, before));
 
            Row after = extractAfterRow(record, physicalConverter);
            after.setKind(RowKind.UPDATE_AFTER);
            out.collect(Tuple2.of(tableName, after));
        }
 
    }
 
    private Row extractAfterRow(SourceRecord record, DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema afterSchema = record.valueSchema().field(Envelope.FieldName.AFTER).schema();
        Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
        return (Row) physicalConverter.convert(after, afterSchema);
    }
 
    private Row extractBeforeRow(SourceRecord record, DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema beforeSchema = record.valueSchema().field(Envelope.FieldName.BEFORE).schema();
        Struct before = ((Struct) record.value()).getStruct(Envelope.FieldName.BEFORE);
        return (Row) physicalConverter.convert(before, beforeSchema);
    }
 
    @Override
    public TypeInformation<Tuple2<String, Row>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() {
        });
    }
 
    public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
        /**
         * datetime       TIMESTAMP_WITHOUT_TIME_ZONE
         * date           DATE
         * time           TIME_WITHOUT_TIME_ZONE
         * timestamp      TIMESTAMP_WITHOUT_TIME_ZONE
         */
        switch (type.getTypeRoot()) {
            case NULL:
                return convertToNull();
            case BOOLEAN:
                return convertToInt();
            case TINYINT:
                return convertToInt();
            case SMALLINT:
                return convertToInt();
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return convertToInt();
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return convertToLong();
            case DATE:
                return convertToDate();
            case TIME_WITHOUT_TIME_ZONE:
                return convertToTime();
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return convertToTimestamp(ZoneId.of("UTC+8"));
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
            case FLOAT:
                return convertToFloat();
            case DOUBLE:
                return convertToDouble();
            case CHAR:
            case VARCHAR:
                return convertToString();
            case BINARY:
            case VARBINARY:
                return convertToBinary();
            case DECIMAL:
                return createDecimalConverter((DecimalType) type);
            case ROW:
                return createRowConverter((RowType) type);
            case ARRAY:
            case MAP:
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
 
    private static DeserializationRuntimeConverter convertToNull() {
        return new DeserializationRuntimeConverter() {
 
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return null;
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToByte() {
        return new DeserializationRuntimeConverter() {
 
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return Byte.parseByte(dbzObj.toString());
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToBoolean() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Boolean) {
                    return dbzObj;
                } else if (dbzObj instanceof Byte) {
                    return (byte) dbzObj == 1;
                } else if (dbzObj instanceof Short) {
                    return (short) dbzObj == 1;
                } else {
                    return Boolean.parseBoolean(dbzObj.toString());
                }
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToShort() {
        return new DeserializationRuntimeConverter() {
 
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return Short.parseShort(dbzObj.toString());
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToInt() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return dbzObj;
                } else if (dbzObj instanceof Long) {
                    return ((Long) dbzObj).intValue();
                } else {
                    return Integer.parseInt(dbzObj.toString());
                }
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToLong() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return ((Integer) dbzObj).longValue();
                } else if (dbzObj instanceof Long) {
                    return dbzObj;
                } else {
                    return Long.parseLong(dbzObj.toString());
                }
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
        int precision = decimalType.getPrecision();
        int scale = decimalType.getScale();
        if(precision > 38){
            precision = 38;
        }
 
        if(scale > 10){
            scale = 10;
        }
 
        int finalPrecision = precision;
        int finalScale = scale;
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                BigDecimal bigDecimal;
                if (dbzObj instanceof byte[]) {
                    // decimal.handling.mode=precise
                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
                } else if (dbzObj instanceof String) {
                    // decimal.handling.mode=string
                    bigDecimal = new BigDecimal((String) dbzObj);
                } else if (dbzObj instanceof Double) {
                    // decimal.handling.mode=double
                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
                } else {
                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
                        SpecialValueDecimal decimal =
                                VariableScaleDecimal.toLogical((Struct) dbzObj);
                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
                    } else {
                        // fallback to string
                        bigDecimal = new BigDecimal(dbzObj.toString());
                    }
                }
                return DecimalData.fromBigDecimal(bigDecimal, finalPrecision, finalScale);
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToDouble() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return ((Float) dbzObj).doubleValue();
                } else if (dbzObj instanceof Double) {
                    return dbzObj;
                } else {
                    return Double.parseDouble(dbzObj.toString());
                }
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToFloat() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return dbzObj;
                } else if (dbzObj instanceof Double) {
                    return ((Double) dbzObj).floatValue();
                } else {
                    return Float.parseFloat(dbzObj.toString());
                }
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToDate() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return TemporalConversions.toLocalDate(dbzObj);
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToTime() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case MicroTime.SCHEMA_NAME:
                            // value is microseconds of the day
                            return LocalTime.ofNanoOfDay((long) dbzObj * 1_000L);
                        case NanoTime.SCHEMA_NAME:
                            // value is nanoseconds of the day
                            return LocalTime.ofNanoOfDay((long) dbzObj);
                    }
                } else if (dbzObj instanceof Integer) {
                    // value is milliseconds of the day
                    return LocalTime.ofNanoOfDay(((Integer) dbzObj) * 1_000_000L);
                }
                return TemporalConversions.toLocalTime(dbzObj);
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case Timestamp.SCHEMA_NAME:
                            return TimestampData.fromEpochMillis((Long) dbzObj);
                        case MicroTimestamp.SCHEMA_NAME:
                            long micro = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    micro / 1000, (int) (micro % 1000 * 1000));
                        case NanoTimestamp.SCHEMA_NAME:
                            long nano = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    nano / 1000_000, (int) (nano % 1000_000));
                    }
                }
                LocalDateTime localDateTime =
                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
                return TimestampData.fromLocalDateTime(localDateTime);
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof String) {
                    String str = (String) dbzObj;
                    // TIMESTAMP_LTZ type is encoded in string type
                    Instant instant = Instant.parse(str);
                    return TimestampData.fromLocalDateTime(
                            LocalDateTime.ofInstant(instant, serverTimeZone));
                }
                throw new IllegalArgumentException(
                        "Unable to convert to TimestampData from unexpected value '"
                                + dbzObj
                                + "' of type "
                                + dbzObj.getClass().getName());
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToString() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return StringData.fromString(dbzObj.toString());
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter convertToBinary() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof byte[]) {
                    return dbzObj;
                } else if (dbzObj instanceof ByteBuffer) {
                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    return bytes;
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
                }
            }
 
        };
    }
 
    private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
        final DeserializationRuntimeConverter[] fieldConverters =
                rowType.getFields().stream()
                        .map(RowType.RowField::getType)
                        .map(MySQLDebeziumDeserializer::createNotNullConverter)
                        .toArray(DeserializationRuntimeConverter[]::new);
        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
 
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Object convert(Object dbzObj, Schema schema) throws Exception {
                Struct struct = (Struct) dbzObj;
                int arity = fieldNames.length;
                Object[] fieldByPosition = new Object[arity];
                LinkedHashMap<String, Integer> positionByName = new LinkedHashMap<>();
                for (int i = 0; i < arity; i++) {
                    String fieldName = fieldNames[i];
                    Field field = schema.field(fieldName);
                    positionByName.put(fieldName, i);
                    if (field == null) {
                        fieldByPosition[i] = null;
                    } else {
                        Object fieldValue = struct.getWithoutDefault(fieldName);
                        Schema fieldSchema = schema.field(fieldName).schema();
                        Object convertedField = convertField(fieldConverters[i], fieldValue, fieldSchema);
                        fieldByPosition[i] = convertedField;
                    }
                }
                return RowUtils.createRowWithNamedPositions(null, fieldByPosition, positionByName);
            }
 
        };
    }
 
    private static Object convertField(DeserializationRuntimeConverter fieldConverter, Object fieldValue,
                                       Schema fieldSchema) throws Exception {
        if (fieldValue == null) {
            return null;
        } else {
            return fieldConverter.convert(fieldValue, fieldSchema);
        }
    }
 
}
4. Custom Greenplum sink operator

Implement a custom Greenplum sink by extending Flink's RichSinkFunction and overriding its methods.

The sink receives the tuples emitted by the collector and, based on the row's changelog kind and the table name, generates SQL statements that apply the changes to the Greenplum database.

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
 
import java.sql.*;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;
 
@Slf4j
public class DwSink extends RichSinkFunction<Tuple2<String, Row>> {
 
    private Connection connection = null;
    private PreparedStatement stmt = null;
    private Map<String, String> tableMapping;
    private String mapper = null;
    private String schema = null;
 
    private CdcPro cdcPro = null;
 
    public DwSink(CdcPro cdcPro) {
        this.cdcPro = cdcPro;
    }
 
    @Override
    public void open(Configuration parameters) throws Exception {
        // Read the configuration
        Properties properties = PropUtils.getProperties();
        String url = properties.getProperty("dw_jdbc");
        String username = properties.getProperty("dw_user");
        String password = properties.getProperty("dw_password");
        connection = DriverManager.getConnection(url, username, password);
        // Table-name mapping configured at initialization
        String mapping = properties.getProperty("tableMapping");
        // pattern-based mapping
        if (mapping.contains("{tableName}")) {
            mapper = mapping;
        } else { // map-based mapping
            this.tableMapping = (Map) JacksonUtil.fromJson(mapping, Map.class);
        }
        schema = properties.getProperty("dw_schema");
        if (StringUtils.isBlank(schema)) {
            schema = "public";
        }
    }
 
    @Override
    public void invoke(Tuple2<String, Row> record, Context context) throws Exception {
        log.debug("invoke:" + record);
        Set<String> fieldNames = record.f1.getFieldNames(true);
        if (CollectionUtils.isEmpty(fieldNames)) {
            log.warn("fieldNames is empty, table:{}", record.f0);
            return;
        }
        if (RowKind.UPDATE_AFTER == record.f1.getKind() || RowKind.INSERT == record.f1.getKind()) {
            doInsert(record, fieldNames);
        } else {
            doDelete(record, fieldNames);
        }
    }
 
    /**
     * Handle deletes
     *
     * @param record     record
     * @param fieldNames fieldNames
     * @throws Exception e
     */
    private void doDelete(Tuple2<String, Row> record, Set<String> fieldNames) throws Exception {
        StringJoiner fieldNamesList = new StringJoiner(" and ");
        for (String fieldName : fieldNames) {
            if (record.f1.getField(fieldName) == null) {
                fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)) + " is null");
            } else {
                fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)) + " = ?");
            }
        }
        String sql = "delete from " + this.getMapperTableName(record.f0) + " where " + fieldNamesList;
        log.debug("del sql: {}", sql);
        PreparedStatement stmt = connection.prepareStatement(sql);
        int index = 1;
        for (String fieldName : fieldNames) {
            if (record.f1.getField(fieldName) != null) {
                handlePreparedStatement(stmt, record.f1.getField(fieldName), index);
                index++;
            }
        }
        stmt.execute();
        stmt.close();
    }
 
    /**
     * Handle inserts
     *
     * @param record     record
     * @param fieldNames fieldNames
     * @throws Exception e
     */
    private void doInsert(Tuple2<String, Row> record, Set<String> fieldNames) throws Exception {
        StringJoiner fieldNamesList = new StringJoiner(",");
        StringJoiner fieldParmList = new StringJoiner(",");
        for (String fieldName : fieldNames) {
            fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)));
            fieldParmList.add("?");
        }
        String sql = "insert into " + this.getMapperTableName(record.f0) + " ( " + fieldNamesList + " )" +
                " values (" + fieldParmList + ")";
        log.info("insert sql: {}", sql);
        PreparedStatement stmt = connection.prepareStatement(sql);
        int index = 1;
        for (String fieldName : fieldNames) {
            Object field = record.f1.getField(fieldName);
            handlePreparedStatement(stmt, field, index);
            index++;
        }
        stmt.execute();
        stmt.close();
    }
 
    /**
     * Map Flink data types to JDBC types
     *
     * @param stmt  stmt
     * @param field field
     * @param index index
     * @throws Exception e
     */
    @Deprecated
    private void handlePreparedStatement(PreparedStatement stmt, Object field, int index) throws Exception {
//        if (field instanceof BinaryStringData) {
//            // JSON columns are deserialized as BinaryStringData
//            PGobject pGobject = new PGobject();
//            pGobject.setType("json");
//            pGobject.setValue(field.toString());
//            stmt.setObject(index, pGobject);
//        } else
        if (field instanceof StringData) {
            stmt.setString(index, String.valueOf(field));
        } else if (field instanceof DecimalData) {
            stmt.setBigDecimal(index, ((DecimalData) field).toBigDecimal());
        } else if (field instanceof TimestampData) {
            stmt.setTimestamp(index, ((TimestampData) field).toTimestamp());
        } else if (field instanceof Short) {
            stmt.setShort(index, (Short) field);
        } else if (field instanceof Boolean) {
            stmt.setBoolean(index, (Boolean) field);
        } else if (field instanceof Integer) {
            stmt.setInt(index, (Integer) field);
        } else if (field instanceof Long) {
            stmt.setLong(index, (Long) field);
        } else if (field instanceof Double) {
            stmt.setDouble(index, (Double) field);
        } else if (field instanceof Float) {
            stmt.setFloat(index, (Float) field);
        } else if (field instanceof LocalDate) {
            stmt.setDate(index, Date.valueOf((LocalDate) field));
        } else if (field instanceof LocalTime) {
            LocalTime localTime = (LocalTime) field;
            stmt.setTime(index, new Time(localTime.getHour(), localTime.getMinute(), localTime.getSecond()));
        } else if (field instanceof byte[]) {
            stmt.setBytes(index, (byte[]) field);
        } else {
            stmt.setObject(index, field);
        }
    }
 
    private String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }
 
    private String coverColumn(String fileName){
        if(StringUtils.isNotEmpty(fileName) && StringUtils.isNotEmpty(cdcPro.getColumnConver())){
 
            if("uppercase".equalsIgnoreCase(cdcPro.getColumnConver())){
                return fileName.toUpperCase();
            }else if("lowercase".equalsIgnoreCase(cdcPro.getColumnConver())){
                return fileName.toLowerCase();
            }
        }
        return fileName;
    }
 
    @Override
    public void close() throws Exception {
        if (stmt != null) {
            stmt.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
 
    /**
     * Resolve the DW table that corresponds to a MySQL table according to the mapping rules
     *
     * @param originalTableName originalTableName
     * @return the mapped DW table name, qualified with the schema
     */
    private String getMapperTableName(String originalTableName) {
        String dwTableName;
        if (!StringUtils.isBlank(mapper)) {
            dwTableName = mapper.replace("{tableName}", originalTableName);
        } else {
            dwTableName = tableMapping.get(originalTableName);
        }
        if (StringUtils.isBlank(dwTableName)) {
            log.error("mysql表: " + originalTableName + "没有找到对应的dw库所对应的表!");
        }
        return schema + "." + dwTableName;
    }
 
}
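The sink relies on two helper classes that the original post does not include, PropUtils and JacksonUtil. A minimal sketch is shown below, under the assumption that the configuration lives in an application.properties file on the classpath and that Jackson is available; both the file name and the implementations are assumptions.

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.InputStream;
import java.util.Properties;

/** Hypothetical helper that loads the properties file shown in section 5 below. */
public class PropUtils {

    public static Properties getProperties() throws Exception {
        Properties props = new Properties();
        // Assumed location of the configuration file
        try (InputStream in = PropUtils.class.getResourceAsStream("/application.properties")) {
            props.load(in);
        }
        return props;
    }
}

/** Hypothetical JSON helper used for the tableMapping map and the DDL history records. */
class JacksonUtil {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static <T> T fromJson(String json, Class<T> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}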
5. Parameter configuration
#mysql--source
mysql_jdbc=jdbc:mysql://ip:3306/test?autoReconnect=true
mysql_userName=root
mysql_passWord=123456
#Separate table names with commas; to sync all tables in the database, use .*
mysql_tableList=.*
excluded_tables=t1,t2,table_decimal
#dw--sink
dw_jdbc=jdbc:postgresql://ip:5432/postgres?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
dw_schema=ods_cdctest1
dw_user=gpadmin
dw_password=123456
#Mapping rules: use either the map-style mapping or the pattern mapping below (choose one)
tableMapping={tableName}
column_conver=lowercase
#Or use a pattern, for example:
#tableMapping=abc_{tableName}

copy_dw_schema=public
copy_dw_table=test
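For completeness, here is a rough sketch of how the SourceUtil.createCdcPro() call from section 2 might map these properties onto the hypothetical CdcPro shown earlier. The property keys match the file above; everything else is an assumption.

import java.net.URI;
import java.util.Arrays;
import java.util.Properties;

/** Hypothetical factory that builds a CdcPro from the properties above. */
public class SourceUtil {

    public static CdcPro createCdcPro() throws Exception {
        Properties props = PropUtils.getProperties();
        // e.g. jdbc:mysql://ip:3306/test?autoReconnect=true
        URI uri = URI.create(props.getProperty("mysql_jdbc").substring("jdbc:".length()));
        CdcPro cdcPro = new CdcPro();
        cdcPro.setHost(uri.getHost());
        cdcPro.setPort(uri.getPort());
        cdcPro.setDb(uri.getPath().substring(1));
        cdcPro.setUserName(props.getProperty("mysql_userName"));
        cdcPro.setPassWord(props.getProperty("mysql_passWord"));
        cdcPro.setTableList(props.getProperty("mysql_tableList"));
        cdcPro.setExcludeTables(Arrays.asList(props.getProperty("excluded_tables", "").split(",")));
        cdcPro.setColumnConver(props.getProperty("column_conver"));
        return cdcPro;
    }
}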
Additional notes

If you need to capture table schema changes, you can handle database schema-change events in deserialize():

public void deserialize(SourceRecord record, Collector out) throws Exception {
        Struct value = (Struct) record.value();
        Schema keySchema = record.keySchema();
        // Check whether this record is a schema-change event
        if(keySchema != null && keySchema.name().equals(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME)){
            /**
             * Extract the schema-change information, for example:
             * {"source":{"file":"mysql-bin.000079","pos":4598526,"server_id":123654},"position":{"transaction_id":null,"ts_sec":1698822218,"file":"mysql-bin.000079","pos":4598686,"server_id":123654},"databaseName":"test1","ddl":"create table table_name1\n(\n    id int null\n)","tableChanges":[]}
             */
            String historyRecord = (String) value.get("historyRecord");
            Map recordMap = (Map) JacksonUtil.fromJson(historyRecord, Map.class);
            String ddl = recordMap.get("ddl").toString();
            String database = recordMap.get("databaseName").toString();
            out.collect(Tuple2.of(database,ddl));
        }
}
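Note that the job in section 2 builds its source with .includeSchemaChanges(false). For schema-change records to reach deserialize() at all, the source has to be built with schema-change events enabled, roughly as sketched below (the other builder settings are the same as in section 2):

MySqlSource<Tuple2<String, Row>> mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()
        .hostname(cdcPro.getHost())
        .port(cdcPro.getPort())
        .databaseList(cdcPro.getDb())
        .tableList(tables.toArray(new String[0]))
        .username(cdcPro.getUserName())
        .password(cdcPro.getPassWord())
        .deserializer(new MySQLDebeziumDeserializer(buildTableRowTypeMap(cdcPro, tables)))
        .startupOptions(StartupOptions.latest())
        .includeSchemaChanges(true) // emit DDL / schema-change events to the deserializer
        .build();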

Reposted from: https://blog.csdn.net/h6lkj/article/details/136631913
Copyright belongs to the original author 风卷残尘. In case of infringement, please contact us for removal.
