🎈 1 参考文档
RabbitMQ实现数据库与ElasticSearch的数据同步 | Hannya。-CSDN
企业级开发项目实战——基于RabbitMQ实现数据库、elasticsearch的数据同步 | 波总说先赚它一个小目标-CSDN
SPringBoot集成RabbitMQ实现30秒过期删除功能 | 军大君-CSDN
🔍 2 个人需求
- 当进行文件上传、文件创建、文件重命名等操作时:通过RabbitMQ:- 生产者:文件服务,执行上传、创建、重命名等文件操作,将用户文件信息(例如文件名、文件ID等)发送到RabbitMQ新增队列。- 消费者:查询服务,监听RabbitMQ新增队列,一旦收到消息,将用户文件信息新增或更新到Elasticsearch中。
- 文件删除时:通过RabbitMQ:- 生产者:文件服务,执行文件删除操作,将用户文件ID发送到RabbitMQ删除队列。- 消费者:查询服务,监听 RabbitMQ 队列,一旦收到消息,通过用户文件ID从Elasticsearch中删除相应的用户文件信息。
- 根据文件名进行文件模糊查询:通过OpenFeign:- 生产者:文件服务,查询服务调用文件服务提供的OpenFeign接口,通过用户文件ID从查询该用户文件是否存在。- 消费者:查询服务,如果不存在,将数据根据用户文件ID从Elasticsearch中删除。
- 分享文件时间到期处理:通过RabbitMQ的TTL(生存时间) + 死信队列:- 生产者:文件服务, 使用TTL模拟一个“延时队列”,在文件分享时间到期后,将消息传递到死信队列。- 消费者:文件服务,死信监听器监听到之后,将分享文件的分享状态改为已过期状态。
🔈3 声明
只是提供思路,代码不是很完整,直接复制运行不了。
最后面有完整网盘项目代码。
🚀4 OpenFeign相关部分(查询服务)
4.1 引入依赖
<!-- nacos --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!-- openfeign --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><!-- loadbalancer --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-loadbalancer</artifactId></dependency>
4.2 application.yml
spring:# nacos注册的服务名application:name: netdisk-search
cloud:nacos:discovery:# 配置注册服务的IP地址server-addr: (IP地址):8848username: nacos
password: nacos
4.3 FileFeignService 接口
@FeignClient(name ="netdisk-file", configuration =FeignInterceptor.class)publicinterfaceFileFeignService{@RequestMapping("/file/getUserFile/{userFileId}")ResultResponse<Boolean>getUserFile(@PathVariableLong userFileId);}
4.4 @EnableFeignClients 注解
@ComponentScan(value ="com.cauli.search.*")@EnableFeignClients(basePackages ="com.cauli.search")@SpringBootApplicationpublicclassNetdiskSearchApplication{publicstaticvoidmain(String[] args){SpringApplication.run(NetdiskSearchApplication.class, args);}}
🚀5 Elasticsearch相关部分(查询服务)
5.1 引入依赖
<!-- elasticsearch --><dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.0.1</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.3</version></dependency><dependency><groupId>jakarta.json</groupId><artifactId>jakarta.json-api</artifactId><version>2.0.1</version></dependency>
5.2 application.yml
# elasticsearch相关的配置elasticsearch:# ES网关地址hostname: (IP地址)
# ES网关端口port:9200# ES网官方方案scheme: http
5.3 ElasticSearchConfig 配置类
@ConfigurationpublicclassElasticSearchConfig{@Value("${elasticsearch.hostname}")String hostname;@Value("${elasticsearch.port}")int port;@Value("${elasticsearch.scheme}")String scheme;@BeanpublicElasticsearchClientelasticsearchClient(){// 创建低级客户端RestClient client =RestClient.builder(newHttpHost(hostname, port,scheme)).build();// 创建API客户端,使用Jackson映射器创建传输层ElasticsearchTransport transport =newRestClientTransport(client,newJacksonJsonpMapper());returnnewElasticsearchClient(transport);}}
5.4 Elasticsearch 服务类和服务实现类
publicinterfaceElasticsearchService{/**
* 更新ES数据
*
* @param fileSearchDTO
*/voiduploadES(FileSearchDTO fileSearchDTO);/**
* 删除ES数据
*
* @param userFileId
*/voiddeleteES(Long userFileId);/**
* 搜索ES数据
*
* @return
*/List<SearchFileVO>searchES(SearchFileQueryDTO searchFileVO);}
@Slf4j@ServicepublicclassElasticsearchServiceImplimplementsElasticsearchService{@AutowiredprivateElasticsearchClient elasticsearchClient;@ResourceprivateFileFeignService feignService;privatefinalThreadPoolExecutor executor =newThreadPoolExecutor(12,// 核心线程数20,// 最大线程数1,// 线程存活时间TimeUnit.SECONDS,// 存活时间单位newArrayBlockingQueue<>(1000)// 任务队列);publicvoiduploadES(FileSearchDTO fileSearchDTO){
executor.execute(()->{try{
elasticsearchClient.index(i -> i.index("file_search").id(fileSearchDTO.getUserFileId()).document(fileSearchDTO));}catch(IOException e){thrownewRuntimeException(e);}});}publicvoiddeleteES(Long userFileId){
executor.execute(()->{try{
elasticsearchClient.delete(d -> d
.index("file_search").id(String.valueOf(userFileId)));}catch(Exception e){
log.debug("ES删除操作失败,请检查配置");}});}@OverridepublicList<SearchFileVO>searchES(SearchFileQueryDTO searchFileQueryDTO){int pageNum =(int) searchFileQueryDTO.getPageNum()-1;int pageSize =(int)(searchFileQueryDTO.getPageSize()==0?10: searchFileQueryDTO.getPageSize());SearchResponse<FileSearchDTO> search =null;try{
search = elasticsearchClient.search(s -> s
.index("file_search").query(_1 -> _1
.bool(_2 -> _2
.must(_3 -> _3
.bool(_4 -> _4
.should(_5 -> _5
.match(_6 -> _6
.field("fileName").query(searchFileQueryDTO.getFileName()))).should(_5 -> _5
.wildcard(_6 -> _6
.field("fileName").wildcard("*"+ searchFileQueryDTO.getFileName()+"*"))))).must(_3 -> _3
.term(_4 -> _4
.field("userId").value(StpUtil.getLoginIdAsLong()))))).from(pageNum).size(pageSize).highlight(h -> h
.fields("fileName", f -> f.type("plain").preTags("<span class='keyword'>").postTags("</span>")).encoder(HighlighterEncoder.Html)),FileSearchDTO.class);}catch(IOException e){
e.printStackTrace();}List<SearchFileVO> searchFileVOList =newArrayList<>();if(search !=null){for(Hit<FileSearchDTO> hit : search.hits().hits()){SearchFileVO searchFileVO =newSearchFileVO();BeanUtil.copyProperties(hit.source(), searchFileVO);
searchFileVO.setHighLight(hit.highlight());
searchFileVOList.add(searchFileVO);// 如果文件不存在,也从ES中删除if(!feignService.getUserFile(searchFileVO.getUserFileId()).getData()){
executor.execute(()->this.deleteES(searchFileVO.getUserFileId()));}}}return searchFileVOList;}}
5.5 ElasticsearchController 前端控制器
@RestController@RequestMapping("/search")publicclassElasticsearchController{@AutowiredprivateElasticsearchService elasticService;@GetMapping(value ="/searchFile")publicRestResult<SearchFileVO>searchFile(SearchFileQueryDTO searchFileQueryDTO){List<SearchFileVO> searchFileVOList = elasticService.searchES(searchFileQueryDTO);returnRestResult.success().dataList(searchFileVOList, searchFileVOList.size());}}
5.6 相关实体类
/**
* 文件搜索VO
*/@DatapublicclassSearchFileVO{@JsonSerialize(using =ToStringSerializer.class)privateLong userFileId;privateString fileName;privateString filePath;privateString extendName;privateLong fileSize;privateString fileUrl;privateMap<String,List<String>> highLight;privateInteger isDir;}
/**
* 文件搜索DTO
*/@Data@JsonIgnoreProperties(ignoreUnknown =true)publicclassFileSearchDTO{privateString indexName;privateString userFileId;privateString fileId;privateString fileName;privateString content;privateString fileUrl;privateLong fileSize;privateInteger storageType;privateString identifier;privateLong userId;privateString filePath;privateString extendName;privateInteger isDir;privateString deleteTime;privateString deleteBatchNum;}
/**
* 文件查询条件DTO
*/@DatapublicclassSearchFileQueryDTO{@ApiModelProperty("文件名")privateString fileName;@ApiModelProperty("当前页")privatelong pageNum;@ApiModelProperty("每页数量")privatelong pageSize;}
🚀6 RabbitMQ相关部分
6.1 生产者部分(文件服务)
6.1.1 引入依赖
<!-- nacos --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!-- RabbitMQ(我的SpringBoot是2.6.8的) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
6.1.2 完整application.yml
server:port:8083spring:# MySQL配置datasource:driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/(数据库名)?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=trueusername: root
password: (MySQL密码)
# nacos注册的服务名application:name: netdisk-file
cloud:nacos:discovery:# 配置注册服务的IP地址server-addr: (IP地址):8848username: nacos
password: nacos
# rabbitmq相关的配置rabbitmq:host: (IP地址)
port:5672virtual-host: (虚拟主机名,比如:/file)
username: (用户名,默认:guest)
password: (密码,默认:guest)
6.1.3 RabbitMQConfig 配置类
@ConfigurationpublicclassRabbitMQConfig{// 普通交换机publicstaticfinalStringFILE_EXCHANGE="file.exchange";// 文件保存相关publicstaticfinalStringQUEUE_FILE_SAVE="queue.file.save";publicstaticfinalStringKEY_FILE_SAVE="key.file.save";// 文件删除相关publicstaticfinalStringQUEUE_FILE_REMOVE="queue.file.remove";publicstaticfinalStringKEY_FILE_REMOVE="key.file.remove";// 死信相关publicstaticfinalStringDEAD_LETTER_EXCHANGE="deadLetter.exchange";publicstaticfinalStringDEAD_LETTER_QUEUE="deadLetter.queue";publicstaticfinalStringKEY_FILE_DEAD_LETTER="key.file.dead.letter";//延迟队列publicstaticfinalStringDELAY_QUEUE="delay.queue";/**
* 文件保存队列
*
* @return
*/@BeanpublicQueuequeueFileSave(){returnnewQueue(QUEUE_FILE_SAVE);}/**
* 文件删除队列
*
* @return
*/@BeanpublicQueuequeueFileRemove(){returnnewQueue(QUEUE_FILE_REMOVE);}/**
* 交换机
*
* @return
*/@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(FILE_EXCHANGE);}/**
* 绑定文件保存队列到交换机
*
* @return
*/@BeanpublicBindingbindFileSave(){returnBindingBuilder.bind(queueFileSave()).to(topicExchange()).with(KEY_FILE_SAVE);}/**
* 绑定文件删除队列到交换机
*
* @return
*/@BeanpublicBindingbindFileRemove(){returnBindingBuilder.bind(queueFileRemove()).to(topicExchange()).with(KEY_FILE_REMOVE);}/**
* 定义延时队列
*
* @return
*/@BeanpublicQueuedelayQueue(){//设置死信交换机和路由keyreturnQueueBuilder.durable(DELAY_QUEUE)//如果消息过时,则会被投递到当前对应的死信交换机.withArgument("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE)//如果消息过时,死信交换机会根据routing-key投递消息到对应的队列.withArgument("x-dead-letter-routing-key",KEY_FILE_DEAD_LETTER).build();}/**
* 定义死信交换机
*
* @return
*/@BeanpublicTopicExchangedeadLetterExchange(){returnnewTopicExchange(DEAD_LETTER_EXCHANGE);}/**
* 定义死信队列
*
* @return
*/@BeanpublicQueuedeadLetterQueue(){returnnewQueue(DEAD_LETTER_QUEUE);}/**
* 绑定死信队列到死信交换机
*
* @return
*/@BeanpublicBindingdeadLetterBinding(){returnBindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(KEY_FILE_DEAD_LETTER);}}
6.1.4 FileDealComp 文件逻辑处理组件伪代码
/**
* 文件逻辑处理组件
*/@Slf4j@ComponentpublicclassFileDealComp{@AutowiredprivateRabbitTemplate rabbitTemplate;privatefinalThreadPoolExecutor executor =newThreadPoolExecutor(12,// 核心线程数20,// 最大线程数1,// 线程存活时间TimeUnit.SECONDS,// 存活时间单位newArrayBlockingQueue<>(1000)// 任务队列);/**
* 更新ES数据
*
* @param userFileId
*/publicvoiduploadES(Long userFileId){
executor.execute(()->{FileSearchDTO fileSearchDTO =newFileSearchDTO();// 通过用户文件ID查询用户文件信息...// 通过文件ID查询文件信息...// 将用户文件信息和文件信息同步到fileSearchDTO对象...// 消息队列更新ES
rabbitTemplate.convertAndSend(RabbitMQConfig.FILE_EXCHANGE,RabbitMQConfig.KEY_FILE_SAVE, fileSearchDTO);});}/**
* 删除ES数据
*
* @param userFileId
*/publicvoiddeleteES(Long userFileId){// 消息队列删除ES
rabbitTemplate.convertAndSend(RabbitMQConfig.FILE_EXCHANGE,RabbitMQConfig.KEY_FILE_REMOVE, userFileId);}/**
* 分享文件过期处理
*
* @param shareBatchNum 分享批次号
*/publicvoidexpiredShareFile(String shareBatchNum){Share share =newShare();// 根据分享批次号获取分享信息...// 将分享信息同步到share对象...// 定义日期格式SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");long differenceInMillis =0;try{// 解析日期字符串为日期对象Date shareDate = sdf.parse(share.getShareTime());Date endDate = sdf.parse(share.getEndTime());// 计算时间差(毫秒数)
differenceInMillis = endDate.getTime()- shareDate.getTime();}catch(ParseException e){
e.printStackTrace();}// 存活时间String expiration =Long.toString(differenceInMillis);// 延时队列
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_QUEUE, share, message ->{
message.getMessageProperties().setExpiration(expiration);return message;});}}
6.1.5 ExpiredShareFileListener 过期的分享文件处理监听器
@Slf4j@Component@RabbitListener(queues ="my-dlx-queue")publicclassExpiredShareFileListener{@AutowiredprivateShareService shareService;// 死信相关publicstaticfinalStringDEAD_LETTER_EXCHANGE="deadLetter.exchange";publicstaticfinalStringDEAD_LETTER_QUEUE="deadLetter.queue";publicstaticfinalStringKEY_FILE_DEAD_LETTER="key.file.dead.letter";@RabbitListener(bindings ={@QueueBinding(
key =KEY_FILE_DEAD_LETTER,
value =@Queue(value =DEAD_LETTER_QUEUE, durable ="true"),
exchange =@Exchange(value =DEAD_LETTER_EXCHANGE, type =ExchangeTypes.TOPIC, ignoreDeclarationExceptions ="true"))})publicvoidreceiveShareMessage(Share share){
log.info("监听到文件过期处理操作:{}", share);// 将share的分享状态改为已过期 → 将share的shareStatus由0改为1...
log.info("操作完成:{}", share);}}
6.2 消费者部分(查询服务)
6.2.1 引入依赖
<!-- RabbitMQ (我的SpringBoot是2.6.8的) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
6.2.2 完整application.yml
server:port:8084spring:# MySQL配置datasource:driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/(数据库名)?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=trueusername: root
password: (MySQL密码)
# nacos注册的服务名application:name: netdisk-search
cloud:nacos:discovery:# 配置注册服务的IP地址server-addr: (IP地址):8848username: nacos
password: nacos
mvc:path match:matching-strategy: ant_path_matcher
servlet:multipart:enabled:true# 单个文件最大限制max-file-size: 1024MB
# 多个文件最大限制max-request-size: 2048MB
# rabbitmq相关的配置rabbitmq:host: (IP地址)
port:5672virtual-host: (虚拟主机名,比如:/file)
username: (用户名,默认:guest)
password: (密码,默认:guest)
# elasticsearch相关的配置elasticsearch:# ES网关地址hostname: (IP地址)
# ES网关端口port:9200# ES网官方方案scheme: http
6.2.3 FileMQListener 文件处理消息队列监听
@Slf4j@ComponentpublicclassFileMQListener{// 普通交换机publicstaticfinalStringFILE_EXCHANGE="file.exchange";// 文件保存相关publicstaticfinalStringQUEUE_FILE_SAVE="queue.file.save";publicstaticfinalStringKEY_FILE_SAVE="key.file.save";// 文件删除相关publicstaticfinalStringQUEUE_FILE_REMOVE="queue.file.remove";publicstaticfinalStringKEY_FILE_REMOVE="key.file.remove";@AutowiredprivateElasticsearchService elasticsearchService;/**
* 监听文件信息添加操作
*
* @param fileSearchDTO
*/@RabbitListener(bindings ={@QueueBinding(
key =KEY_FILE_SAVE,
value =@Queue(value =QUEUE_FILE_SAVE, durable ="true"),
exchange =@Exchange(value =FILE_EXCHANGE, type =ExchangeTypes.TOPIC, ignoreDeclarationExceptions ="true"))})publicvoidreceiveFileSaveMessage(FileSearchDTO fileSearchDTO){try{
log.info("监听到文件信息添加操作:{}", fileSearchDTO);// 更新ES数据
elasticsearchService.uploadES(fileSearchDTO);
log.info("添加完成:{}", fileSearchDTO);}catch(Exception ex){
ex.printStackTrace();}}/**
* 监听文件信息删除操作
*
* @param userFileId
*/@RabbitListener(bindings ={@QueueBinding(
key =KEY_FILE_REMOVE,
value =@Queue(value =QUEUE_FILE_REMOVE, durable ="true"),
exchange =@Exchange(value =FILE_EXCHANGE, type =ExchangeTypes.TOPIC, ignoreDeclarationExceptions ="true"))})publicvoidreceiveFileDeleteMessage(Long userFileId){try{
log.info("监听到文件信息删除操作:{}", userFileId);// 删除ES数据
elasticsearchService.deleteES(userFileId);
log.info("文件信息删除完成:{}", userFileId);}catch(Exception ex){
ex.printStackTrace();}}}
📫7 代码仓库
netdisk-cloud | Gitee
版权归原作者 Cau1i 所有, 如有侵权,请联系我们删除。