

Flink Unified Stream-Batch Computing (23): Flink SQL — Writing Multiple Kafka Streams to Multiple MySQL Sinks

1. Preparation

Generating test data

Source Kafka JSON data formats:

topic case_kafka_mysql:

{"ts": "20201011","id": 8,"price_amt":211}

topic flink_test_2:

{"id": 8,"coupon_price_amt":100}

Note: each record arriving on either of the two streams triggers the join and re-emits a result.

topic: case_kafka_mysql

    docker exec -it 192d1369463a bash
    bash-5.1# cd /opt/kafka_2.12-2.5.0/bin
    bash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic case_kafka_mysql
    >{"ts": "20201011","id": 8,"price_amt":211}

topic: flink_test_2

    docker exec -it 192d1369463a bash
    bash-5.1# cd /opt/kafka_2.12-2.5.0/bin
    bash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test_2
    >{"id": 8,"coupon_price_amt":100}
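The note above — each record on either stream triggers a new result — can be sketched in plain Python. This is a simplified, hypothetical model of Flink's stateful left join on `id`, not the actual runtime:

```python
# Minimal sketch of a streaming LEFT JOIN keyed by id.
# State: the last record seen per id from each stream.
orders = {}   # id -> {"ts": ..., "price_amt": ...}
coupons = {}  # id -> {"coupon_price_amt": ...}

def join(key):
    """Build the joined row for one key from current state."""
    o, c = orders[key], coupons.get(key)
    return {
        "ts": o["ts"],
        "price_amt": o["price_amt"],
        "coupon_price_amt": c["coupon_price_amt"] if c else None,
    }

def on_order(rec):
    """An order record arrives: update state and emit the joined row."""
    orders[rec["id"]] = rec
    return join(rec["id"])

def on_coupon(rec):
    """A coupon record arrives: update state and re-emit the joined row
    if the left side has already been seen."""
    coupons[rec["id"]] = rec
    return join(rec["id"]) if rec["id"] in orders else None

# The order arrives first: the left join emits with a NULL coupon side ...
print(on_order({"ts": "20201011", "id": 8, "price_amt": 211}))
# ... then the coupon arrives and triggers an updated row for the same key.
print(on_coupon({"id": 8, "coupon_price_amt": 100}))
```

This also shows why a downstream upsert sink is needed: the same key produces multiple, successively refined rows.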

Creating the MySQL tables

MySQL DDL:

    CREATE TABLE `sync_test_2` (
      `id` bigint(11) NOT NULL AUTO_INCREMENT,
      `ts` varchar(64) DEFAULT NULL,
      `total_gmv` bigint(11) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `uidx` (`ts`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

    CREATE TABLE `sync_test_22` (
      `id` bigint(11) NOT NULL AUTO_INCREMENT,
      `ts` varchar(64) DEFAULT NULL,
      `coupon_ratio` double DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `uidx` (`ts`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

2. Creating Flink Tables

Creating the source tables

    create table flink_test_2_1 (
      id BIGINT,
      ts VARCHAR,
      price_amt BIGINT,
      proctime AS PROCTIME()
    )
    with (
      'connector' = 'kafka',
      'topic' = 'case_kafka_mysql',
      'properties.bootstrap.servers' = '127.0.0.1:9092',
      'properties.group.id' = 'flink_gp_test2-1',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json',
      'json.fail-on-missing-field' = 'false',
      'json.ignore-parse-errors' = 'true',
      'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
    );

    create table flink_test_2_2 (
      id BIGINT,
      coupon_price_amt BIGINT,
      proctime AS PROCTIME()
    )
    with (
      'connector' = 'kafka',
      'topic' = 'flink_test_2',
      'properties.bootstrap.servers' = '127.0.0.1:9092',
      'properties.group.id' = 'flink_gp_test2-2',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json',
      'json.fail-on-missing-field' = 'false',
      'json.ignore-parse-errors' = 'true',
      'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
    );

Notes on the key options:

json.fail-on-missing-field: whether to fail the job when a field is missing from a JSON record. Set to 'false', missing fields simply become NULL.

json.ignore-parse-errors: whether to skip records whose JSON cannot be parsed instead of failing the job.

Since well-formed JSON generally cannot be guaranteed upstream, these two options matter in practice.
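The combined effect of `'json.fail-on-missing-field' = 'false'` and `'json.ignore-parse-errors' = 'true'` can be approximated as follows — a Python sketch of the lenient parsing behavior, not the connector's actual code:

```python
import json

def parse_lenient(raw, fields):
    """Mimic the lenient JSON options: drop unparsable records,
    fill missing fields with None (NULL) instead of failing."""
    try:
        obj = json.loads(raw)
    except json.JSONDecodeError:
        return None  # json.ignore-parse-errors = 'true': skip the record
    # json.fail-on-missing-field = 'false': missing keys become NULL
    return {f: obj.get(f) for f in fields}

fields = ["ts", "id", "price_amt"]
print(parse_lenient('{"ts": "20201011","id": 8,"price_amt":211}', fields))
print(parse_lenient('{"id": 8}', fields))        # missing fields -> None
print(parse_lenient('not json at all', fields))  # parse error -> record skipped
```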

Creating the sink tables

    CREATE TABLE sync_test_2 (
      ts string,
      total_gmv bigint,
      PRIMARY KEY (ts) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
      'table-name' = 'sync_test_2',
      'username' = 'root',
      'password' = 'Admin'
    );

    CREATE TABLE sync_test_22 (
      ts string,
      coupon_ratio double,
      PRIMARY KEY (ts) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
      'table-name' = 'sync_test_22',
      'username' = 'root',
      'password' = 'Admin'
    );

Note that the second table's column and `table-name` must match the MySQL side (`coupon_ratio` double in `sync_test_22`), otherwise both Flink tables would write into the same MySQL table.
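Because both sink tables declare `PRIMARY KEY (ts) NOT ENFORCED`, the JDBC connector writes in upsert mode: each refined aggregate for the same `ts` overwrites the previous row rather than appending. A rough Python model of that behavior, with the MySQL table modeled as a dict keyed by `ts`:

```python
# Upsert semantics: the sink keeps exactly one row per primary-key value.
table = {}  # ts -> total_gmv

def upsert(ts, total_gmv):
    """Insert the row, or overwrite it on duplicate key."""
    table[ts] = total_gmv

# As the aggregate for 20201011 is refined, the same row is updated in place.
upsert("20201011", 211)  # order seen; coupon not yet joined
upsert("20201011", 111)  # after the coupon arrives: 211 - 100
print(table)  # one row per ts, holding the latest value
```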

3. Computation

A job can write to a single sink or to multiple sinks.

Note: when writing to multiple sinks, the statements must start with **BEGIN STATEMENT SET;** and end with **END;**.

    BEGIN STATEMENT SET; -- required when writing to multiple sinks

    INSERT INTO sync_test_2
    SELECT
      ts,
      SUM(price_amt - coupon_price_amt) AS total_gmv
    FROM
    (
      SELECT
        a.ts AS ts,
        a.price_amt AS price_amt,
        b.coupon_price_amt AS coupon_price_amt
      FROM flink_test_2_1 AS a
      LEFT JOIN flink_test_2_2 b ON b.id = a.id
    )
    GROUP BY ts;

    INSERT INTO sync_test_22
    SELECT
      ts,
      SUM(coupon_price_amt) / SUM(price_amt) AS coupon_ratio
    FROM
    (
      SELECT
        a.ts AS ts,
        a.price_amt AS price_amt,
        b.coupon_price_amt AS coupon_price_amt
      FROM flink_test_2_1 AS a
      LEFT JOIN flink_test_2_2 b ON b.id = a.id
    )
    GROUP BY ts;

    END; -- required when writing to multiple sinks
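With the two sample records fed in above (and assuming only one record arrived on each topic), the final rows can be checked by hand:

```python
# The single joined row: order 211 with coupon 100 for ts = 20201011.
price_amt = 211
coupon_price_amt = 100

# sync_test_2: total_gmv = SUM(price_amt - coupon_price_amt)
total_gmv = price_amt - coupon_price_amt
print(total_gmv)  # 111

# sync_test_22: coupon_ratio = SUM(coupon_price_amt) / SUM(price_amt)
coupon_ratio = coupon_price_amt / price_amt
print(round(coupon_ratio, 4))  # 0.4739
```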
The WITH clause

WITH provides a way to write auxiliary statements for use in a larger query. These statements, commonly called common table expressions (CTEs), can be thought of as temporary views that exist only for a single query.

Rewriting the queries above (note that each INSERT inside a statement set is an independent statement, so each one needs its own WITH clause):

    BEGIN STATEMENT SET; -- required when writing to multiple sinks

    INSERT INTO sync_test_2
    WITH orders_with_coupon AS (
      SELECT
        a.ts AS ts,
        a.price_amt AS price_amt,
        b.coupon_price_amt AS coupon_price_amt
      FROM flink_test_2_1 AS a
      LEFT JOIN flink_test_2_2 b ON b.id = a.id
    )
    SELECT
      ts,
      SUM(price_amt - coupon_price_amt) AS total_gmv
    FROM orders_with_coupon
    GROUP BY ts;

    INSERT INTO sync_test_22
    WITH orders_with_coupon AS (
      SELECT
        a.ts AS ts,
        a.price_amt AS price_amt,
        b.coupon_price_amt AS coupon_price_amt
      FROM flink_test_2_1 AS a
      LEFT JOIN flink_test_2_2 b ON b.id = a.id
    )
    SELECT
      ts,
      SUM(coupon_price_amt) / SUM(price_amt) AS coupon_ratio
    FROM orders_with_coupon
    GROUP BY ts;

    END; -- required when writing to multiple sinks
Tags: flink sql kafka

Reposted from: https://blog.csdn.net/victory0508/article/details/134809717
Copyright belongs to the original author, victory0508. In case of infringement, please contact us for removal.
