0


基于Flink CDC实现ElasticSearch同步MySQL环境搭建笔记

1. 目标

实现将不同MySQL Schema实时同步至同一数据源以供其他数据分析应用作为数据源调用。

搭建范围包括:供数据分析应用调用的数据源搭建以及MySQL数据同步

2. 系统架构

Flink+ElasticSearch的部署配置为本文重点。


序号产品版本备注1CentOS7.8 64bit移动云ECS2MySQL8.0移动云DB3ElasticSearch8.15.34Flink1.20.05
Flink CDC
3.2.0生成Flink Job

3. 环境搭建

3.1 ElasticSearch部署

参考ElasticSearch8.13.0安装步骤

Step1. 创建ES群组、用户(ElasticSearch8 不允许root用户启动)

groupadd es
useradd es -g es -p XXXXXXX

Step2. 下载ElasticSearch8.15.3安装包 (ElasticSearch官方安装步骤)

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.3-linux-x86_64.tar.gz
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.3-linux-x86_64.tar.gz.sha512
shasum -a 512 -c elasticsearch-8.15.3-linux-x86_64.tar.gz.sha512 

期望结果:

如果提示shasum command not found, 安装per-Digest-SHA后在执行

yum install -y perl-Digest-SHA

Step3. 解压并赋权

tar -xzf elasticsearch-8.15.3-linux-x86_64.tar.gz
chown -R es:es elasticsearch-8.15.3
cd elasticsearch-8.15.3/ 

Step4. 配置elasticsearch.yml

# 允许外部访问
network.host: 0.0.0.0
# 关闭安全组件
xpack.security.enabled: false

Step5. 设置vm.max_map_count参数 (系统虚拟内存默认最大映射数为65530,无法满足ES系统要求,需要调整为262144以上。)

vim /etc/sysctl.conf 
# 添加添加参数 
vm.max_map_count = 262144
# 重新加载/etc/sysctl.conf配置\
sysctl -p

Step6. 启动elasticsearch

su es
# 首次运行建议使用如下命令,以发现启动问题
bin/elasticsearch

# 后台运行
bin/elasticsearch -d -p pid 

首次启动记录elastic初始密码,以供后续使用

Step7. 验证启动成功

网页输入: https://ip:9200 提示输入用户名、密码则启动成功

Step8. 配置默认动态模板,关闭date_detection (当存在date字段,但该字段不是必填项时,开启date_detection可能会发生“cannot parse empty datetime”的报错)

curl -XPUT  [uesr]:[password]@[ip]:[port]/_template/template_default?pretty -H "Content-Type: application/json" --data-binary template_default.json


# template_defualt.json
{
    "index_patterns" : ["*"],
    "mappings" : {
        "date_detection": false
    }
}

3.2 数据同步

3.2.1 Flink部署

Step1 下载flink安装包

wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz.sha512
shasum -a 512 -c flink-1.20.0-bin-scala_2.12.tgz.sha512 

Step2 解压安装包

tar -xzf flink-1.20.0-bin-scala_2.12.tgz
cd flink-1.20.0-bin-scala_2.12/

Step3 配置config.yml

jobmanager:
  bind-host: 0.0.0.0
。。。
  memory:
    process:
      size: 2600m

。。。

taskmanager:
  bind-host: 0.0.0.0
  host: 0.0.0.0
  numberOfTaskSlots: 2
  memory:
    process:
      size: 2728m
    flink:
      size: 2280m
。。。


execution:
  checkpointing:
    interval: 3min

。。。
rest:
  address: 0.0.0.0
  bind-address: 0.0.0.0

Step4 启动flink

bin/start-cluster.sh

注:启动flink,需安装JAVA,环境变量JAVA_HOME不为空。

3.2.2 创建同步任务

Step1. 下载flink cdc3.2.0

wget https://dlcdn.apache.org/flink/flink-cdc-3.2.0/flink-cdc-3.2.0-bin.tar.gz
wget https://dlcdn.apache.org/flink/flink-cdc-3.2.0/flink-cdc-3.2.0-bin.tar.gz.sha512
shasum -a 512 flink-cdc-3.2.0-bin.tar.gz.sha512

Step2. 解压文件

tar -zxf flink-cdc-3.2.0-bin.tar.gz
cd flink-cdc-3.2.0/

Step3. 编写CDC YAML文件

################################################################################
# Description: Sync MySQL all tables to ElasticSearch
################################################################################
source:
  type: mysql
  hostname: [DB_IP]
  port: [DB_PORT,DEFAULT 3306]
  username: [DB_USER]
  password: [DB_PASSWORD]
  tables: [同步的表,全库[schema].\.*]
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

sink:
  type: elasticsearch
  name: ES Sink
  hosts: http://[ES_IP]:[ES_PORT]
  username: [ES_USER]
  password: [ES_PASSWORD]
  version: 8
  batch.size.max: 5
  inflight.requests.max: 1
  buffered.requests.max: 10
  batch.size.max.bytes: 52428800
  buffer.time.max.ms:  1000
  record.size.max.bytes: 10485760


pipeline:
  name: Sync MySQL Database to ElasticSearch
  parallelism: 1

Step4. 上传JAR包

将flink-cdc-pipeline-connector-elasticsearch-3.2.0.jar,flink-cdc-pipeline-connector-elasticsearch-3.2.0.jar 上传至 FLINK_CDC_HOME/lib目录下

将mysql-connector-java-8.0.27.jar 上传至FLINK_HOME/lib 目录下

Step5. 创建同步任务

bin/flink-cdc.sh MySQLToES.yaml --flink-home /data/flink-1.20.0

期望结果:

3.3 环境验证

  1. Flink任务验证

Jobs -> Running Jobs 找到之前创建的JOB,运行状态为Running

等待3min,切换至checkpoints页面,checkpoints history为completed

  1. 查看ElasticSearch日志

首次同步会存在create index,create mapping日志

修改Mysql数据库表内容, ES对应INDEX下的记录有被同步修改

4. 后续

  1. Flink CDC 配置文件 elasticsearch相关配置的含义
  batch.size.max: 5
  inflight.requests.max: 1
  buffered.requests.max: 10
  batch.size.max.bytes: 52428800
  buffer.time.max.ms:  1000
  record.size.max.bytes: 10485760
  1. elasticsearch.yml中 xpack.security.enabled: true时,外部api链接
标签: elasticsearch flink

本文转载自: https://blog.csdn.net/zhzhouzero/article/details/143615443
版权归原作者 zhzhouzero 所有, 如有侵权,请联系我们删除。

“基于Flink CDC实现ElasticSearch同步MySQL环境搭建笔记”的评论:

还没有评论