0


RabbitMQ实现数据库与ElasticSearch的数据同步和分享文件过期处理

🎈 1 参考文档

RabbitMQ实现数据库与ElasticSearch的数据同步 | Hannya。-CSDN

企业级开发项目实战——基于RabbitMQ实现数据库、elasticsearch的数据同步 | 波总说先赚它一个小目标-CSDN

SPringBoot集成RabbitMQ实现30秒过期删除功能 | 军大君-CSDN


🔍 2 个人需求

  1. 当进行文件上传、文件创建、文件重命名等操作时:通过RabbitMQ:- 生产者:文件服务,执行上传、创建、重命名等文件操作,将用户文件信息(例如文件名、文件ID等)发送到RabbitMQ新增队列。- 消费者:查询服务,监听RabbitMQ新增队列,一旦收到消息,将用户文件信息新增或更新到Elasticsearch中。
  2. 文件删除时:通过RabbitMQ:- 生产者:文件服务,执行文件删除操作,将用户文件ID发送到RabbitMQ删除队列。- 消费者:查询服务,监听 RabbitMQ 队列,一旦收到消息,通过用户文件ID从Elasticsearch中删除相应的用户文件信息。
  3. 根据文件名进行文件模糊查询:通过OpenFeign:- 生产者:文件服务,查询服务调用文件服务提供的OpenFeign接口,通过用户文件ID从查询该用户文件是否存在。- 消费者:查询服务,如果不存在,将数据根据用户文件ID从Elasticsearch中删除。
  4. 分享文件时间到期处理:通过RabbitMQ的TTL(生存时间) + 死信队列:- 生产者:文件服务, 使用TTL模拟一个“延时队列”,在文件分享时间到期后,将消息传递到死信队列。- 消费者:文件服务,死信监听器监听到之后,将分享文件的分享状态改为已过期状态。

🔈3 声明

只是提供思路,代码不是很完整,直接复制运行不了。

最后面有完整网盘项目代码。


🚀4 OpenFeign相关部分(查询服务)

4.1 引入依赖

<!-- nacos --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!-- openfeign --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><!-- loadbalancer --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-loadbalancer</artifactId></dependency>

4.2 application.yml

spring:# nacos注册的服务名application:name: netdisk-search
  cloud:nacos:discovery:# 配置注册服务的IP地址server-addr: (IP地址):8848username: nacos
        password: nacos

4.3 FileFeignService 接口

@FeignClient(name ="netdisk-file", configuration =FeignInterceptor.class)publicinterfaceFileFeignService{@RequestMapping("/file/getUserFile/{userFileId}")ResultResponse<Boolean>getUserFile(@PathVariableLong userFileId);}

4.4 @EnableFeignClients 注解

@ComponentScan(value ="com.cauli.search.*")@EnableFeignClients(basePackages ="com.cauli.search")@SpringBootApplicationpublicclassNetdiskSearchApplication{publicstaticvoidmain(String[] args){SpringApplication.run(NetdiskSearchApplication.class, args);}}

🚀5 Elasticsearch相关部分(查询服务)

5.1 引入依赖

<!-- elasticsearch --><dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.0.1</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.3</version></dependency><dependency><groupId>jakarta.json</groupId><artifactId>jakarta.json-api</artifactId><version>2.0.1</version></dependency>

5.2 application.yml

# elasticsearch相关的配置elasticsearch:# ES网关地址hostname: (IP地址)
  # ES网关端口port:9200# ES网官方方案scheme: http

5.3 ElasticSearchConfig 配置类

@ConfigurationpublicclassElasticSearchConfig{@Value("${elasticsearch.hostname}")String hostname;@Value("${elasticsearch.port}")int port;@Value("${elasticsearch.scheme}")String scheme;@BeanpublicElasticsearchClientelasticsearchClient(){// 创建低级客户端RestClient client =RestClient.builder(newHttpHost(hostname, port,scheme)).build();// 创建API客户端,使用Jackson映射器创建传输层ElasticsearchTransport transport =newRestClientTransport(client,newJacksonJsonpMapper());returnnewElasticsearchClient(transport);}}

5.4 Elasticsearch 服务类和服务实现类

publicinterfaceElasticsearchService{/**
     * 更新ES数据
     *
     * @param fileSearchDTO
     */voiduploadES(FileSearchDTO fileSearchDTO);/**
     * 删除ES数据
     *
     * @param userFileId
     */voiddeleteES(Long userFileId);/**
     * 搜索ES数据
     *
     * @return
     */List<SearchFileVO>searchES(SearchFileQueryDTO searchFileVO);}
@Slf4j@ServicepublicclassElasticsearchServiceImplimplementsElasticsearchService{@AutowiredprivateElasticsearchClient elasticsearchClient;@ResourceprivateFileFeignService feignService;privatefinalThreadPoolExecutor executor =newThreadPoolExecutor(12,// 核心线程数20,// 最大线程数1,// 线程存活时间TimeUnit.SECONDS,// 存活时间单位newArrayBlockingQueue<>(1000)// 任务队列);publicvoiduploadES(FileSearchDTO fileSearchDTO){
        executor.execute(()->{try{
                elasticsearchClient.index(i -> i.index("file_search").id(fileSearchDTO.getUserFileId()).document(fileSearchDTO));}catch(IOException e){thrownewRuntimeException(e);}});}publicvoiddeleteES(Long userFileId){
        executor.execute(()->{try{
                elasticsearchClient.delete(d -> d
                        .index("file_search").id(String.valueOf(userFileId)));}catch(Exception e){
                log.debug("ES删除操作失败,请检查配置");}});}@OverridepublicList<SearchFileVO>searchES(SearchFileQueryDTO searchFileQueryDTO){int pageNum =(int) searchFileQueryDTO.getPageNum()-1;int pageSize =(int)(searchFileQueryDTO.getPageSize()==0?10: searchFileQueryDTO.getPageSize());SearchResponse<FileSearchDTO> search =null;try{
            search = elasticsearchClient.search(s -> s
                    .index("file_search").query(_1 -> _1
                            .bool(_2 -> _2
                                    .must(_3 -> _3
                                            .bool(_4 -> _4
                                                    .should(_5 -> _5
                                                            .match(_6 -> _6
                                                                    .field("fileName").query(searchFileQueryDTO.getFileName()))).should(_5 -> _5
                                                            .wildcard(_6 -> _6
                                                                    .field("fileName").wildcard("*"+ searchFileQueryDTO.getFileName()+"*"))))).must(_3 -> _3
                                            .term(_4 -> _4
                                                    .field("userId").value(StpUtil.getLoginIdAsLong()))))).from(pageNum).size(pageSize).highlight(h -> h
                            .fields("fileName", f -> f.type("plain").preTags("<span class='keyword'>").postTags("</span>")).encoder(HighlighterEncoder.Html)),FileSearchDTO.class);}catch(IOException e){
            e.printStackTrace();}List<SearchFileVO> searchFileVOList =newArrayList<>();if(search !=null){for(Hit<FileSearchDTO> hit : search.hits().hits()){SearchFileVO searchFileVO =newSearchFileVO();BeanUtil.copyProperties(hit.source(), searchFileVO);
                searchFileVO.setHighLight(hit.highlight());
                searchFileVOList.add(searchFileVO);// 如果文件不存在,也从ES中删除if(!feignService.getUserFile(searchFileVO.getUserFileId()).getData()){
                    executor.execute(()->this.deleteES(searchFileVO.getUserFileId()));}}}return searchFileVOList;}}

5.5 ElasticsearchController 前端控制器

@RestController@RequestMapping("/search")publicclassElasticsearchController{@AutowiredprivateElasticsearchService elasticService;@GetMapping(value ="/searchFile")publicRestResult<SearchFileVO>searchFile(SearchFileQueryDTO searchFileQueryDTO){List<SearchFileVO> searchFileVOList = elasticService.searchES(searchFileQueryDTO);returnRestResult.success().dataList(searchFileVOList, searchFileVOList.size());}}

5.6 相关实体类

/**
 * 文件搜索VO
 */@DatapublicclassSearchFileVO{@JsonSerialize(using =ToStringSerializer.class)privateLong userFileId;privateString fileName;privateString filePath;privateString extendName;privateLong fileSize;privateString fileUrl;privateMap<String,List<String>> highLight;privateInteger isDir;}
/**
 * 文件搜索DTO
 */@Data@JsonIgnoreProperties(ignoreUnknown =true)publicclassFileSearchDTO{privateString indexName;privateString userFileId;privateString fileId;privateString fileName;privateString content;privateString fileUrl;privateLong fileSize;privateInteger storageType;privateString identifier;privateLong userId;privateString filePath;privateString extendName;privateInteger isDir;privateString deleteTime;privateString deleteBatchNum;}
/**
 * 文件查询条件DTO
 */@DatapublicclassSearchFileQueryDTO{@ApiModelProperty("文件名")privateString fileName;@ApiModelProperty("当前页")privatelong pageNum;@ApiModelProperty("每页数量")privatelong pageSize;}

🚀6 RabbitMQ相关部分

6.1 生产者部分(文件服务)

6.1.1 引入依赖

<!-- nacos --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!-- RabbitMQ(我的SpringBoot是2.6.8的) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

6.1.2 完整application.yml

server:port:8083spring:# MySQL配置datasource:driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/(数据库名)?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=trueusername: root
    password: (MySQL密码)
  # nacos注册的服务名application:name: netdisk-file
  cloud:nacos:discovery:# 配置注册服务的IP地址server-addr: (IP地址):8848username: nacos
        password: nacos
  # rabbitmq相关的配置rabbitmq:host: (IP地址)
    port:5672virtual-host: (虚拟主机名,比如:/file)
    username: (用户名,默认:guest)
    password: (密码,默认:guest)

6.1.3 RabbitMQConfig 配置类

@ConfigurationpublicclassRabbitMQConfig{// 普通交换机publicstaticfinalStringFILE_EXCHANGE="file.exchange";// 文件保存相关publicstaticfinalStringQUEUE_FILE_SAVE="queue.file.save";publicstaticfinalStringKEY_FILE_SAVE="key.file.save";// 文件删除相关publicstaticfinalStringQUEUE_FILE_REMOVE="queue.file.remove";publicstaticfinalStringKEY_FILE_REMOVE="key.file.remove";// 死信相关publicstaticfinalStringDEAD_LETTER_EXCHANGE="deadLetter.exchange";publicstaticfinalStringDEAD_LETTER_QUEUE="deadLetter.queue";publicstaticfinalStringKEY_FILE_DEAD_LETTER="key.file.dead.letter";//延迟队列publicstaticfinalStringDELAY_QUEUE="delay.queue";/**
     * 文件保存队列
     *
     * @return
     */@BeanpublicQueuequeueFileSave(){returnnewQueue(QUEUE_FILE_SAVE);}/**
     * 文件删除队列
     *
     * @return
     */@BeanpublicQueuequeueFileRemove(){returnnewQueue(QUEUE_FILE_REMOVE);}/**
     * 交换机
     *
     * @return
     */@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(FILE_EXCHANGE);}/**
     * 绑定文件保存队列到交换机
     *
     * @return
     */@BeanpublicBindingbindFileSave(){returnBindingBuilder.bind(queueFileSave()).to(topicExchange()).with(KEY_FILE_SAVE);}/**
     * 绑定文件删除队列到交换机
     *
     * @return
     */@BeanpublicBindingbindFileRemove(){returnBindingBuilder.bind(queueFileRemove()).to(topicExchange()).with(KEY_FILE_REMOVE);}/**
     * 定义延时队列
     *
     * @return
     */@BeanpublicQueuedelayQueue(){//设置死信交换机和路由keyreturnQueueBuilder.durable(DELAY_QUEUE)//如果消息过时,则会被投递到当前对应的死信交换机.withArgument("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE)//如果消息过时,死信交换机会根据routing-key投递消息到对应的队列.withArgument("x-dead-letter-routing-key",KEY_FILE_DEAD_LETTER).build();}/**
     * 定义死信交换机
     *
     * @return
     */@BeanpublicTopicExchangedeadLetterExchange(){returnnewTopicExchange(DEAD_LETTER_EXCHANGE);}/**
     * 定义死信队列
     *
     * @return
     */@BeanpublicQueuedeadLetterQueue(){returnnewQueue(DEAD_LETTER_QUEUE);}/**
     * 绑定死信队列到死信交换机
     *
     * @return
     */@BeanpublicBindingdeadLetterBinding(){returnBindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(KEY_FILE_DEAD_LETTER);}}

6.1.4 FileDealComp 文件逻辑处理组件伪代码

/**
 * 文件逻辑处理组件
 */@Slf4j@ComponentpublicclassFileDealComp{@AutowiredprivateRabbitTemplate rabbitTemplate;privatefinalThreadPoolExecutor executor =newThreadPoolExecutor(12,// 核心线程数20,// 最大线程数1,// 线程存活时间TimeUnit.SECONDS,// 存活时间单位newArrayBlockingQueue<>(1000)// 任务队列);/**
     * 更新ES数据
     *
     * @param userFileId
     */publicvoiduploadES(Long userFileId){
        executor.execute(()->{FileSearchDTO fileSearchDTO =newFileSearchDTO();// 通过用户文件ID查询用户文件信息...// 通过文件ID查询文件信息...// 将用户文件信息和文件信息同步到fileSearchDTO对象...// 消息队列更新ES
            rabbitTemplate.convertAndSend(RabbitMQConfig.FILE_EXCHANGE,RabbitMQConfig.KEY_FILE_SAVE, fileSearchDTO);});}/**
     * 删除ES数据
     *
     * @param userFileId
     */publicvoiddeleteES(Long userFileId){// 消息队列删除ES
        rabbitTemplate.convertAndSend(RabbitMQConfig.FILE_EXCHANGE,RabbitMQConfig.KEY_FILE_REMOVE, userFileId);}/**
     * 分享文件过期处理
     *
     * @param shareBatchNum 分享批次号
     */publicvoidexpiredShareFile(String shareBatchNum){Share share =newShare();// 根据分享批次号获取分享信息...// 将分享信息同步到share对象...// 定义日期格式SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");long differenceInMillis =0;try{// 解析日期字符串为日期对象Date shareDate = sdf.parse(share.getShareTime());Date endDate = sdf.parse(share.getEndTime());// 计算时间差(毫秒数)
            differenceInMillis = endDate.getTime()- shareDate.getTime();}catch(ParseException e){
            e.printStackTrace();}// 存活时间String expiration =Long.toString(differenceInMillis);// 延时队列
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_QUEUE, share, message ->{
            message.getMessageProperties().setExpiration(expiration);return message;});}}

6.1.5 ExpiredShareFileListener 过期的分享文件处理监听器

@Slf4j@Component@RabbitListener(queues ="my-dlx-queue")publicclassExpiredShareFileListener{@AutowiredprivateShareService shareService;// 死信相关publicstaticfinalStringDEAD_LETTER_EXCHANGE="deadLetter.exchange";publicstaticfinalStringDEAD_LETTER_QUEUE="deadLetter.queue";publicstaticfinalStringKEY_FILE_DEAD_LETTER="key.file.dead.letter";@RabbitListener(bindings ={@QueueBinding(
                    key =KEY_FILE_DEAD_LETTER,
                    value =@Queue(value =DEAD_LETTER_QUEUE, durable ="true"),
                    exchange =@Exchange(value =DEAD_LETTER_EXCHANGE, type =ExchangeTypes.TOPIC, ignoreDeclarationExceptions ="true"))})publicvoidreceiveShareMessage(Share share){
        log.info("监听到文件过期处理操作:{}", share);// 将share的分享状态改为已过期 → 将share的shareStatus由0改为1...

        log.info("操作完成:{}", share);}}

6.2 消费者部分(查询服务)

6.2.1 引入依赖

<!-- RabbitMQ (我的SpringBoot是2.6.8的) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

6.2.2 完整application.yml

server:port:8084spring:# MySQL配置datasource:driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/(数据库名)?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=trueusername: root
    password: (MySQL密码)
  # nacos注册的服务名application:name: netdisk-search
  cloud:nacos:discovery:# 配置注册服务的IP地址server-addr: (IP地址):8848username: nacos
        password: nacos
  mvc:path match:matching-strategy: ant_path_matcher
  servlet:multipart:enabled:true# 单个文件最大限制max-file-size: 1024MB
      # 多个文件最大限制max-request-size: 2048MB
  # rabbitmq相关的配置rabbitmq:host: (IP地址)
    port:5672virtual-host: (虚拟主机名,比如:/file)
    username: (用户名,默认:guest)
    password: (密码,默认:guest)

# elasticsearch相关的配置elasticsearch:# ES网关地址hostname: (IP地址)
  # ES网关端口port:9200# ES网官方方案scheme: http

6.2.3 FileMQListener 文件处理消息队列监听

@Slf4j@ComponentpublicclassFileMQListener{// 普通交换机publicstaticfinalStringFILE_EXCHANGE="file.exchange";// 文件保存相关publicstaticfinalStringQUEUE_FILE_SAVE="queue.file.save";publicstaticfinalStringKEY_FILE_SAVE="key.file.save";// 文件删除相关publicstaticfinalStringQUEUE_FILE_REMOVE="queue.file.remove";publicstaticfinalStringKEY_FILE_REMOVE="key.file.remove";@AutowiredprivateElasticsearchService elasticsearchService;/**
     * 监听文件信息添加操作
     *
     * @param fileSearchDTO
     */@RabbitListener(bindings ={@QueueBinding(
            key =KEY_FILE_SAVE,
            value =@Queue(value =QUEUE_FILE_SAVE, durable ="true"),
            exchange =@Exchange(value =FILE_EXCHANGE, type =ExchangeTypes.TOPIC, ignoreDeclarationExceptions ="true"))})publicvoidreceiveFileSaveMessage(FileSearchDTO fileSearchDTO){try{
            log.info("监听到文件信息添加操作:{}", fileSearchDTO);// 更新ES数据
            elasticsearchService.uploadES(fileSearchDTO);
            
            log.info("添加完成:{}", fileSearchDTO);}catch(Exception ex){
            ex.printStackTrace();}}/**
     * 监听文件信息删除操作
     *
     * @param userFileId
     */@RabbitListener(bindings ={@QueueBinding(
            key =KEY_FILE_REMOVE,
            value =@Queue(value =QUEUE_FILE_REMOVE, durable ="true"),
            exchange =@Exchange(value =FILE_EXCHANGE, type =ExchangeTypes.TOPIC, ignoreDeclarationExceptions ="true"))})publicvoidreceiveFileDeleteMessage(Long userFileId){try{
            log.info("监听到文件信息删除操作:{}", userFileId);// 删除ES数据
            elasticsearchService.deleteES(userFileId);
            
            log.info("文件信息删除完成:{}", userFileId);}catch(Exception ex){
            ex.printStackTrace();}}}

📫7 代码仓库

netdisk-cloud | Gitee


本文转载自: https://blog.csdn.net/ManGooo0/article/details/132675938
版权归原作者 Cau1i 所有, 如有侵权,请联系我们删除。

“RabbitMQ实现数据库与ElasticSearch的数据同步和分享文件过期处理”的评论:

还没有评论