0


分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步

文章目录

⛄引言

本文参考黑马 分布式Elastic search

Elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容

一、思路分析

⛅实现方式

同步调用

方案一:同步调用

在这里插入图片描述

基本步骤如下:

  • hotel-demo对外提供接口,用来修改elasticsearch中的数据
  • 酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口,

异步通知

方案二:异步通知

在这里插入图片描述

流程如下:

  • hotel-admin对mysql数据库数据完成增、删、改后,发送MQ消息
  • hotel-demo监听MQ,接收到消息后完成elasticsearch数据修改

监听binlong

方案三:监听binlog

在这里插入图片描述

流程如下:

  • 给mysql开启binlog功能
  • mysql完成增、删、改操作都会记录在binlog中
  • hotel-demo基于canal监听binlog变化,实时更新elasticsearch中的内容

⚡框架选择

方式一:同步调用

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方式二:异步通知

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

方式三:监听binlog

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高

本次实现方式我们选择 以RabbitMQ 异步方式 搭载 SpringCloud Alibaba + Feign 实现。

二、实现数据同步

⌚需求分析

需求

实现酒店管理增删改查业务,已提供页面。 完成其数据发生增删改查操作时 同步 ElasticSearch

分析

我们采用分布式技术的方式来实现

框架采用 SpringCloud Alibaba、Nacos 、OpenFeign 远程调用、RabbitMQ 作为消息承载体承载数据、 Elastic Search 搜索引擎

⏰搭建环境

以下为模块概览

在这里插入图片描述

主要分为两大模块

  • 完成酒店模块增删改查业务,引入MQ依赖,完成其向MQ的发送消息 此模块作为生产者
  • 完成ES-MQ模块,引入MQ、ES依赖,完成接受MQ的消息以及完成对ES的更新此模块作为消费者

注意:Nacos需要自行下载,本项目依赖于Nacos注册中心, 运行起来后不影响后面的服务注册进nacos

本次所用到的 RabbitMQ、 ElasticSearch 均部署在 云服务器

MQ结构如图:

在这里插入图片描述

⚡核心源码

hotel-service 业务模块

导入hotel-service 核心代码,已完成基础的增删改查工作。 **具体源码公众号搜索

程序员Bug终结者

回复 es 获取**

ES模块引入依赖

<!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- ES --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId></dependency>

声明队列交换机名称

publicclassMqConstants{/**
     * 交换机
     */publicstaticfinalStringHOTEL_EXCHANGE="hotel.topic";/**
     * 新增或修改的routing_key
     */publicstaticfinalStringHOTEL_INSERT_KEY="hotel.insert";/**
     * 删除的 routing_key
     */publicstaticfinalStringHOTEL_DELETE_KEY="hotel.delete";}

hotel-service模块 发送消息

@RestController@RequestMapping("hotel")publicclassHotelController{@AutowiredprivateHotelService hotelService;@ResourceprivateRabbitTemplate rabbitTemplate;@GetMapping("/{id}")publicHotelqueryById(@PathVariable("id")Long id){return hotelService.getById(id);}@GetMapping("/list")publicPageResulthotelList(@RequestParam(value ="page", defaultValue ="1")Integer page,@RequestParam(value ="size", defaultValue ="1")Integer size
    ){Page<Hotel> result = hotelService.page(newPage<>(page, size));returnnewPageResult(result.getTotal(), result.getRecords());}@PostMappingpublicvoidsaveHotel(@RequestBodyHotel hotel){
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY, hotel.getId());}@PutMapping()publicvoidupdateById(@RequestBodyHotel hotel){if(hotel.getId()==null){thrownewInvalidParameterException("id不能为空");}
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY, hotel.getId());}@DeleteMapping("/{id}")publicvoiddeleteById(@PathVariable("id")Long id){
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY, id);}}

ES模块接受消息

@Component@Slf4jpublicclassMqConsumerListener{@ResourceprivateHotelService hotelService;/**
     * 监听酒店新增或修改的业务
     * @param id
     */@RabbitListener(bindings =@QueueBinding(value =@Queue(value =MqConstants.HOTEL_INSERT_KEY, durable ="true"),
            exchange =@Exchange(name =MqConstants.HOTEL_EXCHANGE, type =ExchangeTypes.DIRECT), key =MqConstants.HOTEL_INSERT_KEY))publicvoidlistenHotelInsertOrUpdate(String id)throwsIOException{
        hotelService.insertById(id);}/**
     * 监听酒店删除的业务
     * @param id
     */@RabbitListener(bindings =@QueueBinding(value =@Queue(value =MqConstants.HOTEL_DELETE_KEY, durable ="true"),
            exchange =@Exchange(name =MqConstants.HOTEL_EXCHANGE, type =ExchangeTypes.DIRECT), key =MqConstants.HOTEL_DELETE_KEY))publicvoidlistenHotelDelete(String id)throwsIOException{
        hotelService.deleteById(id);}}

核心方法实现

@ServicepublicclassHotelService{@ResourceprivateRestHighLevelClient client;@ResourceprivateHotelClient hotelClient;publicvoidinsertById(String id){try{//1. 根据id查询酒店数据Hotel hotel = hotelClient.findById(id);HotelDoc hotelDoc =newHotelDoc(hotel);//2. 准备RequestIndexRequest request =newIndexRequest("hotel").id(hotel.getId().toString());//3. 准备DSL
            request.source(JSON.toJSONString(hotelDoc),XContentType.JSON);//4. 发送请求
            client.index(request,RequestOptions.DEFAULT);}catch(IOException e){
            e.printStackTrace();}}publicvoiddeleteById(String id){try{//1. 准备 RequestDeleteRequest request =newDeleteRequest("hotel", id);// 2.发送请求
            client.delete(request,RequestOptions.DEFAULT);}catch(Exception e){
            e.printStackTrace();}}}

三、测试

运行nacos

startup.cmd -m standalone

将hotel-service模块注册到nacos

在这里插入图片描述

访问页面,对酒店数据进行增删改查操作

在这里插入图片描述

将第一条信息价格修改为399

在这里插入图片描述

查看es中数据的变化

在这里插入图片描述

成功完成数据同步

四、源码获取

请联系 公众号 程序员Bug终结者 回复 es同步 获取源码及数据库文件

⛵小结

以上就是【Bug 终结者】对 分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步 的简单介绍,ES搜索引擎无疑是最优秀的分布式搜索引擎,使用它,可大大提高项目的灵活、高效性!**通过本文已了解 MySQL数据同步ES基本过程以及核心实现

技术改变世界!!!

**

如果这篇【文章】有帮助到你,希望可以给【Bug 终结者】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!


本文转载自: https://blog.csdn.net/weixin_45526437/article/details/137338372
版权归原作者 Bug 终结者 所有, 如有侵权,请联系我们删除。

“分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步”的评论:

还没有评论