本文基于FlinkCDC 3.1.1和Flink 1.18版本,对Mysql数据库的binlog增量监听,实现对DDL语句审计告警的能力。
一、项目能力:
基于FlinkCDC-3.1.1和Flink-1.18技术栈,采用Flink on yarn部署模式,实现增量监听Mysql数据库,对Mysql中的DDL语句做审计告警,将告警消息发送至钉钉告警群。
二、主要技术架构:
技术栈版本java1.8FlinkCDC3.1.1Flink1.18.0Hadoop (yarn)2.8.3
三、部署流程:
- 安装
Java
环境和Hadoop
环境,步骤略; - 安装
Flink
环境:
->wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
->tar-zxvf flink-1.18.0-bin-scala_2.12.tgz
->mkdir ./flink-1.18.0/job
- 下载
Flink
与Hadoop
整合相关依赖jar
包:
->cd flink-1.18.0/lib
->wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
配置
flink-conf.yaml
文件,主要配置如下参数:
配置项value说明state.backendfilesystem状态后端存储模式state.checkpoints.dirhdfs://${HOST_NAME}:8020/flink/flink-checkpointscheckpoint地址,基于hdfsstate.savepoints.dirhdfs://${HOST_NAME}:8020/flink/flink-savepointssavepoint地址,同样基于hdfsenv.java.opts“-Dfile.encoding=UTF-8”全局编码(**重要配置,必须要加引号)建立钉钉群,并创建告警机器人。创建过程略;(注意:当前版本只能处理未加签机器人,关键字设置为
告警
,审计
);打包并上传:
->git clone
-> mvn clean package -Dmaven.test.skip=ture
将
jar
包上传到
/flink-1.18.0/job
目录下。
- 创建配置文件:
你可以手动新增配置文件,也可以将项目中的
config.json
文件直接上传到
/flink-1.18.0/job
目录下,如下展示新增过程:
->cd /flink-1.18.0/job
->vim config.json
{"auditName":"sql-audit","job":{"serverId":"","startMode":"latest","timestamp":""},"source":{"sourceType":"mysql","hostname":"127.0.0.1","port":3306,"username":"root","password":"root","databaseList":[],"tableList":[],"blackDatabaseList":[]},"alarm":{"alarmType":"WEBHOOK","webhook":"https://oapi.dingtalk.com/robot/send?access_token=${YOUR_TOKEN}","msgtype":"text","at":{"isAtAll":false,"atMobiles":["13400000000"]}}}
配置文件内容描述如下:
参数名是否必填默认值说明auditName是sql-audit任务名称serverId否-mysql slave server_idstartMode是latestFlink启动模式(支持
initial
,
earliest
,
latest
及
timestamp
四种模式)timestamp否Null如果启动模式设置为
timestamp
,则该值必须设置sourceType是mysql数据源类型,当前只支持
mysql
hostname是-
mysql
连接ipport是-
mysql
连接端口username是-
mysql
连接用户名password是-
mysql
连接密码databaseList否-
Flinkcdc
监听的数据库列表tableList否-
Flinkcdc
监听的数据表列表blackDatabaseList否-需要过滤掉的黑名单数据库列表alarmType是WEBHOOK告警类型,默认为钉钉,当前也只支持钉钉webhook是-
webhook
地址msgtype是text消息类型,钉钉消息类型支持
markdown
和
text
isAtAll是false是否@所有人atMobiles是@人手机号
- 创建启动脚本:
->vim start.sh
->sh start.sh
#!/bin/bashexportJAVA_HOME=${YOUR_JAVA_PATH}/jdk1.8.0_181
exportPATH=$JAVA_HOME/bin:$PATHexportHADOOP_CLASSPATH=`hadoop classpath`exportHADOOP_USER_NAME=hdfs
flink_home=$PWD/../../bin
jar_path=$PWD/sql-audit.jar
config_path=$PWD/config.json
job_name=sql-audit-app
$flink_home/flink run -c com.someway.MySQLAuditExec -m yarn-cluster -ynm$job_name-p1-yjm1024-ytm1024-yDenv.java.opts="-Dfile.encoding=UTF-8"${jar_path}${config_path}
- 告警测试:
通过在数据库创建表,删除表,truncate表和alert表,能够捕获如下审计告警:
- Create 告警样例:
- DROP 告警样例:
- Truncate 告警样例:
- Alter 告警样例:
四、最后
当前项目属于V0.1版本,里面支持的数据源不是很全面,告警渠道也有限。如果有需要的小伙伴,可以自行下载代码,然后做二次开开发。FlinkCDC 3.0版本提供了很多新能力,为数据同步提供了更多的保障机制和可能性,后续会继续探索其他新功能和新使用场景。有需要交流的小伙伴,欢迎关注我的公众号,一起交流学习。
如果需要源码的小伙伴,关注公众号后回复"SQL审计"自动获取代码地址。
版权归原作者 码猿小站 所有, 如有侵权,请联系我们删除。