0


黑马头条微服务学习day6-kafka及异步通知文章上下架

文章目录

自媒体文章上下架

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka概述

入门案例

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
(1)创建kafka-demo项目,导入依赖

  1. <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

(2)生产者发送消息

  1. packagecom.heima.kafka.sample;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importjava.util.Properties;/**
  2. * 生产者
  3. */publicclassProducerQuickStart{publicstaticvoidmain(String[] args){//1.kafka的配置信息Properties properties =newProperties();//kafka的连接地址
  4. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数
  5. properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器
  6. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器
  7. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer =newKafkaProducer<String,String>(properties);//封装发送的消息ProducerRecord<String,String> record =newProducerRecord<String,String>("itheima-topic","100001","hello kafka");//3.发送消息
  8. producer.send(record);//4.关闭消息通道,必须关闭,否则消息发送不成功
  9. producer.close();}}

在这里插入图片描述
(3)消费者接收消息

  1. packagecom.heima.kafka.sample;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;/**
  2. * 消费者
  3. */publicclassConsumerQuickStart{publicstaticvoidmain(String[] args){//1.添加kafka的配置信息Properties properties =newProperties();//kafka的连接地址
  4. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//消费者组
  5. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");//消息的反序列化器
  6. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
  7. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String,String> consumer =newKafkaConsumer<String,String>(properties);//3.订阅主题
  8. consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程一直处于监听状态while(true){//4.获取消息ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}

总结

  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)

分区

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka高可用设计

集群

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

发送类型

在这里插入图片描述
在这里插入图片描述

  • 异步发送调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
  1. //异步消息发送
  2. producer.send(kvProducerRecord,newCallback(){@OverridepublicvoidonCompletion(RecordMetadata recordMetadata,Exception e){if(e !=null){System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}});

参数详解

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消费者详解

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述


在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
1.提交当前偏移量(同步提交)

  1. enable.auto.commit

设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。

只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

  1. while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.value());System.out.println(record.key());try{
  2. consumer.commitSync();//同步提交当前最新的偏移量}catch(CommitFailedException e){System.out.println("记录提交失败的异常:"+e);}}}

2.异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。

  1. while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.value());System.out.println(record.key());}
  2. consumer.commitAsync(newOffsetCommitCallback(){@OverridepublicvoidonComplete(Map<TopicPartition,OffsetAndMetadata> map,Exception e){if(e!=null){System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);}}});}

3.同步和异步组合提交

异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。

举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

  1. try{while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.value());System.out.println(record.key());}
  2. consumer.commitAsync();}}catch(Exception e){+
  3. e.printStackTrace();System.out.println("记录错误信息:"+e);}finally{try{
  4. consumer.commitSync();}finally{
  5. consumer.close();}}

SpringBoot集成Kafka

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
1.导入spring-kafka依赖信息

  1. <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency></dependencies>

2.在resources下创建文件application.yml

  1. server:port:9991spring:application:name: kafka-demo
  2. kafka:bootstrap-servers: 192.168.200.130:9092producer:retries:10key-serializer: org.apache.kafka.common.serialization.StringSerializer
  3. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  4. consumer:group-id: ${spring.application.name}-test
  5. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  6. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.消息生产者

  1. packagecom.heima.kafka.controller;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassHelloController{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")publicStringhello(){
  2. kafkaTemplate.send("itcast-topic","黑马程序员");return"ok";}}

4.消息消费者

  1. packagecom.heima.kafka.listener;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importorg.springframework.util.StringUtils;@ComponentpublicclassHelloListener{@KafkaListener(topics ="itcast-topic")publicvoidonMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}}

传递为消息对象

在这里插入图片描述

  • 发送消息
  1. @GetMapping("/hello")publicStringhello(){User user =newUser();
  2. user.setUsername("xiaowang");
  3. user.setAge(18);
  4. kafkaTemplate.send("user-topic",JSON.toJSONString(user));return"ok";}
  • 接收消息
  1. packagecom.heima.kafka.listener;importcom.alibaba.fastjson.JSON;importcom.heima.kafka.pojo.User;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importorg.springframework.util.StringUtils;@ComponentpublicclassHelloListener{@KafkaListener(topics ="user-topic")publicvoidonMessage(String message){if(!StringUtils.isEmpty(message)){User user =JSON.parseObject(message,User.class);System.out.println(user);}}}

文章上下架功能实现

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
DTO

  1. @DatapublicclassWmNewsDto{privateInteger id;/**
  2. * 是否上架 0 下架 1 上架
  3. */privateShort enable;}

1)接口定义

在heima-leadnews-wemedia工程下的WmNewsController新增方法

  1. @PostMapping("/down_or_up")publicResponseResultdownOrUp(@RequestBodyWmNewsDto dto){returnnull;}

在WmNewsDto中新增enable属性 ,完整的代码如下:

  1. packagecom.heima.model.wemedia.dtos;importlombok.Data;importjava.util.Date;importjava.util.List;@DatapublicclassWmNewsDto{privateInteger id;/**
  2. * 标题
  3. */privateString title;/**
  4. * 频道id
  5. */privateInteger channelId;/**
  6. * 标签
  7. */privateString labels;/**
  8. * 发布时间
  9. */privateDate publishTime;/**
  10. * 文章内容
  11. */privateString content;/**
  12. * 文章封面类型 0 无图 1 单图 3 多图 -1 自动
  13. */privateShort type;/**
  14. * 提交时间
  15. */privateDate submitedTime;/**
  16. * 状态 提交为1 草稿为0
  17. */privateShort status;/**
  18. * 封面图片列表 多张图以逗号隔开
  19. */privateList<String> images;/**
  20. * 上下架 0 下架 1 上架
  21. */privateShort enable;}

2)业务层编写

在WmNewsService新增方法

  1. /**
  2. * 文章的上下架
  3. * @param dto
  4. * @return
  5. */publicResponseResultdownOrUp(WmNewsDto dto);

实现方法

  1. /**
  2. * 文章的上下架
  3. * @param dto
  4. * @return
  5. */@OverridepublicResponseResultdownOrUp(WmNewsDto dto){//1.检查参数if(dto.getId()==null){returnResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.查询文章WmNews wmNews =getById(dto.getId());if(wmNews ==null){returnResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");}//3.判断文章是否已发布if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){returnResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");}//4.修改文章enableif(dto.getEnable()!=null&& dto.getEnable()>-1&& dto.getEnable()<2){update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable()).eq(WmNews::getId,wmNews.getId()));}returnResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}

3)控制器

  1. @PostMapping("/down_or_up")publicResponseResultdownOrUp(@RequestBodyWmNewsDto dto){return wmNewsService.downOrUp(dto);}

4)测试

在这里插入图片描述
在这里插入图片描述

1)在heima-leadnews-common模块下导入kafka依赖

  1. <!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

2)在自媒体端的nacos配置中心配置kafka的生产者

  1. spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries:10key-serializer: org.apache.kafka.common.serialization.StringSerializer
  2. value-serializer: org.apache.kafka.common.serialization.StringSerializer

3)在自媒体端文章上下架后发送消息

  1. //发送消息,通知article端修改文章配置if(wmNews.getArticleId()!=null){Map<String,Object> map =newHashMap<>();
  2. map.put("articleId",wmNews.getArticleId());
  3. map.put("enable",dto.getEnable());
  4. kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));}

常量类:

  1. publicclassWmNewsMessageConstants{publicstaticfinalStringWM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";}

4)在article端的nacos配置中心配置kafka的消费者

  1. spring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  2. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

5)在article端编写监听,接收数据

  1. packagecom.heima.article.listener;importcom.alibaba.fastjson.JSON;importcom.heima.article.service.ApArticleConfigService;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.StringUtils;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.util.Map;@Component@Slf4jpublicclassArtilceIsDownListener{@AutowiredprivateApArticleConfigService apArticleConfigService;@KafkaListener(topics =WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)publicvoidonMessage(String message){if(StringUtils.isNotBlank(message)){Map map =JSON.parseObject(message,Map.class);
  2. apArticleConfigService.updateByMap(map);
  3. log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}}

6)修改ap_article_config表的数据

新建ApArticleConfigService

  1. packagecom.heima.article.service;importcom.baomidou.mybatisplus.extension.service.IService;importcom.heima.model.article.pojos.ApArticleConfig;importjava.util.Map;publicinterfaceApArticleConfigServiceextendsIService<ApArticleConfig>{/**
  2. * 修改文章配置
  3. * @param map
  4. */publicvoidupdateByMap(Map map);}

实现类:

  1. packagecom.heima.article.service.impl;importcom.baomidou.mybatisplus.core.toolkit.Wrappers;importcom.baomidou.mybatisplus.extension.service.impl.ServiceImpl;importcom.heima.article.mapper.ApArticleConfigMapper;importcom.heima.article.service.ApArticleConfigService;importcom.heima.model.article.pojos.ApArticleConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importjava.util.Map;@Service@Slf4j@TransactionalpublicclassApArticleConfigServiceImplextendsServiceImpl<ApArticleConfigMapper,ApArticleConfig>implementsApArticleConfigService{/**
  2. * 修改文章配置
  3. * @param map
  4. */@OverridepublicvoidupdateByMap(Map map){//0 下架 1 上架Object enable = map.get("enable");boolean isDown =true;if(enable.equals(1)){
  5. isDown =false;}//修改文章配置update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));}}
标签: 微服务 学习 kafka

本文转载自: https://blog.csdn.net/qq_48642405/article/details/140897719
版权归原作者 Jimmy Ding 所有, 如有侵权,请联系我们删除。

“黑马头条微服务学习day6-kafka及异步通知文章上下架”的评论:

还没有评论