0


【ElasticSearch】ES与MySQL数据同步方案及Java实现

文章目录

一、同步实现思路

elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。
在这里插入图片描述

1、方案一:同步调用

操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用

在这里插入图片描述
同步调用方式下,业务耦合太多。

2、方案二:异步通知

引入消息队列,hotel-admin将数据写入mysql,并且自己再往MQ发条消息,说"数据更新了",任务就算完成,至于后面的hotel-demo什么时候更新ES,花费了多久,那是hotel-demo自己的事情。
在这里插入图片描述

3、方案三:监听binlog

使用canal中间件去监听mysql的binlog,当binlog发生改变,就通知hotel-demo,和上面不同的时,更加解放了hotel-admin这个上游服务,它往mysql写完数据就算任务完成,不用发消息,也不用调用其他服务,达到了完全解耦合
在这里插入图片描述
其实mysql的binlog发生改变,搭配cancel,就像方案二的hotel-admin服务发了一条消息到MQ。

三种实现方式的对比:
在这里插入图片描述

二、实现ES与MySQL数据同步

1、导入hotel-admin工程

启动服务,访问localhost:{spring.service.port}

在这里插入图片描述
在hotel-admin服务中,模拟MySQL数据的增删改查。

2、项目分析

mysql的增删改动作需要同步到es中,但对es来说,增和改可以归为一类,就看文档id存在不存在了。接下来使用方案二,及MQ实现数据同步,思路如下;

  • 声明exchange、queue、RoutingKey
  • 在hotel-admin中的增、删、改业务中完成消息发送
  • 在hotel-demo中完成消息监听,并更新elasticsearch中数据

模型如下:

在这里插入图片描述

3、SpringAMQP整合

  • 引入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  • 临时启动个rabbitmq
docker run -d-p5672:5672 -p15672:15672 rabbitmq:3-management
# 访问host:15672,用户和密码为默认的guest
  • 在hotel-admin中的application.yml,添加mq连接信息
spring:rabbitmq:host: 192.168.150.101 # 主机名port:5672# 端口virtual-host: / # 虚拟主机 username: guest # 用户名password: guest # 密码
  • 最后,记得给消费方也引入AMQP依赖,并添加上mq的连接信息

4、声明队列和交换机

  • 在常量目录下定义队列和交换机的名字
packagecn.llg.hotel.constants;publicclassHotelMqConstants{//交换机名称publicstaticfinalStringEXCHANGE_NAME="hotel.topic";//新增和修改队列publicstaticfinalStringINSERT_QUEUE_NAME="hotel.insert.queue";//删除队列publicstaticfinalStringDELETE_QUEUE_NAME="hotel.delete.queue";//RoutingKeypublicstaticfinalStringINSERT_KEY="hotel.insert";publicstaticfinalStringDELETE_KEY="hotel.delete";}
  • 接下来声明队列和交换机,可以基于注解,也可以基于Bean,后者复杂些,这里演示后者
packagecn.llg.hotel.config;importcn.llg.hotel.constants.HotelMqConstants;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.TopicExchange;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * @date 2023/7/12
 */@ConfigurationpublicclassMqConfig{@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);}@BeanpublicQueueinsertQueue(){returnnewQueue(HotelMqConstants.INSERT_QUEUE_NAME,true);}@BeanpublicQueuedeleteQueue(){returnnewQueue(HotelMqConstants.DELETE_QUEUE_NAME,true);}/**
     * 绑定队列和交换机关系
     */@BeanpublicBindinginsertQueueBinding(){returnBindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY);}@BeanpublicBindingdeleteQueueBinding(){returnBindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY);}}

5、发送消息MQ

注入RabbitTemplate的对象之后,这里就直接在controller中发送MQ消息了,convertAndSend()方法的三个参数:

  • 交换机名称
  • routingKey
  • 消息内容,这里消息体尽量小些,别把一整个对象发过去
packagecn.llg.hotel.web;importcn.llg.hotel.constants.HotelMqConstants;importcn.llg.hotel.pojo.Hotel;importcn.llg.hotel.pojo.PageResult;importcn.llg.hotel.service.IHotelService;importcom.baomidou.mybatisplus.extension.plugins.pagination.Page;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.*;importjava.security.InvalidParameterException;@RestController@RequestMapping("hotel")publicclassHotelController{@AutowiredprivateIHotelService hotelService;@AutowiredprivateRabbitTemplate rabbitTemplate;@PostMappingpublicvoidsaveHotel(@RequestBodyHotel hotel){// 新增酒店
        hotelService.save(hotel);// 发送MQ消息,MQ是基于内存的,你把整个酒店对象hotel发过去很容易占满队列,发个主键ID就好,消息体尽量小些
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME,HotelMqConstants.INSERT_KEY, hotel.getId());}@PutMapping()publicvoidupdateById(@RequestBodyHotel hotel){if(hotel.getId()==null){thrownewInvalidParameterException("id不能为空");}
        hotelService.updateById(hotel);// 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME,HotelMqConstants.INSERT_KEY, hotel.getId());}@DeleteMapping("/{id}")publicvoiddeleteById(@PathVariable("id")Long id){
        hotelService.removeById(id);// 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME,HotelMqConstants.DELETE_KEY, id);}//其他接口@GetMapping("/list")publicPageResulthotelList(@RequestParam(value ="page", defaultValue ="1")Integer page,@RequestParam(value ="size", defaultValue ="1")Integer size
    ){Page<Hotel> result = hotelService.page(newPage<>(page, size));returnnewPageResult(result.getTotal(), result.getRecords());}@GetMapping("/{id}")publicHotelqueryById(@PathVariable("id")Long id){return hotelService.getById(id);}}

6、监听MQ消息

hotel-demo整合完SpringAMQP后,在hotel-demo中监听消息。

  • 新建类HotelListener类,并加@Component注解以Bean的形式管理
packagecn.llg.hotel.mq;importcn.llg.hotel.constants.HotelMqConstants;importcn.llg.hotel.service.IHotelService;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/**
 * @date 2023/7/13
 */@ComponentpublicclassHotelListener{@ResourceIHotelService hotelService;/**
     * 监听酒店新增或者修改的业务
     * id接受一个Long,因为发送过来的是一个Long id
     * @param id 酒店ID
     */@RabbitListener(queues =HotelMqConstants.INSERT_QUEUE_NAME)publicvoidlistenHotelInsertAndUpdate(Long id){
        hotelService.insertDocById(id);}/**
     * 监听酒店删除业务
     */@RabbitListener(queues =HotelMqConstants.DELETE_QUEUE_NAME)publicvoidlistenHotelDelete(Long id){
        hotelService.deleteDocById(id);}}
  • 拿到MQ中的酒店id后,使用JavaHighLevelClient对象来更新ES数据
packagecn.llg.hotel.service;importcn.llg.hotel.domain.dto.RequestParams;importcn.llg.hotel.domain.pojo.Hotel;importcn.llg.hotel.domain.vo.PageResult;importcom.baomidou.mybatisplus.extension.service.IService;publicinterfaceIHotelServiceextendsIService<Hotel>{voidinsertDocById(Long id);voiddeleteDocById(Long id);}
@ServicepublicclassHotelServiceextendsServiceImpl<HotelMapper,Hotel>implementsIHotelService{@ResourceRestHighLevelClient client;@OverridepublicvoidinsertDocById(Long id){try{//0.根据ID查数据,并转为文档类型Hotel hotel =getById(id);HotelDoc hotelDoc =newHotelDoc(hotel);//1.准备requestIndexRequest request =newIndexRequest("hotel").id(hotelDoc.getId().toString());//2.准备DSL
            request.source(JSON.toJSONString(hotelDoc),XContentType.JSON);//3.发送请求
            client.index(request,RequestOptions.DEFAULT);}catch(IOException e){thrownewRuntimeException(e);}}@OverridepublicvoiddeleteDocById(Long id){try{//1.准备requestDeleteRequest request =newDeleteRequest("hotel",id.toString());//2.发送请求
            client.delete(request,RequestOptions.DEFAULT);}catch(IOException e){thrownewRuntimeException(e);}}}

最后补充下上面的Hotel和HotelDoc之间的转换关系:

@Data@TableName("tb_hotel")publicclassHotel{@TableId(type =IdType.INPUT)privateLong id;privateString name;privateString address;privateInteger price;privateInteger score;privateString brand;privateString city;privateString starName;privateString business;privateString longitude;privateString latitude;privateString pic;}
@Data@NoArgsConstructorpublicclassHotelDoc{privateLong id;privateString name;privateString address;privateInteger price;privateInteger score;privateString brand;privateString city;privateString starName;privateString business;privateString location;privateString pic;//距离privateObject distance;//是否充广告privateBoolean isAD;//ES中的completion,后面存数组,这里可以对应成ListprivateList<String> suggestion;publicHotelDoc(Hotel hotel){this.id = hotel.getId();this.name = hotel.getName();this.address = hotel.getAddress();this.price = hotel.getPrice();this.score = hotel.getScore();this.brand = hotel.getBrand();this.city = hotel.getCity();this.starName = hotel.getStarName();this.business = hotel.getBusiness();this.location = hotel.getLatitude()+", "+ hotel.getLongitude();this.pic = hotel.getPic();if(this.business.contains("/")){//此时business有多个值,需要分开后放入suggestionString[] arr =this.business.split("/");//添加元素this.suggestion =newArrayList<>();Collections.addAll(this.suggestion,arr);this.suggestion.add(this.brand);}else{this.suggestion =Arrays.asList(this.brand,this.business);}}}

7、测试同步功能

重启两个服务,查看MQ:

在这里插入图片描述

点击队列查看详情,可以看到绑定交换机成功:

在这里插入图片描述
接下来去酒店管理页面修改一条酒店信息(我直接调接口了,不去页面改)

在这里插入图片描述
在酒店搜索页面搜一下:

在这里插入图片描述

可以看到ES数据跟随MySQL更新成功!


本文转载自: https://blog.csdn.net/llg___/article/details/131691624
版权归原作者 -代号9527 所有, 如有侵权,请联系我们删除。

“【ElasticSearch】ES与MySQL数据同步方案及Java实现”的评论:

还没有评论