0


实现mysql和es数据同步的两大工具——Logstash和Canal

用途

在大型实战项目开发过程中,当数据量达到比较大的规模时,不可避免的要考虑使用ElasticSearch(es)等搜索引擎来解决大量数据的查询性能压力,因此,做好mysql的数据同步变得至关重要。我所了解,并且使用的是通过Logstash和Canal中间件,来实现将数据写入到ES等中。

一、实现同步原理

1.1 Logstash

Logstash提供了一个JDBC插件,它可以定期查询数据库并捕获变化。通过配置Logstash指定连接到mysql的哪个表和es的哪个索引库,并指定对应的查询语句。当MySQL中的数据发生变化时,Logstash的JDBC插件会定时的检测到这些变化,并且迅速捕获这些新数据或者以更新的数据(根据文档id),并将其同步到es中。

注意
  • 因为涉及到数据的更新,ES和Mysql表的id字段对应关系
  • 因为是根据时间实现增量同步,所以mysql表中必须有一个包含更新或插入时间的字段

1.2 Canal

Canal中间件则是通过模拟成mysql的从库,实时接收mysql的增量数据,然后通过RESTful API将数据写入到ES中。

二、使用步骤(以es为7.14.0为例)

2.1 Logstash

  1. 下载与es对应版本的logstash-7.14.0和使用的mysql jar包
  2. 在logstash-7.14.0安装目录下创建文件夹mysqltb,将jar包放在该目录下,新建mysql.conf文件,添加内容如下:
  3. 命令行执行 logstash ‐f ../mysqletc/mysql.conf

input {

jdbc {

  # 连接的数据库以及表

  jdbc_connection_string => ""

  # 数据库的账号和密码

  jdbc_user => ""

  jdbc_password => ""

  # mysql jar包的路径

  jdbc_driver_library => ""

  # the name of the driver class for mysql

  jdbc_driver_class => "com.mysql.jdbc.Driver"

  jdbc_paging_enabled => "true"

  jdbc_page_size => "50000"

  #以下对应着要执行的sql的绝对路径。

  #statement_filepath => ""

  statement => ""

  #定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新

  schedule => "* * * * *"

   # 数据标签,用于标记不同的索引数据

  type => "user"

  # 加上jdbc时区, 要不然logstash的时间会不准确

  jdbc_default_timezone => "Asia/Shanghai"

  # 设置列名区分大小写, 默认全小写

  lowercase_column_names => "false"

}

}

output {

if[type] == "user"{

elasticsearch {

  #ESIP地址与端口

  hosts => ["" ]

  #ES索引名称(自己定义的)

  index => ""

  #!!!自增ID编号,与查询语句中特有字段名一致

  document_id => "%{id}"

  # 这个必须要写不然不会同步

  document_type => "_doc"

}}

stdout {

  #以JSON格式输出

  codec => json_lines

}

2.2 Canal

下载canal1.1.5

8.0以下版本均要开启mysql的binlog写入功能,在mysql中使用使用命令show variables like ‘%log_bin%’看是否修改成功

#开启binlog模式

log_bin=mysql-bin

binlog-format=row

#single DB binlog-ignore-db=mysql

在Canal.deployer目录下找到配置文件conf/example/instance.properties,修改如下字段

同步数据库地址

canal.instance.master.address=你的mysql数据库地址

同步数据库账号密码

canal.instance.dbUsername=你的mysql账号

canal.instance.dbPassword=你的mysql密码

修改完成后,保存,启动bin目录下面的startup.bat(linux启动startup.sh),显示Listening for transport dt_socket at address: 9099则没错

在canal.adapter目录下找到配置文件conf/application.yml,修改以下配置:

srcDataSources:

defaultDS:

  url: 你自己的mysql数据库地址

  username: mysql数据库账号

  password: mysql数据库密码

canalAdapters:

  • instance: example # canal instance Name or mq topic name

    groups:

    • groupId: g1

      outerAdapters:

      • name: logger

      • name: es7 #ES同步适配器

        key: es7Key

        hosts: #ES连接地址,!!!!!!主要这里有的地方要加http:

        properties:

        mode: rest

在conf/es7目录中,新建与sql映射表名一样的yml文件

outerAdapterKey: es7Key # 这里和主配置canalAdapters.instance.groupId.outerAdapters.key下面的的key一样

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值

destination: example

groupId: g1

esMapping:

_index: #ES的索引名称

_id: _id

sql: "" # sql映射

etlCondition: "where a.c_time>={}"

commitBatch: 3000 #提交批大小

注意:

如果报错,可以先看日志,如果日志没有报错,并且监听到了mysql数据的改变,但是没有同步到es,可能是由于es7.x的版本问题,解决办法如下:

将canal 1.1.5.alpha-2中plugins目录下的client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar复制到1.1.5的plugins下

更多报错参考:canalAdapter同步至es7.x踩坑指南_client-adapter.es7x-1.1.5-snapshot-jar-with-depend-CSDN博客

三、总结

logstash使用相对比较简单,易上手,可扩展插件生态系统,支持定时存储,但是它耗资源较大,并且不能与消息队列缓存。canal实时同步存储,查询更快、更迅速,对数据库无压力,但是相对而言难些。

标签: adb

本文转载自: https://blog.csdn.net/geuejfwhje/article/details/136424639
版权归原作者 geuejfwhje 所有, 如有侵权,请联系我们删除。

“实现mysql和es数据同步的两大工具——Logstash和Canal”的评论:

还没有评论