1. 目标
实现将不同MySQL Schema实时同步至同一数据源以供其他数据分析应用作为数据源调用。
搭建范围包括:供数据分析应用调用的数据源搭建以及MySQL数据同步
2. 系统架构
Flink+ElasticSearch的部署配置为本文重点。
序号产品版本备注1CentOS7.8 64bit移动云ECS2MySQL8.0移动云DB3ElasticSearch8.15.34Flink1.20.05
Flink CDC
3.2.0生成Flink Job
3. 环境搭建
3.1 ElasticSearch部署
参考ElasticSearch8.13.0安装步骤
Step1. 创建ES群组、用户(ElasticSearch8 不允许root用户启动)
groupadd es useradd es -g es -p XXXXXXX
Step2. 下载ElasticSearch8.15.3安装包 (ElasticSearch官方安装步骤)
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.3-linux-x86_64.tar.gz wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.3-linux-x86_64.tar.gz.sha512 shasum -a 512 -c elasticsearch-8.15.3-linux-x86_64.tar.gz.sha512
期望结果:
如果提示shasum command not found, 安装per-Digest-SHA后在执行
yum install -y perl-Digest-SHA
Step3. 解压并赋权
tar -xzf elasticsearch-8.15.3-linux-x86_64.tar.gz chown -R es:es elasticsearch-8.15.3 cd elasticsearch-8.15.3/
Step4. 配置elasticsearch.yml
# 允许外部访问 network.host: 0.0.0.0 # 关闭安全组件 xpack.security.enabled: false
Step5. 设置vm.max_map_count参数 (系统虚拟内存默认最大映射数为65530,无法满足ES系统要求,需要调整为262144以上。)
vim /etc/sysctl.conf # 添加添加参数 vm.max_map_count = 262144 # 重新加载/etc/sysctl.conf配置\ sysctl -p
Step6. 启动elasticsearch
su es # 首次运行建议使用如下命令,以发现启动问题 bin/elasticsearch # 后台运行 bin/elasticsearch -d -p pid
首次启动记录elastic初始密码,以供后续使用
Step7. 验证启动成功
网页输入: https://ip:9200 提示输入用户名、密码则启动成功
Step8. 配置默认动态模板,关闭date_detection (当存在date字段,但该字段不是必填项时,开启date_detection可能会发生“cannot parse empty datetime”的报错)
curl -XPUT [uesr]:[password]@[ip]:[port]/_template/template_default?pretty -H "Content-Type: application/json" --data-binary template_default.json # template_defualt.json { "index_patterns" : ["*"], "mappings" : { "date_detection": false } }
3.2 数据同步
3.2.1 Flink部署
Step1 下载flink安装包
wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz.sha512 shasum -a 512 -c flink-1.20.0-bin-scala_2.12.tgz.sha512
Step2 解压安装包
tar -xzf flink-1.20.0-bin-scala_2.12.tgz cd flink-1.20.0-bin-scala_2.12/
Step3 配置config.yml
jobmanager: bind-host: 0.0.0.0 。。。 memory: process: size: 2600m 。。。 taskmanager: bind-host: 0.0.0.0 host: 0.0.0.0 numberOfTaskSlots: 2 memory: process: size: 2728m flink: size: 2280m 。。。 execution: checkpointing: interval: 3min 。。。 rest: address: 0.0.0.0 bind-address: 0.0.0.0
Step4 启动flink
bin/start-cluster.sh
注:启动flink,需安装JAVA,环境变量JAVA_HOME不为空。
3.2.2 创建同步任务
Step1. 下载flink cdc3.2.0
wget https://dlcdn.apache.org/flink/flink-cdc-3.2.0/flink-cdc-3.2.0-bin.tar.gz wget https://dlcdn.apache.org/flink/flink-cdc-3.2.0/flink-cdc-3.2.0-bin.tar.gz.sha512 shasum -a 512 flink-cdc-3.2.0-bin.tar.gz.sha512
Step2. 解压文件
tar -zxf flink-cdc-3.2.0-bin.tar.gz cd flink-cdc-3.2.0/
Step3. 编写CDC YAML文件
################################################################################ # Description: Sync MySQL all tables to ElasticSearch ################################################################################ source: type: mysql hostname: [DB_IP] port: [DB_PORT,DEFAULT 3306] username: [DB_USER] password: [DB_PASSWORD] tables: [同步的表,全库[schema].\.*] server-id: 5400-5404 server-time-zone: Asia/Shanghai sink: type: elasticsearch name: ES Sink hosts: http://[ES_IP]:[ES_PORT] username: [ES_USER] password: [ES_PASSWORD] version: 8 batch.size.max: 5 inflight.requests.max: 1 buffered.requests.max: 10 batch.size.max.bytes: 52428800 buffer.time.max.ms: 1000 record.size.max.bytes: 10485760 pipeline: name: Sync MySQL Database to ElasticSearch parallelism: 1
Step4. 上传JAR包
将flink-cdc-pipeline-connector-elasticsearch-3.2.0.jar,flink-cdc-pipeline-connector-elasticsearch-3.2.0.jar 上传至 FLINK_CDC_HOME/lib目录下
将mysql-connector-java-8.0.27.jar 上传至FLINK_HOME/lib 目录下
Step5. 创建同步任务
bin/flink-cdc.sh MySQLToES.yaml --flink-home /data/flink-1.20.0
期望结果:
3.3 环境验证
- Flink任务验证
Jobs -> Running Jobs 找到之前创建的JOB,运行状态为Running
等待3min,切换至checkpoints页面,checkpoints history为completed
- 查看ElasticSearch日志
首次同步会存在create index,create mapping日志
修改Mysql数据库表内容, ES对应INDEX下的记录有被同步修改
4. 后续
- Flink CDC 配置文件 elasticsearch相关配置的含义
batch.size.max: 5 inflight.requests.max: 1 buffered.requests.max: 10 batch.size.max.bytes: 52428800 buffer.time.max.ms: 1000 record.size.max.bytes: 10485760
- elasticsearch.yml中 xpack.security.enabled: true时,外部api链接
版权归原作者 zhzhouzero 所有, 如有侵权,请联系我们删除。