文章目录
一、同步实现思路
elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。
1、方案一:同步调用
操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用
同步调用方式下,业务耦合太多。
2、方案二:异步通知
引入消息队列,hotel-admin将数据写入mysql,并且自己再往MQ发条消息,说"数据更新了",任务就算完成,至于后面的hotel-demo什么时候更新ES,花费了多久,那是hotel-demo自己的事情。
3、方案三:监听binlog
使用canal中间件去监听mysql的binlog,当binlog发生改变,就通知hotel-demo,和上面不同的时,更加解放了hotel-admin这个上游服务,它往mysql写完数据就算任务完成,不用发消息,也不用调用其他服务,达到了完全解耦合。
其实mysql的binlog发生改变,搭配cancel,就像方案二的hotel-admin服务发了一条消息到MQ。
三种实现方式的对比:
二、实现ES与MySQL数据同步
1、导入hotel-admin工程
启动服务,访问localhost:{spring.service.port}
在hotel-admin服务中,模拟MySQL数据的增删改查。
2、项目分析
mysql的增删改动作需要同步到es中,但对es来说,增和改可以归为一类,就看文档id存在不存在了。接下来使用方案二,及MQ实现数据同步,思路如下;
- 声明exchange、queue、RoutingKey
- 在hotel-admin中的增、删、改业务中完成消息发送
- 在hotel-demo中完成消息监听,并更新elasticsearch中数据
模型如下:
3、SpringAMQP整合
- 引入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 临时启动个rabbitmq
docker run -d-p5672:5672 -p15672:15672 rabbitmq:3-management
# 访问host:15672,用户和密码为默认的guest
- 在hotel-admin中的application.yml,添加mq连接信息
spring:rabbitmq:host: 192.168.150.101 # 主机名port:5672# 端口virtual-host: / # 虚拟主机 username: guest # 用户名password: guest # 密码
- 最后,记得给消费方也引入AMQP依赖,并添加上mq的连接信息
4、声明队列和交换机
- 在常量目录下定义队列和交换机的名字
packagecn.llg.hotel.constants;publicclassHotelMqConstants{//交换机名称publicstaticfinalStringEXCHANGE_NAME="hotel.topic";//新增和修改队列publicstaticfinalStringINSERT_QUEUE_NAME="hotel.insert.queue";//删除队列publicstaticfinalStringDELETE_QUEUE_NAME="hotel.delete.queue";//RoutingKeypublicstaticfinalStringINSERT_KEY="hotel.insert";publicstaticfinalStringDELETE_KEY="hotel.delete";}
- 接下来声明队列和交换机,可以基于注解,也可以基于Bean,后者复杂些,这里演示后者
packagecn.llg.hotel.config;importcn.llg.hotel.constants.HotelMqConstants;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.TopicExchange;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
* @date 2023/7/12
*/@ConfigurationpublicclassMqConfig{@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);}@BeanpublicQueueinsertQueue(){returnnewQueue(HotelMqConstants.INSERT_QUEUE_NAME,true);}@BeanpublicQueuedeleteQueue(){returnnewQueue(HotelMqConstants.DELETE_QUEUE_NAME,true);}/**
* 绑定队列和交换机关系
*/@BeanpublicBindinginsertQueueBinding(){returnBindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY);}@BeanpublicBindingdeleteQueueBinding(){returnBindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY);}}
5、发送消息MQ
注入RabbitTemplate的对象之后,这里就直接在controller中发送MQ消息了,convertAndSend()方法的三个参数:
- 交换机名称
- routingKey
- 消息内容,这里消息体尽量小些,别把一整个对象发过去
packagecn.llg.hotel.web;importcn.llg.hotel.constants.HotelMqConstants;importcn.llg.hotel.pojo.Hotel;importcn.llg.hotel.pojo.PageResult;importcn.llg.hotel.service.IHotelService;importcom.baomidou.mybatisplus.extension.plugins.pagination.Page;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.*;importjava.security.InvalidParameterException;@RestController@RequestMapping("hotel")publicclassHotelController{@AutowiredprivateIHotelService hotelService;@AutowiredprivateRabbitTemplate rabbitTemplate;@PostMappingpublicvoidsaveHotel(@RequestBodyHotel hotel){// 新增酒店
hotelService.save(hotel);// 发送MQ消息,MQ是基于内存的,你把整个酒店对象hotel发过去很容易占满队列,发个主键ID就好,消息体尽量小些
rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME,HotelMqConstants.INSERT_KEY, hotel.getId());}@PutMapping()publicvoidupdateById(@RequestBodyHotel hotel){if(hotel.getId()==null){thrownewInvalidParameterException("id不能为空");}
hotelService.updateById(hotel);// 发送MQ消息
rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME,HotelMqConstants.INSERT_KEY, hotel.getId());}@DeleteMapping("/{id}")publicvoiddeleteById(@PathVariable("id")Long id){
hotelService.removeById(id);// 发送MQ消息
rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME,HotelMqConstants.DELETE_KEY, id);}//其他接口@GetMapping("/list")publicPageResulthotelList(@RequestParam(value ="page", defaultValue ="1")Integer page,@RequestParam(value ="size", defaultValue ="1")Integer size
){Page<Hotel> result = hotelService.page(newPage<>(page, size));returnnewPageResult(result.getTotal(), result.getRecords());}@GetMapping("/{id}")publicHotelqueryById(@PathVariable("id")Long id){return hotelService.getById(id);}}
6、监听MQ消息
hotel-demo整合完SpringAMQP后,在hotel-demo中监听消息。
- 新建类HotelListener类,并加@Component注解以Bean的形式管理
packagecn.llg.hotel.mq;importcn.llg.hotel.constants.HotelMqConstants;importcn.llg.hotel.service.IHotelService;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/**
* @date 2023/7/13
*/@ComponentpublicclassHotelListener{@ResourceIHotelService hotelService;/**
* 监听酒店新增或者修改的业务
* id接受一个Long,因为发送过来的是一个Long id
* @param id 酒店ID
*/@RabbitListener(queues =HotelMqConstants.INSERT_QUEUE_NAME)publicvoidlistenHotelInsertAndUpdate(Long id){
hotelService.insertDocById(id);}/**
* 监听酒店删除业务
*/@RabbitListener(queues =HotelMqConstants.DELETE_QUEUE_NAME)publicvoidlistenHotelDelete(Long id){
hotelService.deleteDocById(id);}}
- 拿到MQ中的酒店id后,使用JavaHighLevelClient对象来更新ES数据
packagecn.llg.hotel.service;importcn.llg.hotel.domain.dto.RequestParams;importcn.llg.hotel.domain.pojo.Hotel;importcn.llg.hotel.domain.vo.PageResult;importcom.baomidou.mybatisplus.extension.service.IService;publicinterfaceIHotelServiceextendsIService<Hotel>{voidinsertDocById(Long id);voiddeleteDocById(Long id);}
@ServicepublicclassHotelServiceextendsServiceImpl<HotelMapper,Hotel>implementsIHotelService{@ResourceRestHighLevelClient client;@OverridepublicvoidinsertDocById(Long id){try{//0.根据ID查数据,并转为文档类型Hotel hotel =getById(id);HotelDoc hotelDoc =newHotelDoc(hotel);//1.准备requestIndexRequest request =newIndexRequest("hotel").id(hotelDoc.getId().toString());//2.准备DSL
request.source(JSON.toJSONString(hotelDoc),XContentType.JSON);//3.发送请求
client.index(request,RequestOptions.DEFAULT);}catch(IOException e){thrownewRuntimeException(e);}}@OverridepublicvoiddeleteDocById(Long id){try{//1.准备requestDeleteRequest request =newDeleteRequest("hotel",id.toString());//2.发送请求
client.delete(request,RequestOptions.DEFAULT);}catch(IOException e){thrownewRuntimeException(e);}}}
最后补充下上面的Hotel和HotelDoc之间的转换关系:
@Data@TableName("tb_hotel")publicclassHotel{@TableId(type =IdType.INPUT)privateLong id;privateString name;privateString address;privateInteger price;privateInteger score;privateString brand;privateString city;privateString starName;privateString business;privateString longitude;privateString latitude;privateString pic;}
@Data@NoArgsConstructorpublicclassHotelDoc{privateLong id;privateString name;privateString address;privateInteger price;privateInteger score;privateString brand;privateString city;privateString starName;privateString business;privateString location;privateString pic;//距离privateObject distance;//是否充广告privateBoolean isAD;//ES中的completion,后面存数组,这里可以对应成ListprivateList<String> suggestion;publicHotelDoc(Hotel hotel){this.id = hotel.getId();this.name = hotel.getName();this.address = hotel.getAddress();this.price = hotel.getPrice();this.score = hotel.getScore();this.brand = hotel.getBrand();this.city = hotel.getCity();this.starName = hotel.getStarName();this.business = hotel.getBusiness();this.location = hotel.getLatitude()+", "+ hotel.getLongitude();this.pic = hotel.getPic();if(this.business.contains("/")){//此时business有多个值,需要分开后放入suggestionString[] arr =this.business.split("/");//添加元素this.suggestion =newArrayList<>();Collections.addAll(this.suggestion,arr);this.suggestion.add(this.brand);}else{this.suggestion =Arrays.asList(this.brand,this.business);}}}
7、测试同步功能
重启两个服务,查看MQ:
点击队列查看详情,可以看到绑定交换机成功:
接下来去酒店管理页面修改一条酒店信息(我直接调接口了,不去页面改)
在酒店搜索页面搜一下:
可以看到ES数据跟随MySQL更新成功!
版权归原作者 -代号9527 所有, 如有侵权,请联系我们删除。