

FLINK SQL 1.17.1: Reading KAFKA Data, Computing in Real Time, and Writing to MYSQL

In preparation for building out a real-time data pipeline later on, this post tests using FLINK SQL to read KAFKA data in real time, run a streaming computation over it, and write the results to MYSQL.

The source data is mocked insurance policy and risk-type records. Two ODS (source-aligned) tables are created over Kafka, a policy table and a risk-type table; on top of them we build a join table and a summary table, and finally write the data to MySQL.

1. Environment Setup

Download the Flink and Kafka packages from their official sites, upload them to the server, and extract them (details omitted here). One thing to watch: since we connect to both Kafka and MySQL, the corresponding connector JARs must also be downloaded and dropped into Flink's lib directory:
flink-connector-jdbc-3.1.0-1.17.jar

flink-connector-kafka-1.17.1.jar

flink-shaded-jackson-2.14.2-17.0.jar

flink-sql-connector-kafka-1.17.1.jar

flink-sql-csv-1.17.1.jar

flink-sql-json-1.17.1.jar

kafka-clients-3.5.1.jar

mysql-connector-java-5.1.48.jar
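
With the JARs in place, start a standalone cluster and open the SQL client. A minimal sketch, assuming the default standalone deployment and that the commands are run from the Flink home directory:

# start the local standalone cluster (JobManager + TaskManager)
./bin/start-cluster.sh
# open the interactive SQL client; the statements in section 3 are run here
./bin/sql-client.sh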

2. Data Preparation

Create the two topics and feed them sample records with the console producer (one JSON document per line):

./kafka-topics.sh --bootstrap-server 10.9.135.16:9092  --create --topic cont
./kafka-console-producer.sh --bootstrap-server 10.9.135.16:9092 --topic cont
{"contno": "1001", "prem":"20", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
{"contno": "1002", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
{"contno": "1003", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
{"contno": "1004", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
{"contno": "1005", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
{"contno": "1006", "prem":"10", "amnt": "100","riskcode": "002", "ts":"2023-11-01 20:38:05"}
{"contno": "1007", "prem":"10", "amnt": "100","riskcode": "002", "ts":"2023-11-01 20:38:05"}
{"contno": "1008", "prem":"10", "amnt": "100","riskcode": "003", "ts":"2023-11-01 20:38:05"}
{"contno": "1009", "prem":"10", "amnt": "100","riskcode": "004", "ts":"2023-11-01 20:38:05"}
{"contno": "1010", "prem":"10", "amnt": "100","riskcode": "004", "ts":"2023-11-01 20:38:05"}
{"contno": "1011", "prem":"10", "amnt": "100","riskcode": "005", "ts":"2023-11-01 20:38:05"}
{"contno": "1012", "prem":"10", "amnt": "100","riskcode": "005", "ts":"2023-11-01 20:38:05"}
{"contno": "1013", "prem":"10", "amnt": "100","riskcode": "005", "ts":"2023-11-01 20:38:05"}
{"contno": "1014", "prem":"50", "amnt": "100","riskcode": "006", "ts":"2023-11-01 20:38:05"}
{"contno": "1015", "prem":"60", "amnt": "100","riskcode": "006", "ts":"2023-11-01 20:38:05"}

./kafka-topics.sh --bootstrap-server 10.9.135.16:9092  --create --topic riskapp
./kafka-console-producer.sh --bootstrap-server 10.9.135.16:9092 --topic riskapp
{"riskcode": "001", "riskname":"险种1", "ts":"2023-11-01 20:38:05"}
{"riskcode": "002", "riskname":"险种2", "ts":"2023-11-01 20:38:05"}
{"riskcode": "003", "riskname":"险种3", "ts":"2023-11-01 20:38:05"}
{"riskcode": "004", "riskname":"险种4", "ts":"2023-11-01 20:38:05"}
{"riskcode": "005", "riskname":"险种5", "ts":"2023-11-01 20:38:05"}
{"riskcode": "006", "riskname":"险种6", "ts":"2023-11-01 20:38:05"}

3. FLINK SQL

All of the following statements are run in the FLINK SQL client.

-- ODS-layer policy table
CREATE TABLE cont (
  contno string,
  prem int,
  amnt int,
  riskcode string,
  record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'cont',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '10.9.135.16:9092',
  'format' = 'json'
);
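
Note that record_time is not a field in the JSON payload: the METADATA FROM 'timestamp' clause maps it from each Kafka record's own timestamp, and the dedup/join query below uses it to pick the latest version of each row. As a quick smoke test you can select from the table in the SQL client; the query keeps running and prints rows as they arrive:

SELECT * FROM cont;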

-- ODS-layer risk-type table
CREATE TABLE riskapp (
  riskcode string,
  riskname string,
  record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'riskapp',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '10.9.135.16:9092',
  'format' = 'json'
);

-- Policy/risk-type join table
CREATE TABLE kafka_cont_risk (
  contno string,
  prem int,
  amnt int,
  riskcode string,
  riskname string,
  primary key (contno) not enforced
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'kafka_cont_risk',
  'properties.bootstrap.servers' = '10.9.135.16:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
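
The upsert-kafka connector requires a primary key: the key fields are written as the Kafka message key, new rows with the same key overwrite the old ones, and deletions become tombstone records. If the broker does not auto-create topics, the target topic needs to exist first; a sketch mirroring the earlier topic commands:

./kafka-topics.sh --bootstrap-server 10.9.135.16:9092 --create --topic kafka_cont_risk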

-- Summary table (sink to MySQL)
CREATE TABLE risk_prem (
  riskname string,
  prem int,
  primary key (riskname) not enforced
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.9.134.14:3306/libra_report?useSSL=false',
  'table-name' = 'risk_prem',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123456'
);
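
Because risk_prem declares a primary key, the JDBC connector writes to MySQL with upsert semantics (INSERT ... ON DUPLICATE KEY UPDATE) rather than plain appends. The sink also buffers writes; if you need to tune the flushing behavior, the connector accepts the following options in the WITH clause (the values shown are illustrative, the defaults are usually fine):

'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s',
'sink.max-retries' = '3'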

-- Create the target table in MySQL (run this in a MySQL client, not the FLINK SQL client)
CREATE TABLE libra_report.risk_prem (
  riskname varchar(100),
  prem int,
  primary key (riskname)
);

-- Populate the policy/risk-type join table. The ROW_NUMBER() ... = 1 filter is
-- Flink's deduplication pattern: it keeps only the latest record per key, so a
-- re-sent policy or risk type simply overwrites the previous version. Note that
-- the b.rownum = 1 predicate in the WHERE clause also drops policies with no
-- matching risk type, so the LEFT JOIN effectively behaves as an inner join here.
INSERT INTO kafka_cont_risk
SELECT a.contno, a.prem, a.amnt, a.riskcode, b.riskname
FROM (
  SELECT contno, prem, amnt, riskcode,
         ROW_NUMBER() OVER (PARTITION BY contno ORDER BY record_time DESC) AS rownum
  FROM cont
) a
LEFT JOIN (
  SELECT riskcode, riskname,
         ROW_NUMBER() OVER (PARTITION BY riskcode ORDER BY record_time DESC) AS rownum
  FROM riskapp
) b
ON a.riskcode = b.riskcode
WHERE a.rownum = 1 AND b.rownum = 1;

-- Populate the summary table. riskname is the sink's primary key and therefore
-- NOT NULL; the setting below tells Flink to silently drop rows that would
-- violate the constraint instead of failing the job.
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
INSERT INTO risk_prem SELECT riskname, SUM(prem) FROM kafka_cont_risk GROUP BY riskname;

4. Query the Results

-- Query the policy/risk-type join table
select * from kafka_cont_risk;
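
Querying an upsert-kafka table streams a changelog, so updated keys show up as -U/+U row pairs. If the default interactive result view is hard to follow, the SQL client can print the changelog straight to the console instead:

SET 'sql-client.execution.result-mode' = 'tableau';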

-- Query the summary table
select * from risk_prem;

-- Query the result table in MySQL (run in a MySQL client)
select * from libra_report.risk_prem;
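
As a sanity check, the expected summary can be worked out by hand from the sample records above, assuming every record was consumed exactly once: riskcode 001 covers policies 1001 through 1005, so 险种1 = 20 + 10 × 4 = 60; similarly 险种2 = 20, 险种3 = 10, 险种4 = 20, 险种5 = 30, and 险种6 = 50 + 60 = 110.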

Tags: flink sql kafka

Reposted from: https://blog.csdn.net/weixin_41745141/article/details/134303244
Copyright belongs to the original author, 伊一cherry大数据. If there is any infringement, please contact us for removal.
