0


基于Flink MySQL CDC技术实现交易告警

前言

CDC 的全称是 Change Data Capture,是一种用于捕获数据库变更数据的技术。例如 MySQL 对数据的所有变更都会写入到 binlog,CDC 就可以通过监听 binlog 文件来实现对 MySQL 数据变更的捕获,然后做进一步的处理。

Flink CDC 将CDC技术和 Flink 流计算整合到一起,把CDC捕获到的数据变更作为 Flink数据源,以实现对数据变更的流式处理。通过 Flink CDC 可以轻松实现以下功能:

  • 数据同步 将一个数据库中的数据变化实时同步到另一个数据库或数据存储中,实现数据的实时备份和迁移
  • 实时数据分析 捕获数据库的变更数据,并将其作为实时流数据源输入到 Flink 进行实时分析和处理,进行实时报表生成、实时监控、实时推荐等应用
  • 数据集成和 ETL 在数据集成和 ETL过程中,使用 Flink CDC 可以实现对源数据库的实时数据抽取,然后进行数据转换和加载到目标系统中

本文就来实现一个简单的 Flink 作业,通过 Flink CDC 技术来监控 MySQL 中的用户交易记录,针对频繁交易和大额交易进行风控告警。

需求描述

用户交易记录存储在 MySQL 的 user_trade 表中,编写一个 Flink 作业实现对用户交易记录的监听,针对每个用户在一分钟内,若交易次数超过十次,或者交易金额超过一万元的,生成一条交易告警记录并写入 user_trade_alert 表,由业务系统触发告警操作。

需求实现

前期准备

执行DDL语句,完成表的创建

CREATETABLE user_trade
(
    id         BIGINT(20)NOTNULLAUTO_INCREMENTPRIMARYKEY,
    user_id    BIGINT(20)NOTNULL,
    amount     BIGINT(20)NOTNULL,
    trade_time DATETIMENOTNULL)COMMENT'用户交易';CREATETABLE user_trade_alert
(
    id           BIGINT(20)NOTNULLAUTO_INCREMENTPRIMARYKEY,
    user_id      BIGINT(20)NOTNULL,
    alert_reason VARCHAR(1024)NOTNULL)COMMENT'用户交易告警';

引入Maven依赖,

mysql-connector-java

是基础,因为要读写MySQL数据库;

flink-connector-jdbc

是 Flink 提供的JDBC 连接器,用于 Flink 读写数据库;

flink-connector-mysql-cdc

是 Flink 提供的针对 MySQL 的 CDC 实现。

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.31</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>3.2.0</version></dependency>

Flink作业编写

1、先对数据实体建模。UserTrade 对应用户交易记录,UserTradeStat 对应用户交易的统计,UserTradeAlert 对应用户交易的告警。

@DatapublicstaticclassUserTrade{privateLong id;@JSONField(name ="user_id")privateLong userId;privateLong amount;@JSONField(name ="trade_time")privateLong tradeTime;}@Data@NoArgsConstructor@AllArgsConstructorpublicstaticclassUserTradeAlert{privateLong userId;privateString alertReason;}@Data@NoArgsConstructor@AllArgsConstructorpublicstaticclassUserTradeStat{privateLong userId;privatelong totalAmount;privatelong total;}

2、构建 MySqlSource,Flink cdc 的核心数据源,它会连接到 MySQL 主库并消费 binlog 日志来获取数据变更记录,然后转发给下游算子处理。

privatestaticMySqlSource<UserTrade>ofMySqlSource(String database,String table){returnMySqlSource.<UserTrade>builder().hostname(DB_HOST).port(DB_PORT).databaseList(database).tableList(database +"."+ table).username(DB_USERNAME).password(DB_PASSWORD).deserializer(newUserTradeDeserialization()).build();}

Flink CDC 获取到的数据被封装成

org.apache.kafka.connect.source.SourceRecord

类,我们要自定义反序列化器,将获取到的日志记录转化成 UserTrade 实体对象。

publicstaticclassUserTradeDeserializationimplementsDebeziumDeserializationSchema<UserTrade>{transientJsonConverter jsonConverter;@Overridepublicvoiddeserialize(SourceRecordrecord,Collector<UserTrade> collector)throwsException{// 交易数据,不考虑[删改]的场景byte[] bytes =getJsonConverter().fromConnectData(record.topic(),record.valueSchema(),record.value());UserTrade userTrade = JSON.parseObject(bytes).getJSONObject("payload").getObject("after",UserTrade.class);// 时区问题
        userTrade.setTradeTime(userTrade.getTradeTime()-Duration.ofHours(8L).toMillis());
        collector.collect(userTrade);}@OverridepublicTypeInformation<UserTrade>getProducedType(){returnTypeInformation.of(UserTrade.class);}JsonConvertergetJsonConverter(){if(jsonConverter ==null){this.jsonConverter =newJsonConverter();HashMap<String,Object> configs =newHashMap(2);
            configs.put("converter.type",ConverterType.VALUE.getName());
            configs.put("schemas.enable",true);this.jsonConverter.configure(configs);}return jsonConverter;}}

3、因为是统计每分钟内用户的交易次数和交易额度,必然要用到窗口计算,所以要编写窗口处理函数。UserTradeAggregateFunction 是对窗口内数据的聚合处理,UserTradeWindowFunction 是对窗口计算结果的处理,如果满足告警规则,则会生成一个 UserTradeAlert 对象交给下游算子处理。

publicstaticclassUserTradeAggregateFunctionimplementsAggregateFunction<UserTrade,UserTradeStat,UserTradeStat>{@OverridepublicUserTradeStatcreateAccumulator(){returnnewUserTradeStat();}@OverridepublicUserTradeStatadd(UserTrade value,UserTradeStat accumulator){System.err.println("add:"+ value);
        accumulator.setUserId(value.getUserId());
        accumulator.setTotalAmount(accumulator.getTotalAmount()+ value.amount);
        accumulator.setTotal(accumulator.getTotal()+1);return accumulator;}@OverridepublicUserTradeStatgetResult(UserTradeStat accumulator){return accumulator;}@OverridepublicUserTradeStatmerge(UserTradeStat a,UserTradeStat b){returnnull;}}publicstaticclassUserTradeWindowFunctionimplementsWindowFunction<UserTradeStat,UserTradeAlert,Long,TimeWindow>{@Overridepublicvoidapply(Long key,TimeWindow timeWindow,Iterable<UserTradeStat> iterable,Collector<UserTradeAlert> collector)throwsException{System.err.println("win:"+ timeWindow.getStart()+","+ timeWindow.getEnd());
        iterable.forEach(stat ->{if(stat.getTotal()>10|| stat.getTotalAmount()>1000000){LocalDateTime startTime =LocalDateTime.ofInstant(Instant.ofEpochMilli(timeWindow.getStart()),ZoneId.systemDefault());LocalDateTime endTime =LocalDateTime.ofInstant(Instant.ofEpochMilli(timeWindow.getEnd()),ZoneId.systemDefault());String alertReason ="用户: "+ key +" 在["+ startTime +","+ endTime +"]期间产生交易:"+ stat.getTotal()+"笔,交易金额:"+ stat.getTotalAmount()+",触发告警规则.";
                collector.collect(newUserTradeAlert(key, alertReason));}});}}

4、窗口算子生成的 UserTradeAlert 对象会交给 Sink 算子写进数据库里,所以最后还差一个 SinkFunction,因为是写进MySQL,所以这里构建一个 JdbcSink。

privatestaticSinkFunction<UserTradeAlert>ofMysqlSink(){returnJdbcSink.<UserTradeAlert>sink("INSERT INTO user_trade_alert (user_id,alert_reason) VALUES (?,?)",(ps, value)->{
                ps.setLong(1, value.getUserId());
                ps.setString(2, value.getAlertReason());},JdbcExecutionOptions.builder().withBatchIntervalMs(100).withMaxRetries(0).build(),newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://ip:port/flink_db?useSSL=false").withUsername(DB_USERNAME).withPassword(DB_PASSWORD).withDriverName("com.mysql.cj.jdbc.Driver").build());}

5、所有组件都写完了,最后就是启动 Flink 执行环境,把整个作业串起来。

privatestaticfinalString DB_HOST ="your_ip";privatestaticfinalint DB_PORT =3306;privatestaticfinalString DB_USERNAME ="admin";privatestaticfinalString DB_PASSWORD ="xxx";publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment environment =StreamExecutionEnvironment.getExecutionEnvironment();
    environment.fromSource(ofMySqlSource("flink_db","user_trade"),WatermarkStrategy.<UserTrade>forMonotonousTimestamps().withTimestampAssigner((value, time)-> value.getTradeTime()),"mysql-cdc").setParallelism(1).keyBy(UserTrade::getUserId).window(TumblingEventTimeWindows.of(Duration.ofSeconds(60L))).aggregate(newUserTradeAggregateFunction(),newUserTradeWindowFunction()).addSink(ofMysqlSink());
    environment.execute();}

功能验证

提交并运行Flink作业,往MySQL user_trade 表写入一些用户交易记录,当这些交易记录触发告警规则时,Flink 作业就会往 user_trade_alert 表生成如下告警记录示例:
iduser_idalert_reason11用户: 1 在[2024-09-13T15:29:10,2024-09-13T15:29:15]期间产生交易:1笔,交易金额:9999999,触发告警规则.32用户: 2 在[2024-09-13T15:30:25,2024-09-13T15:30:30]期间产生交易:11笔,交易金额:21,触发告警规则.

尾巴

Flink MySQL CDC 具有强大的功能特性,为实时数据处理提供了高效可靠的解决方案。它能够实现对 MySQL 数据库的实时数据捕获,确保数据的及时性和准确性。可以快速响应数据库中的数据变化,将变更数据实时传输到 Flink 流处理引擎中进行进一步的分析和处理。

标签: flink mysql 大数据

本文转载自: https://blog.csdn.net/qq_32099833/article/details/142958510
版权归原作者 程序员小潘 所有, 如有侵权,请联系我们删除。

“基于Flink MySQL CDC技术实现交易告警”的评论:

还没有评论