Background
The project needs to capture data changes from an external data source and synchronize them to a target database in real time, so that every table in the target automatically stays in step with the source. This post documents an implementation for the MySQL -> Greenplum scenario.
Implementation
1. Add the dependencies
<!-- Flink CDC connector for MySQL -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.4.1</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
</dependency>
<!-- PostgreSQL JDBC driver (Greenplum is accessed over the PostgreSQL protocol) -->
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.5.0</version>
</dependency>
2. Create the Flink CDC source
Create the Flink CDC connector: configure the source connection information and the starting point for log capture, then read and cache the source tables' metadata.
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Open points: automatic creation of target tables, handling of table schema changes.
 */
@Slf4j
public class FlinkCdcMultiSyncJdbc {
public static CdcPro cdcPro;
public static MySqlCatalog mysqlCatalog;
public static List<String> tables;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
init();
// Monitor the MySQL binlog
// Create the Flink-MySQL-CDC source
// initial (default): take an initial snapshot of the monitored tables on the first start, then keep reading the latest binlog (with checkpoint recovery enabled, resume from the last consumed offset)
// latest-offset: never snapshot the monitored tables on the first start; read only from the end of the binlog, i.e. only changes made after the connector started
// timestamp: never snapshot the monitored tables; read the binlog from the given timestamp. The reader traverses the binlog from the start and ignores change events whose timestamp is earlier than the specified one
// specific-offset: never snapshot the monitored tables; read the binlog from the specified offset
Properties jdbcProperties = new Properties();
jdbcProperties.setProperty("time_zone", "+8:00");
jdbcProperties.setProperty("serverTimeZone", "Asia/Shanghai");
MySqlSource<Tuple2<String, Row>> mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()
.hostname(cdcPro.getHost())
.port(cdcPro.getPort())
.databaseList(cdcPro.getDb())
.tableList(tables.toArray(new String[0]))
// .scanNewlyAddedTableEnabled(true) // uncomment to pick up tables created after the job starts
.username(cdcPro.getUserName())
.password(cdcPro.getPassWord())
.jdbcProperties(jdbcProperties)
.deserializer(new MySQLDebeziumDeserializer(buildTableRowTypeMap(cdcPro, tables)))
.startupOptions(StartupOptions.latest())
.includeSchemaChanges(false)
.build();
SingleOutputStreamOperator<Tuple2<String, Row>> dataStreamSource = env.fromSource(mySqlSource,
WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();
dataStreamSource.addSink(new DwSink(cdcPro));
env.execute("flinkcdc_mysql_dw");
}
private static void init() throws Exception {
cdcPro = SourceUtil.createCdcPro();
String source_url = String.format("jdbc:mysql://%s:%d", cdcPro.getHost(), cdcPro.getPort());
// Register a JDBC catalog for the database being synchronized (used to read table schemas)
mysqlCatalog = new MySqlCatalog(new ClassLoader() {
@Override
public Class<?> loadClass(String name) throws ClassNotFoundException {
return super.loadClass(name);
}
}, "mysql-catalog", cdcPro.getDb(), cdcPro.getUserName(), cdcPro.getPassWord(), source_url);
// List<String> dbs = mysqlCatalog.listDatabases();
tables = buildTables(cdcPro);
System.setProperty("HADOOP_USER_NAME", "root");
}
private static List<String> buildTables(CdcPro cdcPro) throws Exception {
// For whole-database sync, take all tables from the catalog; otherwise use the configured table list
List<String> tables = new ArrayList<>();
if (".*".equals(cdcPro.getTableList())) {
tables = mysqlCatalog.listTables(cdcPro.getDb());
} else {
String[] tableArray = cdcPro.getTableList().split(",");
for (String table : tableArray) {
tables.add(table);
}
}
// Drop excluded tables, then qualify the remaining names with the database
tables = tables.stream().filter(tableName->!cdcPro.getExcludeTables().contains(tableName)).map(t->cdcPro.getDb() + "." + t).collect(Collectors.toList());
log.info("cdc tables: \n" + tables);
return tables;
}
private static Map<String, RowType> buildTableRowTypeMap(CdcPro cdcPro, List<String> tables) throws Exception {
Map<String, RowType> tableRowTypeMap = new HashMap<>();
// Map<String, RowTypeInfo> tableTypeInformationMap = Maps.newConcurrentMap();
tables.parallelStream().forEach(table ->{
String tableName = StringUtils.replace(table, cdcPro.getDb() + ".", "");
// Fetch the table definition registered in the MySQL catalog
ObjectPath objectPath = new ObjectPath(cdcPro.getDb(), tableName);
DefaultCatalogTable catalogBaseTable = null;
try {
catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);
} catch (TableNotExistException e) {
throw new RuntimeException(e);
}
// Get the table schema
Schema schema = catalogBaseTable.getUnresolvedSchema();
// Collect the field names and field types
String[] fieldNames = new String[schema.getColumns().size()];
LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];
for (int i = 0; i < schema.getColumns().size(); i++) {
Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);
fieldNames[i] = column.getName();
logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();
}
tableRowTypeMap.put(tableName, RowType.of(logicalTypes, fieldNames));
});
return tableRowTypeMap;
}
}
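The class above relies on a CdcPro configuration holder and a SourceUtil factory that the original post does not include. A minimal sketch of what they could look like follows, assuming Lombok, the PropUtils helper sketched at the end of step 4, and the property keys from step 5; the field names simply mirror the getters used above and may differ from the real project.
import lombok.Data;
import java.net.URI;
import java.util.*;

@Data
public class CdcPro {
    private String host;
    private int port;
    private String db;
    private String userName;
    private String passWord;
    private String tableList;      // ".*" for the whole database, or a comma-separated list
    private Set<String> excludeTables;
    private String columnConver;   // "uppercase" / "lowercase" / empty
}

public class SourceUtil {
    /** Builds a CdcPro from the step-5 properties file; all key names are taken from that file. */
    public static CdcPro createCdcPro() throws Exception {
        Properties p = PropUtils.getProperties();
        // e.g. jdbc:mysql://ip:3306/test?autoReconnect=true
        URI uri = URI.create(p.getProperty("mysql_jdbc").substring("jdbc:".length()));
        CdcPro pro = new CdcPro();
        pro.setHost(uri.getHost());
        pro.setPort(uri.getPort());
        pro.setDb(uri.getPath().substring(1));
        pro.setUserName(p.getProperty("mysql_userName"));
        pro.setPassWord(p.getProperty("mysql_passWord"));
        pro.setTableList(p.getProperty("mysql_tableList", ".*"));
        pro.setExcludeTables(new HashSet<>(Arrays.asList(p.getProperty("excluded_tables", "").split(","))));
        pro.setColumnConver(p.getProperty("column_conver", ""));
        return pro;
    }
}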
3. Create the data parser
Create a custom deserializer that extracts the changed table, the row data, and the operation type from the change events captured by Flink CDC.
A set of custom converters translates the parsed Debezium values into Flink data types.
Each row's changelog kind (RowKind) is set, and the table name and row are emitted to the Flink collector as a tuple.
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
@Slf4j
public class MySQLDebeziumDeserializer implements DebeziumDeserializationSchema<Tuple2<String, Row>> {
private final Map<String, RowType> tableRowTypeMap;
private final Map<String, DeserializationRuntimeConverter> physicalConverterMap = Maps.newConcurrentMap();
MySQLDebeziumDeserializer(Map<String, RowType> tableRowTypeMap) {
this.tableRowTypeMap = tableRowTypeMap;
for (String tableName : this.tableRowTypeMap.keySet()) {
RowType rowType = this.tableRowTypeMap.get(tableName);
DeserializationRuntimeConverter physicalConverter = createNotNullConverter(rowType);
this.physicalConverterMap.put(tableName, physicalConverter);
}
}
@Override
public void deserialize(SourceRecord record, Collector<Tuple2<String, Row>> out) throws Exception {
Struct value = (Struct) record.value();
// log.info("value:{}", value);
String tableName = value.getStruct("source").get("table").toString();
// Skip records for tables that were not registered during initialization
if(!physicalConverterMap.containsKey(tableName)){
return;
}
DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tableName);
Envelope.Operation op = Envelope.operationFor(record);
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
Row insert = extractAfterRow(record, physicalConverter);
insert.setKind(RowKind.INSERT);
out.collect(Tuple2.of(tableName, insert));
} else if (op == Envelope.Operation.DELETE) {
Row delete = extractBeforeRow(record, physicalConverter);
delete.setKind(RowKind.DELETE);
out.collect(Tuple2.of(tableName, delete));
} else if (op == Envelope.Operation.UPDATE) {
Row before = extractBeforeRow(record, physicalConverter);
before.setKind(RowKind.UPDATE_BEFORE);
out.collect(Tuple2.of(tableName, before));
Row after = extractAfterRow(record, physicalConverter);
after.setKind(RowKind.UPDATE_AFTER);
out.collect(Tuple2.of(tableName, after));
}
}
private Row extractAfterRow(SourceRecord record, DeserializationRuntimeConverter physicalConverter) throws Exception {
Schema afterSchema = record.valueSchema().field(Envelope.FieldName.AFTER).schema();
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
return (Row) physicalConverter.convert(after, afterSchema);
}
private Row extractBeforeRow(SourceRecord record, DeserializationRuntimeConverter physicalConverter) throws Exception {
Schema beforeSchema = record.valueSchema().field(Envelope.FieldName.BEFORE).schema();
Struct before = ((Struct) record.value()).getStruct(Envelope.FieldName.BEFORE);
return (Row) physicalConverter.convert(before, beforeSchema);
}
@Override
public TypeInformation<Tuple2<String, Row>> getProducedType() {
return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() {
});
}
public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
/*
 * MySQL type -> Flink logical type:
 * datetime  -> TIMESTAMP_WITHOUT_TIME_ZONE
 * date      -> DATE
 * time      -> TIME_WITHOUT_TIME_ZONE
 * timestamp -> TIMESTAMP_WITHOUT_TIME_ZONE
 */
switch (type.getTypeRoot()) {
case NULL:
return convertToNull();
case BOOLEAN:
return convertToInt();
case TINYINT:
return convertToInt();
case SMALLINT:
return convertToInt();
case INTEGER:
case INTERVAL_YEAR_MONTH:
return convertToInt();
case BIGINT:
case INTERVAL_DAY_TIME:
return convertToLong();
case DATE:
return convertToDate();
case TIME_WITHOUT_TIME_ZONE:
return convertToTime();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return convertToTimestamp(ZoneId.of("UTC+8"));
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
case FLOAT:
return convertToFloat();
case DOUBLE:
return convertToDouble();
case CHAR:
case VARCHAR:
return convertToString();
case BINARY:
case VARBINARY:
return convertToBinary();
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ROW:
return createRowConverter((RowType) type);
case ARRAY:
case MAP:
case MULTISET:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
private static DeserializationRuntimeConverter convertToNull() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return null;
}
};
}
private static DeserializationRuntimeConverter convertToByte() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Byte.parseByte(dbzObj.toString());
}
};
}
private static DeserializationRuntimeConverter convertToBoolean() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Boolean) {
return dbzObj;
} else if (dbzObj instanceof Byte) {
return (byte) dbzObj == 1;
} else if (dbzObj instanceof Short) {
return (short) dbzObj == 1;
} else {
return Boolean.parseBoolean(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToShort() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Short.parseShort(dbzObj.toString());
}
};
}
private static DeserializationRuntimeConverter convertToInt() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return dbzObj;
} else if (dbzObj instanceof Long) {
return ((Long) dbzObj).intValue();
} else {
return Integer.parseInt(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToLong() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return ((Integer) dbzObj).longValue();
} else if (dbzObj instanceof Long) {
return dbzObj;
} else {
return Long.parseLong(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
if(precision > 38){
precision = 38;
}
if(scale > 10){
scale = 10;
}
int finalPrecision = precision;
int finalScale = scale;
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
BigDecimal bigDecimal;
if (dbzObj instanceof byte[]) {
// decimal.handling.mode=precise
bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
} else if (dbzObj instanceof String) {
// decimal.handling.mode=string
bigDecimal = new BigDecimal((String) dbzObj);
} else if (dbzObj instanceof Double) {
// decimal.handling.mode=double
bigDecimal = BigDecimal.valueOf((Double) dbzObj);
} else {
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
SpecialValueDecimal decimal =
VariableScaleDecimal.toLogical((Struct) dbzObj);
bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
} else {
// fallback to string
bigDecimal = new BigDecimal(dbzObj.toString());
}
}
return DecimalData.fromBigDecimal(bigDecimal, finalPrecision, finalScale);
}
};
}
private static DeserializationRuntimeConverter convertToDouble() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return ((Float) dbzObj).doubleValue();
} else if (dbzObj instanceof Double) {
return dbzObj;
} else {
return Double.parseDouble(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToFloat() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return dbzObj;
} else if (dbzObj instanceof Double) {
return ((Double) dbzObj).floatValue();
} else {
return Float.parseFloat(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToDate() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return TemporalConversions.toLocalDate(dbzObj);
}
};
}
private static DeserializationRuntimeConverter convertToTime() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case MicroTime.SCHEMA_NAME:
// Debezium MicroTime: microseconds of the day
return LocalTime.ofNanoOfDay((long) dbzObj * 1_000L);
case NanoTime.SCHEMA_NAME:
// Debezium NanoTime: nanoseconds of the day
return LocalTime.ofNanoOfDay((long) dbzObj);
}
} else if (dbzObj instanceof Integer) {
// Debezium Time: milliseconds of the day
return LocalTime.ofNanoOfDay((Integer) dbzObj * 1_000_000L);
}
// get number of milliseconds of the day
return TemporalConversions.toLocalTime(dbzObj);
}
};
}
private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromEpochMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromEpochMillis(
micro / 1000, (int) (micro % 1000 * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromEpochMillis(
nano / 1000_000, (int) (nano % 1000_000));
}
}
LocalDateTime localDateTime =
TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
}
};
}
private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(ZoneId serverTimeZone) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
// TIMESTAMP_LTZ type is encoded in string type
Instant instant = Instant.parse(str);
return TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(instant, serverTimeZone));
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
}
};
}
private static DeserializationRuntimeConverter convertToString() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return StringData.fromString(dbzObj.toString());
}
};
}
private static DeserializationRuntimeConverter convertToBinary() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof byte[]) {
return dbzObj;
} else if (dbzObj instanceof ByteBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
} else {
throw new UnsupportedOperationException(
"Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
}
}
};
}
private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
final DeserializationRuntimeConverter[] fieldConverters =
rowType.getFields().stream()
.map(RowType.RowField::getType)
.map(MySQLDebeziumDeserializer::createNotNullConverter)
.toArray(DeserializationRuntimeConverter[]::new);
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
Struct struct = (Struct) dbzObj;
int arity = fieldNames.length;
Object[] fieldByPosition = new Object[arity];
LinkedHashMap<String, Integer> positionByName = new LinkedHashMap<>();
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
Field field = schema.field(fieldName);
positionByName.put(fieldName, i);
if (field == null) {
fieldByPosition[i] = null;
} else {
Object fieldValue = struct.getWithoutDefault(fieldName);
Schema fieldSchema = schema.field(fieldName).schema();
Object convertedField = convertField(fieldConverters[i], fieldValue, fieldSchema);
fieldByPosition[i] = convertedField;
}
}
return RowUtils.createRowWithNamedPositions(null, fieldByPosition, positionByName);
}
};
}
private static Object convertField(DeserializationRuntimeConverter fieldConverter, Object fieldValue,
Schema fieldSchema) throws Exception {
if (fieldValue == null) {
return null;
} else {
return fieldConverter.convert(fieldValue, fieldSchema);
}
}
}
4. Custom Greenplum sink operator
Implement the Greenplum writer as a custom sink that extends Flink's RichSinkFunction and overrides its methods.
It receives the (table, row) tuples from the collector and, based on each row's changelog kind and the table name, generates the SQL that applies the change to Greenplum.
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import java.sql.*;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;
@Slf4j
public class DwSink extends RichSinkFunction<Tuple2<String, Row>> {
private Connection connection = null;
private PreparedStatement stmt = null;
private Map<String, String> tableMapping;
private String mapper = null;
private String schema = null;
private CdcPro cdcPro = null;
public DwSink(CdcPro cdcPro) {
this.cdcPro = cdcPro;
}
@Override
public void open(Configuration parameters) throws Exception {
// Load configuration
Properties properties = PropUtils.getProperties();
String url = properties.getProperty("dw_jdbc");
String username = properties.getProperty("dw_user");
String password = properties.getProperty("dw_password");
connection = DriverManager.getConnection(url, username, password);
// Table-name mapping: either a pattern containing {tableName} or an explicit JSON map
String mapping = properties.getProperty("tableMapping");
if (mapping.contains("{tableName}")) {
// pattern mapping, e.g. ods_{tableName}
mapper = mapping;
} else {
// explicit JSON map: {"sourceTable":"targetTable", ...}
this.tableMapping = (Map) JacksonUtil.fromJson(mapping, Map.class);
}
schema = properties.getProperty("dw_schema");
if (StringUtils.isBlank(schema)) {
schema = "public";
}
}
@Override
public void invoke(Tuple2<String, Row> record, Context context) throws Exception {
log.debug("invoke:" + record);
Set<String> fieldNames = record.f1.getFieldNames(true);
if (CollectionUtils.isEmpty(fieldNames)) {
log.warn("fieldNames is empty, table:{}", record.f0);
return;
}
if (RowKind.UPDATE_AFTER == record.f1.getKind() || RowKind.INSERT == record.f1.getKind()) {
doInsert(record, fieldNames);
} else {
doDelete(record, fieldNames);
}
}
/**
 * Handles deletes: builds a DELETE statement keyed on all fields of the old row (null fields become "is null").
 *
 * @param record record
 * @param fieldNames fieldNames
 * @throws Exception e
 */
private void doDelete(Tuple2<String, Row> record, Set<String> fieldNames) throws Exception {
StringJoiner fieldNamesList = new StringJoiner(" and ");
for (String fieldName : fieldNames) {
if (record.f1.getField(fieldName) == null) {
fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)) + " is null");
} else {
fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)) + " = ?");
}
}
String sql = "delete from " + this.getMapperTableName(record.f0) + " where " + fieldNamesList;
log.debug("del sql: {}", sql);
PreparedStatement stmt = connection.prepareStatement(sql);
int index = 1;
for (String fieldName : fieldNames) {
if (record.f1.getField(fieldName) != null) {
handlePreparedStatement(stmt, record.f1.getField(fieldName), index);
index++;
}
}
stmt.execute();
stmt.close();
}
/**
 * Handles inserts (and the UPDATE_AFTER image of updates).
 *
 * @param record record
 * @param fieldNames fieldNames
 * @throws Exception e
 */
private void doInsert(Tuple2<String, Row> record, Set<String> fieldNames) throws Exception {
StringJoiner fieldNamesList = new StringJoiner(",");
StringJoiner fieldParmList = new StringJoiner(",");
for (String fieldName : fieldNames) {
fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)));
fieldParmList.add("?");
}
String sql = "insert into " + this.getMapperTableName(record.f0) + " ( " + fieldNamesList + " )" +
" values (" + fieldParmList + ")";
log.info("insert sql: {}", sql);
PreparedStatement stmt = connection.prepareStatement(sql);
int index = 1;
for (String fieldName : fieldNames) {
Object field = record.f1.getField(fieldName);
handlePreparedStatement(stmt, field, index);
index++;
}
stmt.execute();
stmt.close();
}
/**
 * Maps Flink internal data types to JDBC parameter types.
 *
 * @param stmt stmt
 * @param field field
 * @param index index
 * @throws Exception e
 */
@Deprecated
private void handlePreparedStatement(PreparedStatement stmt, Object field, int index) throws Exception {
// if (field instanceof BinaryStringData) {
// // JSON columns are deserialized as BinaryStringData
// PGobject pGobject = new PGobject();
// pGobject.setType("json");
// pGobject.setValue(field.toString());
// stmt.setObject(index, pGobject);
// } else
if (field instanceof StringData) {
stmt.setString(index, String.valueOf(field));
} else if (field instanceof DecimalData) {
stmt.setBigDecimal(index, ((DecimalData) field).toBigDecimal());
} else if (field instanceof TimestampData) {
stmt.setTimestamp(index, ((TimestampData) field).toTimestamp());
} else if (field instanceof Short) {
stmt.setShort(index, (Short) field);
} else if (field instanceof Boolean) {
stmt.setBoolean(index, (Boolean) field);
} else if (field instanceof Integer) {
stmt.setInt(index, (Integer) field);
} else if (field instanceof Long) {
stmt.setLong(index, (Long) field);
} else if (field instanceof Double) {
stmt.setDouble(index, (Double) field);
} else if (field instanceof Float) {
stmt.setFloat(index, (Float) field);
} else if (field instanceof LocalDate) {
stmt.setDate(index, Date.valueOf((LocalDate) field));
} else if (field instanceof LocalTime) {
stmt.setTime(index, Time.valueOf((LocalTime) field));
} else if (field instanceof byte[]) {
stmt.setBytes(index, (byte[]) field);
} else {
stmt.setObject(index, field);
}
}
private String quoteIdentifier(String identifier) {
return "\"" + identifier + "\"";
}
private String coverColumn(String fieldName) {
if (StringUtils.isNotEmpty(fieldName) && StringUtils.isNotEmpty(cdcPro.getColumnConver())) {
if ("uppercase".equalsIgnoreCase(cdcPro.getColumnConver())) {
return fieldName.toUpperCase();
} else if ("lowercase".equalsIgnoreCase(cdcPro.getColumnConver())) {
return fieldName.toLowerCase();
}
}
return fieldName;
}
@Override
public void close() throws Exception {
if (stmt != null) {
stmt.close();
}
if (connection != null) {
connection.close();
}
}
/**
 * Resolves the DW (Greenplum) table that corresponds to a MySQL table according to the mapping rule.
 *
 * @param originalTableName originalTableName
 * @return qualified DW table name
 */
private String getMapperTableName(String originalTableName) {
String dwTableName;
if (!StringUtils.isBlank(mapper)) {
dwTableName = mapper.replace("{tableName}", originalTableName);
} else {
dwTableName = tableMapping.get(originalTableName);
}
if (StringUtils.isBlank(dwTableName)) {
log.error("No corresponding DW table found for MySQL table: " + originalTableName);
}
return schema + "." + dwTableName;
}
}
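DwSink (and the SourceUtil sketch above) also depends on two small helpers that the post does not include, PropUtils and JacksonUtil. A minimal sketch, assuming the step-5 configuration sits on the classpath as cdc.properties (the file name is an assumption) and Jackson is available:
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.InputStream;
import java.util.Properties;

public class PropUtils {
    /** Loads the step-5 configuration from the classpath; the file name is an assumption. */
    public static Properties getProperties() throws Exception {
        Properties properties = new Properties();
        try (InputStream in = PropUtils.class.getClassLoader().getResourceAsStream("cdc.properties")) {
            properties.load(in);
        }
        return properties;
    }
}

public class JacksonUtil {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Deserializes a JSON string into the given type. */
    public static <T> T fromJson(String json, Class<T> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}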
5. Configuration
# MySQL source
mysql_jdbc=jdbc:mysql://ip:3306/test?autoReconnect=true
mysql_userName=root
mysql_passWord=123456
# Comma-separated table names; use .* to sync every table in the database
mysql_tableList=.*
excluded_tables=t1,t2,table_decimal
# DW (Greenplum) sink
dw_jdbc=jdbc:postgresql://ip:5432/postgres?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
dw_schema=ods_cdctest1
dw_user=gpadmin
dw_password=123456
# Table-name mapping: either a JSON map or the {tableName} pattern form below (pick one)
tableMapping={tableName}
column_conver=lowercase
# Pattern example:
#tableMapping=abc_{tableName}
copy_dw_schema=public
copy_dw_table=test
Other notes
If you also need to capture table structure changes, handle the database schema change events in the deserializer, for example:
public void deserialize(SourceRecord record, Collector out) throws Exception {
Struct value = (Struct) record.value();
Schema keySchema = record.keySchema();
// Check whether this record is a schema change event
if (keySchema != null && keySchema.name().equals(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME)) {
/*
 * The schema change payload looks like:
 * {"source":{"file":"mysql-bin.000079","pos":4598526,"server_id":123654},"position":{"transaction_id":null,"ts_sec":1698822218,"file":"mysql-bin.000079","pos":4598686,"server_id":123654},"databaseName":"test1","ddl":"create table table_name1\n(\n id int null\n)","tableChanges":[]}
 */
String historyRecord = (String) value.get("historyRecord");
Map recordMap = (Map) JacksonUtil.fromJson(historyRecord, Map.class);
String ddl = recordMap.get("ddl").toString();
String database = recordMap.get("databaseName").toString();
out.collect(Tuple2.of(database, ddl));
return;
}
// ... handle regular data change records as in step 3 ...
}
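Note that the deserializer only receives these schema change records when they are enabled on the source; the builder in step 2 sets includeSchemaChanges(false). Enabling it is a one-line change (only the modified call is shown):
MySqlSource<Tuple2<String, Row>> mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()
        // ... same connection, table and deserializer settings as in step 2 ...
        .includeSchemaChanges(true) // forward DDL / schema change events to the deserializer
        .build();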