0


Flink CDC 读取MySQL的数据

1、前提背景准备

Flink在1.11之后就已经支持从MySQL增量读取Binlog日志的方式。

pom文件如下:

  1. <properties>
  2. <scala.binary.version>2.11</scala.binary.version>
  3. <scala.version>2.11.12</scala.version>
  4. <flink.version>1.12.0</flink.version>
  5. <fastjson.verson>1.2.72</fastjson.verson>
  6. <lombok.version>1.18.6</lombok.version>
  7. <kafka.version>2.3.0</kafka.version>
  8. </properties>
  9. <dependencies>
  10. <dependency>
  11. <groupId>com.github.shyiko</groupId>
  12. <artifactId>mysql-binlog-connector-java</artifactId>
  13. <version>0.21.0</version>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.apache.kafka</groupId>
  17. <artifactId>kafka-clients</artifactId>
  18. <version>${kafka.version}</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.apache.flink</groupId>
  22. <artifactId>flink-connector-kafka_2.11</artifactId>
  23. <version>${flink.version}</version>
  24. <exclusions>
  25. <exclusion>
  26. <groupId>log4j</groupId>
  27. <artifactId>*</artifactId>
  28. </exclusion>
  29. <exclusion>
  30. <groupId>org.slf4j</groupId>
  31. <artifactId>slf4j-log4j12</artifactId>
  32. </exclusion>
  33. </exclusions>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.projectlombok</groupId>
  37. <artifactId>lombok</artifactId>
  38. <version>${lombok.version}</version>
  39. <scope>provided</scope>
  40. </dependency>
  41. <dependency>
  42. <groupId>com.alibaba</groupId>
  43. <artifactId>fastjson</artifactId>
  44. <version>${fastjson.verson}</version>
  45. </dependency>
  46. <dependency>
  47. <groupId>com.alibaba.ververica</groupId>
  48. <artifactId>flink-connector-mysql-cdc</artifactId>
  49. <version>1.4.0</version>
  50. </dependency>
  51. <dependency>
  52. <groupId>org.apache.flink</groupId>
  53. <artifactId>flink-java</artifactId>
  54. <version>${flink.version}</version>
  55. <!--<scope>provided</scope>-->
  56. <exclusions>
  57. <exclusion>
  58. <groupId>log4j</groupId>
  59. <artifactId>*</artifactId>
  60. </exclusion>
  61. <exclusion>
  62. <groupId>org.slf4j</groupId>
  63. <artifactId>slf4j-log4j12</artifactId>
  64. </exclusion>
  65. </exclusions>
  66. </dependency>
  67. <dependency>
  68. <groupId>org.apache.flink</groupId>
  69. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  70. <version>${flink.version}</version>
  71. <!--<scope>provided</scope>-->
  72. <exclusions>
  73. <exclusion>
  74. <groupId>log4j</groupId>
  75. <artifactId>*</artifactId>
  76. </exclusion>
  77. <exclusion>
  78. <groupId>org.slf4j</groupId>
  79. <artifactId>slf4j-log4j12</artifactId>
  80. </exclusion>
  81. <exclusion>
  82. <groupId>com.google.code.findbugs</groupId>
  83. <artifactId>jsr305</artifactId>
  84. </exclusion>
  85. <exclusion>
  86. <groupId>org.apache.flink</groupId>
  87. <artifactId>force-shading</artifactId>
  88. </exclusion>
  89. </exclusions>
  90. </dependency>
  91. <dependency>
  92. <groupId>org.apache.flink</groupId>
  93. <artifactId>flink-clients_${scala.binary.version}</artifactId>
  94. <version>${flink.version}</version>
  95. <!--<scope>provided</scope>-->
  96. </dependency>
  97. <dependency>
  98. <groupId>org.apache.flink</groupId>
  99. <artifactId>flink-scala_${scala.binary.version}</artifactId>
  100. <version>${flink.version}</version>
  101. </dependency>
  102. <dependency>
  103. <groupId>org.apache.flink</groupId>
  104. <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  105. <version>${flink.version}</version>
  106. <exclusions>
  107. <exclusion>
  108. <groupId>log4j</groupId>
  109. <artifactId>*</artifactId>
  110. </exclusion>
  111. <exclusion>
  112. <groupId>org.slf4j</groupId>
  113. <artifactId>slf4j-log4j12</artifactId>
  114. </exclusion>
  115. </exclusions>
  116. </dependency>
  117. </dependencies>

2、全量读取某个数据库中的所有库中的所有表的Binlog方式代码如下:

  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. Properties properties = new Properties();
  4. // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
  5. properties.setProperty("debezium.snapshot.locking.mode", "none");
  6. env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
  7. CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  8. //最大同时存在的ck数 和设置的间隔时间有一个就行
  9. checkpointConfig.setMaxConcurrentCheckpoints(1);
  10. //超时时间
  11. checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
  12. //2.3 指定从CK自动重启策略
  13. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
  14. //2.4 设置任务关闭的时候保留最后一次CK数据
  15. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  16. DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
  17. .hostname("100.21.112.11")
  18. .port(3306)
  19. .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
  20. .username("root")
  21. .password("xxxx")
  22. .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
  23. .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
  24. .build();
  25. /*
  26. * .startupOptions(StartupOptions.latest()) 参数配置
  27. * 1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
  28. * 2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
  29. * 3.latest() 从最新的binlog开始读取
  30. * 4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
  31. * 5.timestamp(long startupTimestampMillis) 指定时间戳读取
  32. * */
  33. env.addSource(MysqlSource).print();
  34. env.execute("flink-cdc");
  35. }
  36. public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
  37. @Override
  38. public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
  39. Struct value = (Struct) sourceRecord.value();
  40. Struct after = value.getStruct("after");
  41. Struct source = value.getStruct("source");
  42. String db = source.getString("db");//库名
  43. String table = source.getString("table");//表名
  44. //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
  45. /* READ("r"),
  46. CREATE("c"),
  47. UPDATE("u"),
  48. DELETE("d");*/
  49. Envelope.Operation operation = Envelope.operationFor(sourceRecord);
  50. String opstr = operation.toString().toLowerCase();
  51. //类型修正 会把insert识别成create
  52. if (opstr.equals("create")) {
  53. opstr = "insert";
  54. }
  55. //获取after结构体里面的表数据,封装成json输出
  56. JSONObject json1 = new JSONObject();
  57. JSONObject json2 = new JSONObject();
  58. //加个判空
  59. if (after != null) {
  60. List<Field> data = after.schema().fields(); //获取结构体
  61. for (Field field : data) {
  62. String name = field.name(); //结构体的名字
  63. Object value2 = after.get(field);//结构体的字段值
  64. //放进json2里面去 json2放到json1里面去
  65. json2.put(name, value2);
  66. }
  67. }
  68. //整理成大json串输出
  69. json1.put("db", db);
  70. json1.put("table", table);
  71. json1.put("data", json2);
  72. json1.put("type", opstr);
  73. collector.collect(json1.toJSONString());
  74. }
  75. @Override
  76. public TypeInformation<String> getProducedType() {
  77. return TypeInformation.of(String.class);
  78. }
  79. }

3、全量读取某个数据库指定DB中的所有表

可以在build之前 ,添加一个

  1. databaseList,用来指定特定的DB
  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. Properties properties = new Properties();
  4. // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
  5. properties.setProperty("debezium.snapshot.locking.mode", "none");
  6. env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
  7. CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  8. //最大同时存在的ck数 和设置的间隔时间有一个就行
  9. checkpointConfig.setMaxConcurrentCheckpoints(1);
  10. //超时时间
  11. checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
  12. //2.3 指定从CK自动重启策略
  13. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
  14. //2.4 设置任务关闭的时候保留最后一次CK数据
  15. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  16. DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
  17. .hostname("100.21.112.11")
  18. .port(3306)
  19. .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
  20. .username("root")
  21. .password("xxxx")
  22. .databaseList("horse") // 指定某个特定的库
  23. .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
  24. .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
  25. .build();
  26. /*
  27. * .startupOptions(StartupOptions.latest()) 参数配置
  28. * 1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
  29. * 2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
  30. * 3.latest() 从最新的binlog开始读取
  31. * 4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
  32. * 5.timestamp(long startupTimestampMillis) 指定时间戳读取
  33. * */
  34. env.addSource(MysqlSource).print();
  35. env.execute("flink-cdc");
  36. }
  37. public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
  38. @Override
  39. public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
  40. Struct value = (Struct) sourceRecord.value();
  41. Struct after = value.getStruct("after");
  42. Struct source = value.getStruct("source");
  43. String db = source.getString("db");//库名
  44. String table = source.getString("table");//表名
  45. //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
  46. /* READ("r"),
  47. CREATE("c"),
  48. UPDATE("u"),
  49. DELETE("d");*/
  50. Envelope.Operation operation = Envelope.operationFor(sourceRecord);
  51. String opstr = operation.toString().toLowerCase();
  52. //类型修正 会把insert识别成create
  53. if (opstr.equals("create")) {
  54. opstr = "insert";
  55. }
  56. //获取after结构体里面的表数据,封装成json输出
  57. JSONObject json1 = new JSONObject();
  58. JSONObject json2 = new JSONObject();
  59. //加个判空
  60. if (after != null) {
  61. List<Field> data = after.schema().fields(); //获取结构体
  62. for (Field field : data) {
  63. String name = field.name(); //结构体的名字
  64. Object value2 = after.get(field);//结构体的字段值
  65. //放进json2里面去 json2放到json1里面去
  66. json2.put(name, value2);
  67. }
  68. }
  69. //整理成大json串输出
  70. json1.put("db", db);
  71. json1.put("table", table);
  72. json1.put("data", json2);
  73. json1.put("type", opstr);
  74. collector.collect(json1.toJSONString());
  75. }
  76. @Override
  77. public TypeInformation<String> getProducedType() {
  78. return TypeInformation.of(String.class);
  79. }
  80. }

4、全量读取某个数据库指定DB中的指定表

可以在build之前 ,添加一个

  1. tableList,用来指定特定的DB中的特定表
  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. Properties properties = new Properties();
  4. // 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
  5. properties.setProperty("debezium.snapshot.locking.mode", "none");
  6. env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
  7. CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  8. //最大同时存在的ck数 和设置的间隔时间有一个就行
  9. checkpointConfig.setMaxConcurrentCheckpoints(1);
  10. //超时时间
  11. checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
  12. //2.3 指定从CK自动重启策略
  13. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
  14. //2.4 设置任务关闭的时候保留最后一次CK数据
  15. checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  16. DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
  17. .hostname("100.21.112.11")
  18. .port(3306)
  19. .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
  20. .username("root")
  21. .password("xxxx")
  22. .databaseList("horse") // 指定某个特定的库
  23. .tableList("horse.t_dri_info") //指定特定的表
  24. .startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
  25. .debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
  26. .build();
  27. /*
  28. * .startupOptions(StartupOptions.latest()) 参数配置
  29. * 1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
  30. * 2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
  31. * 3.latest() 从最新的binlog开始读取
  32. * 4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
  33. * 5.timestamp(long startupTimestampMillis) 指定时间戳读取
  34. * */
  35. env.addSource(MysqlSource).print();
  36. env.execute("flink-cdc");
  37. }
  38. public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
  39. @Override
  40. public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
  41. Struct value = (Struct) sourceRecord.value();
  42. Struct after = value.getStruct("after");
  43. Struct source = value.getStruct("source");
  44. String db = source.getString("db");//库名
  45. String table = source.getString("table");//表名
  46. //获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
  47. /* READ("r"),
  48. CREATE("c"),
  49. UPDATE("u"),
  50. DELETE("d");*/
  51. Envelope.Operation operation = Envelope.operationFor(sourceRecord);
  52. String opstr = operation.toString().toLowerCase();
  53. //类型修正 会把insert识别成create
  54. if (opstr.equals("create")) {
  55. opstr = "insert";
  56. }
  57. //获取after结构体里面的表数据,封装成json输出
  58. JSONObject json1 = new JSONObject();
  59. JSONObject json2 = new JSONObject();
  60. //加个判空
  61. if (after != null) {
  62. List<Field> data = after.schema().fields(); //获取结构体
  63. for (Field field : data) {
  64. String name = field.name(); //结构体的名字
  65. Object value2 = after.get(field);//结构体的字段值
  66. //放进json2里面去 json2放到json1里面去
  67. json2.put(name, value2);
  68. }
  69. }
  70. //整理成大json串输出
  71. json1.put("db", db);
  72. json1.put("table", table);
  73. json1.put("data", json2);
  74. json1.put("type", opstr);
  75. collector.collect(json1.toJSONString());
  76. }
  77. @Override
  78. public TypeInformation<String> getProducedType() {
  79. return TypeInformation.of(String.class);
  80. }
  81. }
标签: mysql flink 数据库

本文转载自: https://blog.csdn.net/Aaron_ch/article/details/122133198
版权归原作者 一个肉团子 所有, 如有侵权,请联系我们删除。

“Flink CDC 读取MySQL的数据”的评论:

还没有评论