0


使用kafka完成数据的实时同步,同步到es中。(使用kafka实现自动上下架 upper、lower)

文章目录

  • 上架:新增专辑到 es
  • 下架:删除专辑
  1. 新增:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步新增数据
  2. 更新:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步更新数据如果是私有的专辑则发送消息给kafka,search通过监听器获取消息es删除数据
  3. 删除:发送消息给kafka,search通过监听器获取消息es删除数据

1、发送消息 KafkaService

packagecom.atguigu.tingshu.common.service;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.support.SendResult;importorg.springframework.stereotype.Service;importjava.util.concurrent.CompletableFuture;@ServicepublicclassKafkaService{privatestaticfinalLogger logger =LoggerFactory.getLogger(KafkaService.class);@AutowiredprivateKafkaTemplate kafkaTemplate;/**
     * 向指定主题发送消息
     * 此方法通过调用重载的sendMsg方法,向指定主题发送消息,使用默认的消息标签和消息键
     *
     * @param topic 发送消息的主题
     * @param msg   需要发送的消息内容
     */publicvoidsendMsg(String topic,String msg){// 调用重载的sendMsg方法,传入默认值以简化调用this.sendMsg(topic,null,null, msg);}/**
     * 发送消息到指定的Kafka主题
     *
     * @param topic 消息主题
     * @param partition 分区编号
     * @param key 消息键值
     * @param msg 消息内容
     */publicvoidsendMsg(String topic,Integer partition,String key,String msg){// 发生消息并返回异步结果CompletableFuture<SendResult> future =this.kafkaTemplate.send(topic, partition, key, msg);// 异步处理发送结果
        future.whenCompleteAsync((result, ex)->{if(ex !=null){// 如果发送过程中出现异常
                logger.error("生产者发送消息失败!原因:{}", ex.getMessage());}});}}
  • whenCompleteAsync:异步完成时的处理、当异步操作完成时

在这里插入图片描述

2、生产者 service-album -> AlbumInfoServiceImpl

在这里插入图片描述

2.1、新增 saveAlbumInfo()

  • 新增:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步新增数据在这里插入图片描述在这里插入图片描述

在这里插入图片描述

@Slf4j@Service@SuppressWarnings({"unchecked","rawtypes"})publicclassAlbumInfoServiceImplextendsServiceImpl<AlbumInfoMapper,AlbumInfo>implementsAlbumInfoService{@AutowiredprivateAlbumAttributeValueMapper attributeValueMapper;@AutowiredprivateAlbumStatService albumStatService;@AutowiredprivateKafkaService kafkaService;@Transactional(rollbackFor =Exception.class)@OverridepublicvoidsaveAlbumInfo(AlbumInfoVo albumInfoVo)throwsFileNotFoundException{// 1.保存专辑信息表AlbumInfo albumInfo =newAlbumInfo();BeanUtils.copyProperties(albumInfoVo, albumInfo);// 设置当前用户的idLong userId =AuthContextHolder.getUserId();
        albumInfo.setUserId(userId ==null?1: userId);
        albumInfo.setTracksForFree(5);
        albumInfo.setSecondsForFree(30);
        albumInfo.setStatus(SystemConstant.ALBUM_STATUS_PASS);this.save(albumInfo);// 主键回写获取专辑idLong albumInfoId = albumInfo.getId();// 2.保存专辑标签值表List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();if(!CollectionUtils.isEmpty(albumAttributeValueVoList)){
            albumAttributeValueVoList.forEach(albumAttributeValueVo ->{AlbumAttributeValue albumAttributeValue =newAlbumAttributeValue();BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);
                albumAttributeValue.setAlbumId(albumInfoId);this.attributeValueMapper.insert(albumAttributeValue);});}//        new FileInputStream("xxx");//        try {//            TimeUnit.SECONDS.sleep(3);//        } catch (InterruptedException e) {//            throw new RuntimeException(e);//        }// 3.保存统计信息:专辑状态表// this.saveAlbumStat(albumInfoId);this.albumStatService.saveAlbumStat(albumInfoId);if(StringUtils.equals(albumInfo.getIsOpen(),"1")){this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumInfoId.toString());}//        int i = 1/0;}}

在这里插入图片描述

2.2、更新 updateAlbumInfo()

  • 更新:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步更新数据如果是私有的专辑则发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j@Service@SuppressWarnings({"unchecked","rawtypes"})publicclassAlbumInfoServiceImplextendsServiceImpl<AlbumInfoMapper,AlbumInfo>implementsAlbumInfoService{@AutowiredprivateAlbumAttributeValueMapper attributeValueMapper;@AutowiredprivateKafkaService kafkaService;@Transactional@OverridepublicvoidupdateAlbumInfo(Long albumId,AlbumInfoVo albumInfoVo){AlbumInfo albumInfo =newAlbumInfo();BeanUtils.copyProperties(albumInfoVo, albumInfo);
        albumInfo.setId(albumId);this.updateById(albumInfo);// 更新专辑标签值表:先删除该专辑所有的标签及值 再去新增this.attributeValueMapper.delete(newLambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();if(!CollectionUtils.isEmpty(albumAttributeValueVoList)){
            albumAttributeValueVoList.forEach(albumAttributeValueVo ->{AlbumAttributeValue albumAttributeValue =newAlbumAttributeValue();BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);
                albumAttributeValue.setAlbumId(albumId);this.attributeValueMapper.insert(albumAttributeValue);});}if(StringUtils.equals(albumInfoVo.getIsOpen(),"1")){this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumId.toString());}else{this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());}}}

在这里插入图片描述

2.3、删除 removeAlbumInfo()

  • 删除:发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j@Service@SuppressWarnings({"unchecked","rawtypes"})publicclassAlbumInfoServiceImplextendsServiceImpl<AlbumInfoMapper,AlbumInfo>implementsAlbumInfoService{@AutowiredprivateAlbumAttributeValueMapper attributeValueMapper;@AutowiredprivateAlbumStatMapper albumStatMapper;@AutowiredprivateKafkaService kafkaService;@Transactional@OverridepublicvoidremoveAlbumInfo(Long albumId){this.removeById(albumId);this.albumStatMapper.delete(newLambdaUpdateWrapper<AlbumStat>().eq(AlbumStat::getAlbumId, albumId));this.attributeValueMapper.delete(newLambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());}}

在这里插入图片描述

3、消费者 service-search - > AlbumListener.java

在这里插入图片描述

packagecom.atguigu.tingshu.search.listener;@ComponentpublicclassAlbumListener{@AutowiredprivateAlbumInfoFeignClient albumInfoFeignClient;@AutowiredprivateUserInfoFeignClient userInfoFeignClient;@AutowiredprivateCategoryFeignClient categoryFeignClient;@AutowiredprivateElasticsearchTemplate elasticsearchTemplate;@KafkaListener(topics =KafkaConstant.QUEUE_ALBUM_UPPER)publicvoidupper(String albumId){if(StringUtils.isBlank(albumId)){return;}// 根据专辑id查询专辑Result<AlbumInfo> albumInfoResult =this.albumInfoFeignClient.getAlbumInfo(Long.valueOf(albumId));Assert.notNull(albumInfoResult,"同步数据时,获取专辑信息失败!");AlbumInfo albumInfo = albumInfoResult.getData();Assert.notNull(albumInfo,"同步数据时,没有对应的专辑!");AlbumInfoIndex albumInfoIndex =newAlbumInfoIndex();// 把专辑信息中的数据复制到index对象BeanUtils.copyProperties(albumInfo, albumInfoIndex);// 查询主播获取主播信息Result<UserInfoVo> userInfoVoResult =this.userInfoFeignClient.getUserById(albumInfo.getUserId());Assert.notNull(userInfoVoResult,"数据导入时,获取主播信息失败!");UserInfoVo userInfoVo = userInfoVoResult.getData();if(userInfoVo !=null){
            albumInfoIndex.setAnnouncerId(userInfoVo.getId());
            albumInfoIndex.setAnnouncerName(userInfoVo.getNickname());}// 根据三级分类id查询一二三级分类Result<BaseCategoryView> categoryResult =this.categoryFeignClient.getAllLevelCategories(albumInfo.getCategory3Id());Assert.notNull(categoryResult,"数据导入时,获取分类信息失败!");BaseCategoryView baseCategoryView = categoryResult.getData();if(baseCategoryView !=null){
            albumInfoIndex.setCategory1Id(baseCategoryView.getCategory1Id());
            albumInfoIndex.setCategory2Id(baseCategoryView.getCategory2Id());}// 查询专辑统计信息//                Result<AlbumStatVo> albumStatesResult = this.albumInfoFeignClient.getAlbumStates(albumInfo.getId());//                Assert.notNull(albumStatesResult, "数据导入时,获取专辑统计信息失败!");//                AlbumStatVo albumStatVo = albumStatesResult.getData();//                if (albumStatVo != null) {//                    BeanUtils.copyProperties(albumStatVo, albumInfoIndex);//                }// 假数据:int playNum =(newRandom().nextInt(100)+1)*10000;
        albumInfoIndex.setPlayStatNum(playNum);int subscribeNum =(newRandom().nextInt(100)+1)*10000;
        albumInfoIndex.setSubscribeStatNum(subscribeNum);int buyNum =(newRandom().nextInt(100)+1)*10000;
        albumInfoIndex.setBuyStatNum(buyNum);int commentNum =(newRandom().nextInt(100)+1)*10000;
        albumInfoIndex.setCommentStatNum(commentNum);// 热度
        albumInfoIndex.setHotScore(playNum *0.1+ commentNum *0.2+ subscribeNum *0.3+ buyNum *0.4);// 标签Result<List<AlbumAttributeValue>> albumAttributeValueResult =this.albumInfoFeignClient.getAlbumAttributeValue(albumInfo.getId());Assert.notNull(albumAttributeValueResult,"数据导入时,获取标签及值失败!");List<AlbumAttributeValue> albumAttributeValues = albumAttributeValueResult.getData();if(!CollectionUtils.isEmpty(albumAttributeValues)){// 把List<AlbumAttributeValue> 转化成  List<AttributeValueIndex>
            albumInfoIndex.setAttributeValueIndexList(albumAttributeValues.stream().map(albumAttributeValue ->{AttributeValueIndex attributeValueIndex =newAttributeValueIndex();BeanUtils.copyProperties(albumAttributeValue, attributeValueIndex);return attributeValueIndex;}).collect(Collectors.toList()));}this.elasticsearchTemplate.save(albumInfoIndex);}@KafkaListener(topics =KafkaConstant.QUEUE_ALBUM_LOWER)publicvoidlower(String albumId){if(StringUtils.isBlank(albumId)){return;}this.elasticsearchTemplate.delete(albumId,AlbumInfoIndex.class);}}
标签: kafka 分布式 es

本文转载自: https://blog.csdn.net/m0_65152767/article/details/141610060
版权归原作者 小丁学Java 所有, 如有侵权,请联系我们删除。

“使用kafka完成数据的实时同步,同步到es中。(使用kafka实现自动上下架 upper、lower)”的评论:

还没有评论