文章目录
1、引入
玩过Elasticsearch(下面统称ES)的小伙伴都知道ES是一个十分强悍的搜索引擎,但是在之前学习的过程中一直都是通过手敲DSL语句把数据导入进去,这多少有点不优雅。那么到底能不能做到在我们数据库发生变更时,ES中的数据也随之改变,即做到两者数据同步呢?答案理所应当的可以!
2、方案分析
常见的数据同步方案主要有以下三种:
- 同步调用。在实现增删改的同时,通过调用ES所在服务提供的接口,从而实现两端服务的数据同步。这种方式能够简单粗暴的解决同步问题,但是业务的耦合度较高,一般不建议使用;
- 异步调用。增删改服务和搜索服务分别通过MQ进行发送和监听消息,从而实现两端服务的数据同步。这种方式能够在一定程度上有效降低业务的耦合度,但较为依赖MQ的性能;
- binlog监听。给MySQL开启binlog功能,搜索服务基于canal监听binlog变化,从而实现两端服务的数据同步。这种方法完全解除了服务间的耦合,但开启binlog会增加数据库负担、同时实现复杂度较高。
而在这里选择的是通过MQ异步调用方案解决不同服务间ES和MySQL的数据同步问题,MQ则是采用了RabbitMQ。
3、具体实现
3.1、大致思路
我们的目标其实是很明确的,就是我们在进行增删改操作之后,能够即可在搜索端更新并得以体现。这里使用的是发布订阅模型中的Topic Exchange,当然也可以使用Direct Exchange,但是就不推荐使用Fanout Exchange了,当然自己喜欢最重要。
需要注意的是,这里除了数据同步的具体实现外,其余的都只是大致提及需要自己进行编写,比如增删改中的接口,搜索的接口等。
步骤大致可分为以下部分:
- 搭建一个增删改服务my-admin,并且准备好对应的增删改接口;
- 搭建一个搜索服务my-search,并且准备好对应的搜索接口;
- 导入实现数据同步所需依赖;
- 配置xml文件;
- 声明交换机及所需队列;
- 在my-admin服务中完成消息发送;
- 在my-search服务中对消息进行监听从而更新ES数据。
3.2、导入依赖
第一步和第二步这里就直接省略了,大家伙们随便准备一个简单的demo就行。这里除了基本的依赖之外,值得注意的只要有三个依赖,分别是数据库依赖、ES依赖和MQ依赖,如果需要用的druid的则需要自己进行修改。(其中ES依赖只有搜索服务需要)
<!-- 两个服务都需要的依赖 --><!-- RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- MySQL --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- 搜索服务独有依赖 --><!--elasticsearch--><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId></dependency>
3.3、配置文件
配置文件主要是对MySQL和RabbitMQ进行配置。这一部分两个服务不出意外的话是一样的,除非是用了不同的数据库或者RabbitMQ。
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/heima?useSSL=false
username: root
password:123456rabbitmq:host: 192.168.150.100 # 主机名port:5672# 端口virtual-host: / # 虚拟主机username: xbaozi # 用户名password:123456# 密码
3.4、MQ搭建
3.4.1、封装名称属性
对交换机和队列以及Topic Exchange所需要的RoutingKey名称,提高整体代码的复用性。在两个服务中同时新建一个constant包,将该包下新建下面类:
publicclassMqConstants{/**
* 交换机
*/publicfinalstaticString MY_EXCHANGE ="my.topic";/**
* 监听新增和修改的队列
*/publicfinalstaticString MY_INSERT_QUEUE ="my.insert.queue";/**
* 监听删除的队列
*/publicfinalstaticString MY_DELETE_QUEUE ="my.delete.queue";/**
* 新增或修改的RoutingKey
*/publicfinalstaticString MY_INSERT_KEY ="my.insert";/**
* 删除的RoutingKey
*/publicfinalstaticString MY_DELETE_KEY ="my.delete";}
3.4.2、声明注入交换机
因为是两边服务都需要,且为了避免只在一个服务声明另一个服务先运行产生异常,因此这里使用的是配置类注入的形式进行声明,当然也可以使用注解的形式,只是相对来说对这里不太方便而已。在两个服务中新建config包,并在该包下新建配置类:
@ConfigurationpublicclassMqConfig{@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(MqConstants.MY_EXCHANGE,true,false);}@BeanpublicQueueinsertQueue(){returnnewQueue(MqConstants.MY_INSERT_QUEUE,true);}@BeanpublicQueuedeleteQueue(){returnnewQueue(MqConstants.MY_DELETE_QUEUE,true);}@BeanpublicBindinginsertQueueBinding(){returnBindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.MY_INSERT_KEY);}@BeanpublicBindingdeleteQueueBinding(){returnBindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.MY_DELETE_KEY);}}
3.4.3、发送消息
发送消息相对简单,只需要在增删改服务中的增、删、改操作中调用convertAndSend方法即可。(记得注入RabbitTemplate)
@PostMappingpublicvoidsave(@RequestBodyUser user){
userService.save(user);// 发送保存消息,避免发送消息过大造成队列溢出,因此只需要发送id支持查询即可
rabbitTemplate.convertAndSend(MqConstants.MY_EXCHANGE,MqConstants.MY_INSERT_KEY, user.getId());}@PutMapping()publicvoidupdateById(@RequestBodyUser user){if(user.getId()==null){thrownewInvalidParameterException("id不能为空");}
userService.updateById(user);// 发送更新消息,避免发送消息过大造成队列溢出,因此只需要发送id支持查询即可
rabbitTemplate.convertAndSend(MqConstants.MY_EXCHANGE,MqConstants.MY_INSERT_KEY, user.getId());}@DeleteMapping("/{id}")publicvoiddeleteById(@PathVariable("id")Long id){
userService.removeById(id);// 发送删除消息
rabbitTemplate.convertAndSend(MqConstants.MY_EXCHANGE,MqConstants.MY_DELETE_KEY, id);}
3.4.4、接收消息
对于接受消息需要准备好删除和更新的业务逻辑,其中新增和修改统称为了更新业务。除此之外,还需要准备好一个监听器,用于监听MQ发送过来的消息。
- 业务逻辑实现:这里是通过service实现了两个业务逻辑,这里直接给出Impl中的具体实现方式。
@OverridepublicvoiddeleteById(Long id){try{// 1.准备RequestDeleteRequest request =newDeleteRequest("hotel", id.toString());// 2.发送请求
client.delete(request,RequestOptions.DEFAULT);}catch(IOException e){thrownewRuntimeException(e);}}@OverridepublicvoidupdateById(Long id){try{// 0.根据id查询酒店数据User user =getById(id);// 转换为文档类型UserDoc userDoc =newUserDoc(user);// 1.准备Request对象IndexRequest request =newIndexRequest("user").id(user.getId().toString());// 2.准备Json文档
request.source(JSON.toJSONString(userDoc),XContentType.JSON);// 3.发送请求
client.index(request,RequestOptions.DEFAULT);}catch(IOException e){thrownewRuntimeException(e);}}
- 监听消息实现:因为前期已经通过Bean注入了交换机与队列的关系,因此这里是需要使用注解表明需要监听的队列即可。
/**
* 监听用户新增或修改的业务
* @param id 用户id
*/@RabbitListener(queues =MqConstants.MY_INSERT_QUEUE)publicvoidlistenUserInsertOrUpdate(Long id){
userService.insertById(id);}/**
* 监听用户删除的业务
* @param id 用户id
*/@RabbitListener(queues =MqConstants.MY_DELETE_QUEUE)publicvoidlistenUserDelete(Long id){
userService.deleteById(id);}
3.5、验收成果
到这里的话就可以对结果进行演示了,因为在写这玩意的时候是已经写完了demo且已经是关掉了的,因此这里就不放演示截图,直接告诉大家伙的验收方式:
- 如果有页面的,通过界面测试: - 同时打开搜索界服务和增删改服务的界面,找到同一条数据;- 通过增删改服务的界面进行新增操作(删除或修改);- 确保已经操作成功后在搜索服务界面刷新,找到那条数据进行验证;- 如果搜索服务搜索出来的是更新后的,则证明数据同步成功。
- 如果没有界面的,通过接口测试工具测试: - 同样先找到同一条数据确定下来,无论是通过接口查询还是DSL或SQL;- 通过Postman或其他接口测试工具进行新增操作(删除或修改);- 确保已经操作成功后再次调用搜索服务的搜索接口,找到那条数据进行验证;- 如果搜索出来的是更新后的,则证明数据同步成功。
4、失败原因推测
这里对可能失败的原因进行一个推测,如果真的失败了可以尝试一下以下方法:
- MQ、ES等服务是否启动;
- 如果是使用的服务器或虚拟机,防火墙 / 端口是否打开;
- 是否在两个服务中都对MySQL以及MQ进行配置;
- 在运行时代码中是否对RestClient进行初始化,如通过Bean注入;
- 交换机、队列的声明和绑定是否正确,且在两个服务中都进行了声明;
- 编译器如IDEA的Maven配置、yml文件的字符集配置是否正确;
- 如果是长期打开的虚拟机,建议重启后再试;
- 换一个IDEA或者服务器吧,没救了(偷溜)
版权归原作者 陈宝子 所有, 如有侵权,请联系我们删除。