0


基于FlinkCDC-3.1.1&Flink-1.18实现MySQL DDL审计告警

本文基于FlinkCDC 3.1.1和Flink 1.18版本,对Mysql数据库的binlog增量监听,实现对DDL语句审计告警的能力。

一、项目能力:

基于FlinkCDC-3.1.1和Flink-1.18技术栈,采用Flink on yarn部署模式,实现增量监听Mysql数据库,对Mysql中的DDL语句做审计告警,将告警消息发送至钉钉告警群。

二、主要技术架构:

技术栈版本java1.8FlinkCDC3.1.1Flink1.18.0Hadoop (yarn)2.8.3

三、部署流程:
  1. 安装Java环境和Hadoop环境,步骤略;
  2. 安装Flink环境:
->wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
->tar-zxvf flink-1.18.0-bin-scala_2.12.tgz
->mkdir ./flink-1.18.0/job
  1. 下载FlinkHadoop整合相关依赖jar包:
->cd flink-1.18.0/lib
->wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
  1. 配置flink-conf.yaml文件,主要配置如下参数
    配置项value说明state.backendfilesystem状态后端存储模式state.checkpoints.dirhdfs://${HOST_NAME}:8020/flink/flink-checkpointscheckpoint地址,基于hdfsstate.savepoints.dirhdfs://${HOST_NAME}:8020/flink/flink-savepointssavepoint地址,同样基于hdfsenv.java.opts“-Dfile.encoding=UTF-8”全局编码(**重要配置,必须要加引号)

  2. 建立钉钉群,并创建告警机器人。创建过程略;(注意:当前版本只能处理未加签机器人,关键字设置为告警审计);

  3. 打包并上传:

->git clone 
-> mvn clean package -Dmaven.test.skip=ture

​ 将

jar

包上传到

/flink-1.18.0/job

目录下。

  1. 创建配置文件:

​ 你可以手动新增配置文件,也可以将项目中的

config.json

文件直接上传到

/flink-1.18.0/job

目录下,如下展示新增过程:

->cd /flink-1.18.0/job
->vim config.json
{"auditName":"sql-audit","job":{"serverId":"","startMode":"latest","timestamp":""},"source":{"sourceType":"mysql","hostname":"127.0.0.1","port":3306,"username":"root","password":"root","databaseList":[],"tableList":[],"blackDatabaseList":[]},"alarm":{"alarmType":"WEBHOOK","webhook":"https://oapi.dingtalk.com/robot/send?access_token=${YOUR_TOKEN}","msgtype":"text","at":{"isAtAll":false,"atMobiles":["13400000000"]}}}

配置文件内容描述如下:
参数名是否必填默认值说明auditName是sql-audit任务名称serverId否-mysql slave server_idstartMode是latestFlink启动模式(支持

initial

,

earliest

,

latest

timestamp

四种模式)timestamp否Null如果启动模式设置为

timestamp

,则该值必须设置sourceType是mysql数据源类型,当前只支持

mysql

hostname是-

mysql

连接ipport是-

mysql

连接端口username是-

mysql

连接用户名password是-

mysql

连接密码databaseList否-

Flinkcdc 

监听的数据库列表tableList否-

Flinkcdc 

监听的数据表列表blackDatabaseList否-需要过滤掉的黑名单数据库列表alarmType是WEBHOOK告警类型,默认为钉钉,当前也只支持钉钉webhook是-

webhook

地址msgtype是text消息类型,钉钉消息类型支持

markdown

text

isAtAll是false是否@所有人atMobiles是@人手机号

  1. 创建启动脚本
->vim start.sh
->sh start.sh
#!/bin/bashexportJAVA_HOME=${YOUR_JAVA_PATH}/jdk1.8.0_181
exportPATH=$JAVA_HOME/bin:$PATHexportHADOOP_CLASSPATH=`hadoop classpath`exportHADOOP_USER_NAME=hdfs

flink_home=$PWD/../../bin
jar_path=$PWD/sql-audit.jar
config_path=$PWD/config.json
job_name=sql-audit-app

$flink_home/flink run -c com.someway.MySQLAuditExec -m yarn-cluster -ynm$job_name-p1-yjm1024-ytm1024-yDenv.java.opts="-Dfile.encoding=UTF-8"${jar_path}${config_path}
  1. 告警测试

通过在数据库创建表,删除表,truncate表和alert表,能够捕获如下审计告警:

  • Create 告警样例:在这里插入图片描述
  • DROP 告警样例在这里插入图片描述
  • Truncate 告警样例:在这里插入图片描述
  • Alter 告警样例:在这里插入图片描述
四、最后

当前项目属于V0.1版本,里面支持的数据源不是很全面,告警渠道也有限。如果有需要的小伙伴,可以自行下载代码,然后做二次开开发。FlinkCDC 3.0版本提供了很多新能力,为数据同步提供了更多的保障机制和可能性,后续会继续探索其他新功能和新使用场景。有需要交流的小伙伴,欢迎关注我的公众号,一起交流学习。

如果需要源码的小伙伴,关注公众号后回复"SQL审计"自动获取代码地址

在这里插入图片描述

标签: flink mysql 大数据

本文转载自: https://blog.csdn.net/weixin_43914798/article/details/140658215
版权归原作者 码猿小站 所有, 如有侵权,请联系我们删除。

“基于FlinkCDC-3.1.1&Flink-1.18实现MySQL DDL审计告警”的评论:

还没有评论