文章目录
MySQL CDC配置
第一步: 启用binlog
1. 检查MySQL的binlog是否已启用
show variables like '%log_bin%';
2. 若未启用binlog
- 打开MySQL配置文件my.cnf(MySQL安装目录的etc文件夹下)
- 找到[mysqld]部分,添加如下配置
log-bin=mysql-bin # 指定二进制日志文件的名称前缀server-id=1# 唯一标识MySQL服务器的数字expire_logs_days=30# binlog日志过期时间(按实际情况配置)
- 保存并关闭配置文件, 并重启MySQL服务使配置生效
sudo systemctl restart mysqld
第二步: 设置binlog格式为row
因为要监控表记录变更前后的具体数据, 需要将binlog格式设置为row.
1. 确保MySQL的binlog格式设置为ROW
show variables like '%binlog_format%';
2. 若未设置为row
- 打开MySQL配置文件my.cnf(MySQL安装目录的etc文件夹下)
- 找到[mysqld]部分,添加如下配置
binlog_format=ROW
- 保存并关闭配置文件, 并重启MySQL服务使配置生效
sudo systemctl restart mysqld
第三步: 创建CDC用户
创建一个具备合适权限的MySQL用户, 使得Debezium MySQL connector可以监控数据库的变化.
- 创建MySQL用户, 用于Flink CDC连接到MySQL数据库
CREATEUSER'flinkcdc'@'%' IDENTIFIED BY'FlinkCDC_123456';
- 授予该用户适当的权限以访问要采集的数据库和表。
GRANTSELECT,SHOWDATABASES,REPLICATION SLAVE,REPLICATION CLIENT ON*.*TO'flinkcdc' IDENTIFIED BY'FlinkCDC_123456';
- 使权限生效
FLUSH PRIVILEGES;
MySQL CDC DataStream API实现
所使用软件的版本
- java 1.8
- Scala 2.11
- Flink 1.14.2
- Flink CDC 2.3.0
- Source MySQL 5.7
- Sink MySQL 5.7
- jackson 2.10.2
MySQL CDC DataStream API可实现一个job监控采集多个数据库、多个表.
1. 定义MySqlSource
//源数据库连接配置文件Properties dbProps =DbConfigUtil.loadConfig("mysql.properties");//Debezium配置Properties debeziumProps =newProperties();//decimal.handling.mode指定connector如何处理DECIMAL和NUMERIC列的值,有3种模式:precise、double和string//precise(默认值):以二进制形式在变更事件中精确表示它们,使用java.math.BigDecimal值来表示(此种模式采集会将DECIMAL和NUMERIC列转成二进制格式,不易读,不便于数据处理)//以double值来表示它们,这可能会到值精度丢失//string:将值编码为格式化的字符串,易于下游消费,但会丢失有关实际类型的语义信息。(建议使用此种模式,便于下游进行数据处理)
debeziumProps.setProperty("decimal.handling.mode","string");//Time、date和timestamps可以以不同的精度表示,包括://adaptive_time_microseconds(默认值):精确地捕获date、datetime和timestamp的值,使用毫秒、微秒或纳秒精度值,具体取决于数据库列的类型,但 TIME 类型字段除外,它们始终以微秒表示。//adaptive(不建议使用):以数据库列类型为基础,精确地捕获时间和时间戳值,使用毫秒、微秒或纳秒精度值。//connect:总是使用 Kafka Connect 内置的 Time、Date 和 Timestamp 表示法表示时间和时间戳值,无论数据库列的精度如何,都使用毫秒精度。
debeziumProps.setProperty("time.precision.mode","connect");//MySQL CDC数据源MySqlSource<String> sourceFunction =MySqlSource.<String>builder().hostname(dbProps.getProperty("host")).port(Integer.parseInt(dbProps.getProperty("port"))).databaseList(dbProps.getProperty("database_list").split(",")).tableList(dbProps.getProperty("table_list").split(",")).username(dbProps.getProperty("username")).password(dbProps.getProperty("password")).connectionPoolSize(2).serverTimeZone("Asia/Shanghai").debeziumProperties(debeziumProps).deserializer(newJsonDebeziumDeserializationSchema()).serverId("6001").startupOptions(StartupOptions.initial()).build();
2. 数据处理
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);// 启用Checkpoint
env.enableCheckpointing(60000);// 默认即为EXACTLY_ONCE。设置Checkpoint模式为EXACTLY_ONCE,每条记录在恢复的时候都是精确一次地处理的
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置状态后端
env.setStateBackend(newHashMapStateBackend());// 设置Checkpoint状态存储系统及目录
env.getCheckpointConfig().setCheckpointStorage("hdfs://ns/flink/checkpoint/mysql_cdc");// 两次Checkpoint之间的最小暂停时间是500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// checkpoints必须在指定的时间内完成,否则被丢弃
env.getCheckpointConfig().setCheckpointTimeout(60000);//只允许checkpoint连续失败两次
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);// 设置最大并行运行的Checkpoint数量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 在作业取消时保留外部检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 启用非对齐Checkpoint,可以极大减少背压情况的下Checkpoint次数
env.getCheckpointConfig().enableUnalignedCheckpoints();//获取数据源SingleOutputStreamOperator<String> dataStreamSource = env
.addSource(sourceFunction).uid("source-01").name("read-from-source");ObjectMapper mapper =newObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES,true);//JSON字符串转JsonNodeSingleOutputStreamOperator<JsonNode> dataStreamJsonNode = dataStreamSource
.map(line -> mapper.readTree(line)).uid("map-01").name("source-to-JsonNode");// 从监控的多个表中过滤出'订单表', 并解析Json的after数据SingleOutputStreamOperator<OrderInfo> orderOperator = dataStreamJsonNode
.filter(line ->"order_info".equalsIgnoreCase(line.get("source").get("table").asText())).uid("order-info-filter-01").name("filter-order-info").map(line -> line.get("after").toString()).uid("order-info-map-01").name("parse-order-info-after").map(line -> mapper.readValue(line,OrderInfo.class)).uid("order-info-map-02").name("order-info-to-pojo");
3. sink到MySQL
// 定义JdbcSinkSinkFunction<OrderInfo> orderInfoSink =JdbcSink.sink(UPSERT_SQL,(JdbcStatementBuilder<OrderInfo>)(ps, order)->newOrderInfoPreparedStatementSetter().setParams(ps, order),JdbcExecutionOptions.builder().withBatchSize(100).withBatchIntervalMs(2000).withMaxRetries(3).build(),JdbcSinkConnUtil.getConnOptions("sink-mysql.properties"));
orderOperator.addSink(orderInfoSink);
参考
版权归原作者 Southwest- 所有, 如有侵权,请联系我们删除。