0


数据同步-Mysql同步到ElasticSearch

Mysql同步到ElasticSearch

数据同步

一般情况下,如果做查询搜索功能,使用 ES 来模糊搜索,但是数据是存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致(以 MySQL 为主)。

数据同步包含:全量同步 (首次) + 增量同步(新数据)。

首次安装完 ES,把 MySQL 数据全量同步到 ES 里,写一个单次脚本。

public class FullSyncPostToEs implements CommandLineRunner {

    @Resource
    private PostService postService;

    @Resource
    private PostEsDao postEsDao;

    @Override
    public void run(String... args){
        List<Post> postList = postService.list();if(CollectionUtils.isEmpty(postList)){return;}
        List<PostEsDTO> postEsDTOList = postList.stream().map(PostEsDTO::objToDto).collect(Collectors.toList());
        final int pageSize =500;
        int total = postEsDTOList.size();
        log.info("FullSyncPostToEs start, total {}", total);for(int i =0; i < total; i += pageSize){
            int end = Math.min(i + pageSize, total);
            log.info("sync from {} to {}", i, end);
            postEsDao.saveAll(postEsDTOList.subList(i, end));}
        log.info("FullSyncPostToEs end, total {}", total);}}

增量同步有五种方式:

1、定时任务

  • 定时任务:比如1 分钟 1 次,找到 MySQL 中过去几分钟内(至少是定时周期的 2 倍)发生改变的数据,然后更新到 ES。
public class IncSyncPostToEs {

    @Resource
    private PostMapper postMapper;

    @Resource
    private PostEsDao postEsDao;

    /**
     * 每分钟执行一次
     */
    @Scheduled(fixedRate =60 * 1000)
    public void run(){
        // 查询近 5 分钟内的数据
        Date fiveMinutesAgoDate = new Date(new Date().getTime() - 5 * 60 * 1000L);
        List<Post> postList = postMapper.listPostWithDelete(fiveMinutesAgoDate);if(CollectionUtils.isEmpty(postList)){
            log.info("no inc post");return;}
        List<PostEsDTO> postEsDTOList = postList.stream()
                .map(PostEsDTO::objToDto)
                .collect(Collectors.toList());
        final int pageSize =500;
        int total = postEsDTOList.size();
        log.info("IncSyncPostToEs start, total {}", total);for(int i =0; i < total; i += pageSize){
            int end = Math.min(i + pageSize, total);
            log.info("sync from {} to {}", i, end);
            postEsDao.saveAll(postEsDTOList.subList(i, end));}
        log.info("IncSyncPostToEs end, total {}", total);}}

优点:简单易懂、占用资源少、不用引入第三方中间件;
缺点:有时间差;
应用场景:数据时间内不同步影响不大、或者数据几乎不发生修改;

2、双写

  • 双写:写数据的时候,必须去写入到ES,更新、删除都需要操作ES(加事务:可能存在写入某一方出现失败,形成脏数据)。

3、MQ异步写入

  • MQ异步写入:在写入数据库时,通过MQ异步写入ES,同样可能存在数据写入不一致问题。

4、Logstash

  • ES的Logstash数据同步管道:Logstash 事件处理管道有三个阶段:输入过滤器输出

下载地址:https://www.elastic.co/guide/en/logstash/7.17/installing-logstash.html
inputs 模块负责收集数据,filters 模块可以对收集到的数据进行格式化、过滤、简单的数据处理,outputs 模块负责将数据同步到目的地,Logstash的处理流程,就像管道一样,数据从管道的一端,流向另外一端。

inputs 和 outputs 支持编解码器,使您能够在数据进入或离开管道时对数据进行编码或解码,而无需使用单独的过滤器。
在这里插入图片描述
启动Logstash,添加一个conf配置文件,便可完成同步任务。

 C:\Windows\system32> cd C:\logstash-7.17.23\
 C:\logstash-7.17.23> .\bin\logstash.bat -f .\config\syslog.conf

syslog.conf:数据同步的配置文件。

举个例子:

输入事件:

input {
  jdbc {
    jdbc_driver_library =>"mysql-connector-java-5.1.36-bin.jar"  //数据库驱动
    jdbc_driver_class =>"com.mysql.jdbc.Driver"                 //连接数据库
    jdbc_connection_string =>"jdbc:mysql://localhost:3306/mydb"
    jdbc_user =>"mysql"
     jdbc_password =>"mysql"
     statement =>"SELECT * from songs where artist = :favorite_artist"  //执行sql语句
    parameters =>{"favorite_artist"=>"Beethoven"}   //预编译  
    schedule =>"* * * * *"    //corn表达式,多久进行同步
  }}

:sql_last_value 可以设置每次查询结果中updatetime为最后的时间,作为下次增量同步的开始时间(需要对时间进行排序才能保证最后一条数据为时间最大的)。

input {
  jdbc {
    statement =>"SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE updatetime > :sql_last_value order  by updatetime desc"
    use_column_value =>true
    tracking_column =>"updatetime "# ... other configuration bits}}

输出事件:

output {
stdout { codec => rubydebug }
elasticsearch {
hosts =>"127.0.0,1:9200"  //写入到ES
index =>"post_v1"        //ES对应的索引
document_id =>"%{id)"        //取数据库查询出的id作为ES中的唯一id
}}

过滤事件:

filter {
mutate {rename=>{"updatetime"=>"updateTime"    //给字段重命名
"userid"=>"userId""createtime"=>"createTime""isdelete"=>"isDelete"
remove_field =>["thumbnm","favournum"]   //移除不需要同步到ES中的字段

更多参数,可参考官方文档进行配置:https://www.elastic.co/guide/en/logstash/7.17/output-plugins.html。

5、Canal

  • Canal

优点:实时同步,实时性非常强;
原理:数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间得到消息并处理;
canal: 帮你监听 binlog,并解析 binlog 为你可以理解的内容,它伪装成了 mysql 的从节点,获取主节点给的 binlog。

在这里插入图片描述

参考文档https://github.com/alibaba/canal/wiki/QuickStart

后记
👉👉💕💕美好的一天,到此结束,下次继续努力!欲知后续,请看下回分解,写作不易,感谢大家的支持!! 🌹🌹🌹


本文转载自: https://blog.csdn.net/m0_59230408/article/details/140592748
版权归原作者 失重外太空. 所有, 如有侵权,请联系我们删除。

“数据同步-Mysql同步到ElasticSearch”的评论:

还没有评论