

2024/9/21 Heima Toutiao Study-Along Notes (11): The Most Valuable Episode, Big-Data Weighted Score Computation and Push

1) Today's Content

image-20240921102058109

When a user likes an article, the system computes in real time and immediately replaces the Redis data shown to users.

If a million people like a video within the same time span, it should go onto the recommendation page right away, instead of waiting for the 2 a.m. scheduled push.

2) Stream Computing

2.1) Overview

image-20240921102242463

Like character streams and byte streams, stream computing opens a pipe and feeds data through it continuously; the progress bar of a file upload is a familiar example.

Application scenarios

image-20240921102512221

dashboard

Bus arrival data; how long until the DiDi driver arrives

Real-time articles (front page, trending)

3) Kafka Streams

3.1) Overview

image-20240921103018935

No need to deploy it on Linux separately (it is just a library)

Combines freely with Java applications

Event-time window operations (all records within a window are aggregated and processed in one pass)

image-20240921103130885

Records are processed through a topology like the one in the diagram: the source processor dispatches work to three stream processors, and once they finish, their results flow on to the downstream (sink) processor.

image-20240921103333958

Each record in the stream is treated as an independent event and accumulated, rather than updating a previous value.
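For example (made-up records): given two records with the same key, ("alice", 1) followed by ("alice", 2), a stream-style count/sum accumulates both events (sum 3), while a table-style upsert would keep only the latest value (2).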

3.2) Getting-Started Example

image-20240921103511473

Count how many times each word appears.

Dependency

Just copy the code for now and get the general idea; dig deeper when you actually need it.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Quick start

package com.heima.kafka.sample;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Stream processing
 */
public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        // Kafka configuration
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        // Stream builder
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Stream computation
        streamProcessor(streamsBuilder);

        // Create the KafkaStreams object
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
        // Start the stream computation
        kafkaStreams.start();
    }

    /**
     * Stream computation
     * Message content: hello kafka  hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // Create the KStream object and specify which topic to receive messages from
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        /**
         * Process the message values
         */
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
        // Group by value
        .groupBy((key, value) -> value)
        // Time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        // Count the words
        .count()
        // Convert back to a KStream
        .toStream()
        .map((key, value) -> {
            System.out.println("key:" + key + ",value:" + value);
            return new KeyValue<>(key.key().toString(), value.toString());
        })
        // Send the messages
        .to("itcast-topic-out");
    }
}

Roughly, the code does the following:

1. Fill the configuration object prop with parameters:

key/value serializers and deserializers (Serdes),

the broker address,

and the stream application ID

2. Construct the stream pipeline (StreamsBuilder)

3. Run the pipeline computation:

① Read the messages from the corresponding topic; each message has a key and a value, where the value holds the actual words

② Call the value mapper flatMapValues with an anonymous inner class that splits each value into a list of words

③ Group: each distinct word becomes the key, with the word itself as the value

④ Time window: aggregate everything within a 10-second window in one pass

⑤ Convert back to a stream; map over it to print each entry and convert key and value to strings

⑥ Produce the processed data to the corresponding output topic
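For instance, if the producer below sends "hello kafka" five times and all five messages land in the same 10-second window, the output topic receives hello:5 and kafka:5 once the window is processed.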

Test

image-20240921112330697

The producer targets the input topic and sends the message 5 times in a loop.
package com.heima.kafka.sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Producer
 */
@Slf4j
public class ProducerQuickStart {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Kafka connection configuration
        Properties prop = new Properties();
        // Kafka connection address
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.136:9092");
        // Key and value serializers
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // acks setting
        prop.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retry count on timeout
        prop.put(ProducerConfig.RETRIES_CONFIG, "10");

        // 2. Create the Kafka producer object
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        // 3. Send record messages
        /**
         * First parameter:  topic
         * Second parameter: message key
         * Third parameter:  message value
         */
        // ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");
        // ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first",0,"key-001","hello kafka");
        // ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<String, String>("topic-first", "hello kafka");
        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<String, String>("itcast-topic-input", "hello kafka");
            producer.send(kvProducerRecord);
        }

        // // Synchronous send
        // RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
        // System.out.println(recordMetadata.offset());
        // System.out.println("message sent");

        // Asynchronous send
        // producer.send(kvProducerRecord, new Callback() {
        //     @Override
        //     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        //         if (e != null) {
        //             System.out.println("log the failed record to an error table");
        //         }
        //         System.out.println("message sent, offset: " + recordMetadata.offset());
        //     }
        // });

        // 4. Close the producer. This is required, otherwise messages may not be delivered.
        producer.close();
    }
}
Consumer subscription

image-20240921113210772

Comment out the consumer's original subscription logic and subscribe to the output topic itcast-topic-out instead (a sketch follows below).
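The consumer itself only appears in a screenshot here. A minimal sketch of a consumer subscribed to the output topic, assuming the same broker address as the producer (class name and group ID are my own, not from the course):

package com.heima.kafka.sample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumer: subscribes to the stream's output topic and prints the word counts
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {
        // Connection configuration, mirroring the producer's settings
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.136:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        // Subscribe to the stream's output topic instead of the original input topic
        consumer.subscribe(Collections.singletonList("itcast-topic-out"));

        // Poll and print each word count produced by the stream
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ":" + record.value());
            }
        }
    }
}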

Start them in order:

image-20240921113350072

image-20240921115120238

3.3) Spring Boot Integration with Kafka Streams

Spring Boot's starters do not auto-configure Kafka Streams well, so we write our own bean configuration, add the yml settings, and finally set up the KStream listener; testing reuses the producer from before.

Configuration class

package com.heima.kafka.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * Re-register the KafkaStreamsConfiguration bean to set custom configuration parameters
 */
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {

    private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
    private String hosts;
    private String group;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup() + "_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup() + "_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

Configuration file

kafka:
  hosts: 192.168.233.136:9092
  group: ${spring.application.name}

Listener

package com.heima.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // Create the KStream object and specify which topic to receive messages from
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
        // Group and aggregate by value
        .groupBy((key, value) -> value)
        // Aggregation time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        // Count the words
        .count()
        .toStream()
        // Convert the processed result to strings
        .map((key, value) -> {
            System.out.println("key:" + key + ",value:" + value);
            return new KeyValue<>(key.key().toString(), value.toString());
        })
        // Send the messages
        .to("itcast-topic-out");
        return stream;
    }
}

Test

Start the bootstrap class, then run the producer and check whether the console shows the received results.

image-20240921162842653

4) Real-Time Hot Article Computation (alongside ES, the most confusing episode)

4.1) Approach

image-20240921140509290

Every user behavior triggers the stream processing; events within one time window are aggregated, and a message is sent so the behavior counts get updated in the database.

The current article's score is then recomputed from the new behavior values (sent from the like service).

Query Redis, compare against the lowest-scored entries in the channel list and the recommendation list, and replace them if the new score is higher, so the cached lists always hold the hottest articles.

4.2) Steps

4.2.1) Integrate Kafka into the Behavior Service

nacos
kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

image-20240921142004609

4.2.2) Message Entity

What values do we need to send so the database can be updated?

The article ID, the behavior type (an enum), and whether the behavior adds or subtracts score.

package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * The article field type being modified
     */
    private UpdateArticleType type;

    /**
     * Article ID
     */
    private Long articleId;

    /**
     * Data increment, may be positive or negative
     */
    private Integer add;

    public enum UpdateArticleType {
        COLLECTION, COMMENT, LIKES, VIEWS;
    }
}

4.2.3) Topic Constant

package com.heima.common.constants;

public class HotArticleConstants {
    public static final String HOT_ARTICLE_SCORE_TOPIC = "hot.article.score.topic";
}

4.2.4) Service Layer

Sending the message for likes
package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.LikeService;
import com.heima.common.constants.BeHaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class LikeServiceImpl implements LikeService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * User likes an article
     *
     * @param dto
     */
    @Override
    public ResponseResult userLikesArticle(LikesBehaviorDto dto) {
        // 1. Login check (check the user object before dereferencing it, to avoid an NPE)
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }
        String userId = user.getId().toString();

        // 2. Parameter validation
        if (dto == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        // 3. Build the key for later lookups, e.g. LIKE-ARTICLE/COMMENT-123451
        String key = "";
        if (dto.getType() == 0) {
            key = BeHaviorConstants.LIKE_ARTICLE + dto.getArticleId();
        }
        if (dto.getType() == 1) {
            key = BeHaviorConstants.LIKE_DYNAMIC + dto.getArticleId();
        }
        if (dto.getType() == 2) {
            key = BeHaviorConstants.LIKE_COMMENT + dto.getArticleId();
        }

        UpdateArticleMess articleMess = new UpdateArticleMess();
        articleMess.setArticleId(dto.getArticleId());
        articleMess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

        // 4. Check whether this is a like or an unlike
        if (dto.getOperation() == 0) {
            if (cacheService.hGet(key, userId) != null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "Already liked, do not repeat the operation");
            }
            cacheService.hPut(key, userId, JSON.toJSONString(dto));
            articleMess.setAdd(1);
        } else {
            cacheService.hDelete(key, userId);
            articleMess.setAdd(-1);
        }
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(articleMess));
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
}
Sending the message for read behavior
package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ReadService;
import com.heima.common.constants.BeHaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ReadServiceImpl implements ReadService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Record a user read, count +1
     *
     * @param dto
     */
    @Override
    public ResponseResult userReadArticle(ReadBehaviorDto dto) {
        // 1. Login check
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }
        // 2. Parameter validation
        if (dto == null || dto.getArticleId() == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }
        // 2.1 Build the key
        String key = BeHaviorConstants.READ_ARTICLE + dto.getArticleId();
        // 3. Read the stored value, add its count to the count from the frontend, then put it back.
        //    If the key exists in Redis, parse it into an object and add its count onto the dto;
        //    if not, just store the dto from the frontend directly.
        //    Note: don't call .toString() on the Object since it may be null; cast it to String instead.
        String redisString = (String) cacheService.hGet(key, user.getId().toString());
        if (redisString != null) {
            ReadBehaviorDto readBehaviorDto = JSON.parseObject(redisString, ReadBehaviorDto.class);
            dto.setCount(readBehaviorDto.getCount() + dto.getCount());
        }
        cacheService.hPut(BeHaviorConstants.READ_ARTICLE + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));

        UpdateArticleMess articleMess = new UpdateArticleMess();
        articleMess.setArticleId(dto.getArticleId());
        articleMess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        articleMess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(articleMess));
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
}

In both cases, just assemble the mess object at the end and send the message.

4.2.5) Aggregation Stream

First integrate Kafka Streams into the article service,

using the demo above as a reference.

nacos
kafka:
  hosts: 192.168.233.136:9092
  group: ${spring.application.name}
config
package com.heima.article.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * Re-register the KafkaStreamsConfiguration bean to set custom configuration parameters
 */
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {

    private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
    private String hosts;
    private String group;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup() + "_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup() + "_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}
Add a constant
public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC = "hot.article.incr.handle.topic";
Entity class for the processed data
package com.heima.model.mess;

import lombok.Data;

@Data
public class ArticleVisitStreamMess {
    /**
     * Article ID
     */
    private Long articleId;
    /**
     * Views
     */
    private int view;
    /**
     * Collections
     */
    private int collect;
    /**
     * Comments
     */
    private int comment;
    /**
     * Likes
     */
    private int like;
}
Aggregation class
package com.heima.article.stream;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // Receive messages
        KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        // Aggregated stream processing
        stream.map((key, value) -> {
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            // Reset the message key to e.g. 1234343434 and the value to e.g. likes:1
            return new KeyValue<>(mess.getArticleId().toString(), mess.getType().name() + ":" + mess.getAdd());
        })
        // Group by article ID
        .groupBy((key, value) -> key)
        // Time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        /**
         * Custom aggregation
         */
        .aggregate(new Initializer<String>() {
            /**
             * Initializer; the return value is the initial message value
             * @return
             */
            @Override
            public String apply() {
                return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
            }
        }, new Aggregator<String, String, String>() {
            /**
             * The actual aggregation; the return value is the new aggregate value
             */
            @Override
            public String apply(String key, String value, String aggValue) {
                if (StringUtils.isBlank(value)) {
                    return aggValue;
                }
                String[] aggAry = aggValue.split(",");
                int col = 0, com = 0, lik = 0, vie = 0;
                for (String agg : aggAry) {
                    String[] split = agg.split(":");
                    /**
                     * Read the current values, i.e. what has been accumulated so far within the time window
                     */
                    switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                        case COLLECTION:
                            col = Integer.parseInt(split[1]);
                            break;
                        case COMMENT:
                            com = Integer.parseInt(split[1]);
                            break;
                        case LIKES:
                            lik = Integer.parseInt(split[1]);
                            break;
                        case VIEWS:
                            vie = Integer.parseInt(split[1]);
                            break;
                    }
                }
                /**
                 * Accumulate
                 */
                String[] valAry = value.split(":");
                switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])) {
                    case COLLECTION:
                        col += Integer.parseInt(valAry[1]);
                        break;
                    case COMMENT:
                        com += Integer.parseInt(valAry[1]);
                        break;
                    case LIKES:
                        lik += Integer.parseInt(valAry[1]);
                        break;
                    case VIEWS:
                        vie += Integer.parseInt(valAry[1]);
                        break;
                }
                String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                System.out.println("Article ID: " + key);
                System.out.println("Result within the current time window: " + formatStr);
                return formatStr;
            }
        }, Materialized.as("hot-article-stream-count-001"))
        .toStream()
        .map((key, value) -> {
            return new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value));
        })
        // Send the messages
        .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
        return stream;
    }

    /**
     * Format the message value
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId, String value) {
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        // COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("Aggregated message result: {}", JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    }
}

Roughly: at the start, likes, collections, views and comments all get initial values of 0;

each incoming message then adds its positive or negative setAdd value onto the previous aggregate, so everything within the 10-second window accumulates;

finally, the computed values and the article ID are packed into a KeyValue and sent downstream with to(), to be written into the database.
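For example (article ID made up for illustration): if within one 10-second window article 1301 receives two like messages (LIKES:1 each) and one view (VIEWS:1), the aggregate value evolves from COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0 to COLLECTION:0,COMMENT:0,LIKES:2,VIEWS:1, and formatObj serializes it as {"articleId":1301,"collect":0,"comment":0,"like":2,"view":1} onto the downstream topic.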

This completes the upper half of the flow; now for the lower half.
image-20240921173412749

4.2.6) Updating Counts in the Database

Approach

1. Update the counts in the database

2. Multiply the score by 3 (giving articles with fresh activity an edge over those scored by the scheduled task)

3. Replace the article's entry in Redis for both its channel page and the recommendation page.

First pull the data out of Redis into a list, then validate and process it, and finally write it back to Redis.

① Already in Redis → just update its score

② Not yet in Redis:

  • If the list already holds 30 or more entries, sort descending by score (reversed comparator) so the weakest entry sinks to the bottom, grab it with get(size-1), and compare: if it scores lower than the current article, remove it, copy the full apArticle data into a new vo, and add that to the list.
  • If fewer than 30, there is room to spare: just copy the properties over (BeanUtils.copyProperties) and add it.

③ At this point the newly added entry beats the old bottom entry, but we don't know how it ranks against the rest, so sort the list again.

④ Serialize the list to a JSON string and set it back into Redis under the prepared key.
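The code below relies on key prefixes and weight constants from ArticleConstants that these notes never show. A minimal sketch; the package path and the weight values are my assumptions, not confirmed by these notes (views implicitly weigh 1):

package com.heima.common.constants;

public class ArticleConstants {
    // Redis key prefix for per-channel hot article lists (assumed)
    public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";
    // Channel tag used for the recommendation ("all") page (assumed)
    public static final String DEFAULT_TAG = "__all__";
    // Behavior weights used by computeScore (assumed values)
    public static final Integer HOT_ARTICLE_LIKE_WEIGHT = 3;
    public static final Integer HOT_ARTICLE_COMMENT_WEIGHT = 5;
    public static final Integer HOT_ARTICLE_COLLECTION_WEIGHT = 8;
}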

Code

apArticleService

/**
 * Update the article score and the hot article data in the cache
 * @param mess
 */
public void updateScore(ArticleVisitStreamMess mess);

impl

/**
 * Update the article score and the hot article data in the cache
 * @param mess
 */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    // 1. Update the article's view, like, collection and comment counts
    ApArticle apArticle = updateArticle(mess);
    // 2. Compute the article score
    Integer score = computeScore(apArticle);
    score = score * 3;
    // 3. Replace the hot data for the article's channel
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
    // 4. Replace the hot data for the recommendation page
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
}

/**
 * Replace the data and store it back into Redis
 * @param apArticle
 * @param score
 * @param s
 */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);
        boolean flag = true;
        // If the article is already in the cache, only update its score
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }
        // If not in the cache, find the cached entry with the lowest score and compare;
        // if the current article scores higher, replace it
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream()
                        .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                        .collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }
            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        // Cache back into Redis
        hotArticleVoList = hotArticleVoList.stream()
                .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                .collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));
    }
}

/**
 * Update the article behavior counts
 * @param mess
 */
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle apArticle = getById(mess.getArticleId());
    // When the stored count is still null, use the increment itself (the original ?0: branch discarded it)
    apArticle.setCollection(apArticle.getCollection() == null ? mess.getCollect() : apArticle.getCollection() + mess.getCollect());
    apArticle.setComment(apArticle.getComment() == null ? mess.getComment() : apArticle.getComment() + mess.getComment());
    apArticle.setLikes(apArticle.getLikes() == null ? mess.getLike() : apArticle.getLikes() + mess.getLike());
    apArticle.setViews(apArticle.getViews() == null ? mess.getView() : apArticle.getViews() + mess.getView());
    updateById(apArticle);
    return apArticle;
}

/**
 * Compute the concrete article score
 * @param apArticle
 * @return
 */
private Integer computeScore(ApArticle apArticle) {
    Integer score = 0;
    if (apArticle.getLikes() != null) {
        score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    }
    if (apArticle.getViews() != null) {
        score += apArticle.getViews();
    }
    if (apArticle.getComment() != null) {
        score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    }
    if (apArticle.getCollection() != null) {
        score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    }
    return score;
}
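replaceDataToRedis above parses the cached JSON into HotArticleVo, which these notes never show either. A minimal sketch, assuming it simply extends ApArticle with a score field (package path assumed); this is also why BeanUtils.copyProperties works:

package com.heima.model.article.vos;

import com.heima.model.article.pojos.ApArticle;
import lombok.Data;

@Data
public class HotArticleVo extends ApArticle {
    /**
     * Article score
     */
    private Integer score;
}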
Downstream listener
package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess) {
        if (StringUtils.isNotBlank(mess)) {
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);
        }
    }
}

Listen, parse the message into an object, and call the method.

Test

image-20240921182609682

I'll come back to this part when I integrate it into my blog project; leaving a placeholder here for now.


Reposted from: https://blog.csdn.net/Zwwxd666/article/details/142421729
Copyright belongs to the original author Zww0891. In case of infringement, please contact us for removal.
