0


黑马头条微服务学习day6-kafka及异步通知文章上下架

文章目录

自媒体文章上下架

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka概述

入门案例

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
(1)创建kafka-demo项目,导入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

(2)生产者发送消息

packagecom.heima.kafka.sample;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importjava.util.Properties;/**
 * 生产者
 */publicclassProducerQuickStart{publicstaticvoidmain(String[] args){//1.kafka的配置信息Properties properties =newProperties();//kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer =newKafkaProducer<String,String>(properties);//封装发送的消息ProducerRecord<String,String> record =newProducerRecord<String,String>("itheima-topic","100001","hello kafka");//3.发送消息
        producer.send(record);//4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();}}

在这里插入图片描述
(3)消费者接收消息

packagecom.heima.kafka.sample;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;/**
 * 消费者
 */publicclassConsumerQuickStart{publicstaticvoidmain(String[] args){//1.添加kafka的配置信息Properties properties =newProperties();//kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");//消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String,String> consumer =newKafkaConsumer<String,String>(properties);//3.订阅主题
        consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程一直处于监听状态while(true){//4.获取消息ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}

总结

  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)

分区

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka高可用设计

集群

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

发送类型

在这里插入图片描述
在这里插入图片描述

  • 异步发送调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
//异步消息发送
producer.send(kvProducerRecord,newCallback(){@OverridepublicvoidonCompletion(RecordMetadata recordMetadata,Exception e){if(e !=null){System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}});

参数详解

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消费者详解

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述


在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
1.提交当前偏移量(同步提交)

enable.auto.commit

设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。

只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.value());System.out.println(record.key());try{
            consumer.commitSync();//同步提交当前最新的偏移量}catch(CommitFailedException e){System.out.println("记录提交失败的异常:"+e);}}}

2.异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。

while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.value());System.out.println(record.key());}
    consumer.commitAsync(newOffsetCommitCallback(){@OverridepublicvoidonComplete(Map<TopicPartition,OffsetAndMetadata> map,Exception e){if(e!=null){System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);}}});}

3.同步和异步组合提交

异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。

举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

try{while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.value());System.out.println(record.key());}
        consumer.commitAsync();}}catch(Exception e){+
    e.printStackTrace();System.out.println("记录错误信息:"+e);}finally{try{
        consumer.commitSync();}finally{
        consumer.close();}}

SpringBoot集成Kafka

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
1.导入spring-kafka依赖信息

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency></dependencies>

2.在resources下创建文件application.yml

server:port:9991spring:application:name: kafka-demo
  kafka:bootstrap-servers: 192.168.200.130:9092producer:retries:10key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.消息生产者

packagecom.heima.kafka.controller;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassHelloController{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")publicStringhello(){
        kafkaTemplate.send("itcast-topic","黑马程序员");return"ok";}}

4.消息消费者

packagecom.heima.kafka.listener;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importorg.springframework.util.StringUtils;@ComponentpublicclassHelloListener{@KafkaListener(topics ="itcast-topic")publicvoidonMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}}

传递为消息对象

在这里插入图片描述

  • 发送消息
@GetMapping("/hello")publicStringhello(){User user =newUser();
    user.setUsername("xiaowang");
    user.setAge(18);

    kafkaTemplate.send("user-topic",JSON.toJSONString(user));return"ok";}
  • 接收消息
packagecom.heima.kafka.listener;importcom.alibaba.fastjson.JSON;importcom.heima.kafka.pojo.User;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importorg.springframework.util.StringUtils;@ComponentpublicclassHelloListener{@KafkaListener(topics ="user-topic")publicvoidonMessage(String message){if(!StringUtils.isEmpty(message)){User user =JSON.parseObject(message,User.class);System.out.println(user);}}}

文章上下架功能实现

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
DTO

@DatapublicclassWmNewsDto{privateInteger id;/**
    * 是否上架  0 下架  1 上架
    */privateShort enable;}

1)接口定义

在heima-leadnews-wemedia工程下的WmNewsController新增方法

@PostMapping("/down_or_up")publicResponseResultdownOrUp(@RequestBodyWmNewsDto dto){returnnull;}

在WmNewsDto中新增enable属性 ,完整的代码如下:

packagecom.heima.model.wemedia.dtos;importlombok.Data;importjava.util.Date;importjava.util.List;@DatapublicclassWmNewsDto{privateInteger id;/**
     * 标题
     */privateString title;/**
     * 频道id
     */privateInteger channelId;/**
     * 标签
     */privateString labels;/**
     * 发布时间
     */privateDate publishTime;/**
     * 文章内容
     */privateString content;/**
     * 文章封面类型  0 无图 1 单图 3 多图 -1 自动
     */privateShort type;/**
     * 提交时间
     */privateDate submitedTime;/**
     * 状态 提交为1  草稿为0
     */privateShort status;/**
     * 封面图片列表 多张图以逗号隔开
     */privateList<String> images;/**
     * 上下架 0 下架  1 上架
     */privateShort enable;}

2)业务层编写

在WmNewsService新增方法

/**
 * 文章的上下架
 * @param dto
 * @return
 */publicResponseResultdownOrUp(WmNewsDto dto);

实现方法

/**
 * 文章的上下架
 * @param dto
 * @return
 */@OverridepublicResponseResultdownOrUp(WmNewsDto dto){//1.检查参数if(dto.getId()==null){returnResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.查询文章WmNews wmNews =getById(dto.getId());if(wmNews ==null){returnResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");}//3.判断文章是否已发布if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){returnResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");}//4.修改文章enableif(dto.getEnable()!=null&& dto.getEnable()>-1&& dto.getEnable()<2){update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable()).eq(WmNews::getId,wmNews.getId()));}returnResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}

3)控制器

@PostMapping("/down_or_up")publicResponseResultdownOrUp(@RequestBodyWmNewsDto dto){return wmNewsService.downOrUp(dto);}

4)测试

在这里插入图片描述
在这里插入图片描述

1)在heima-leadnews-common模块下导入kafka依赖

<!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

2)在自媒体端的nacos配置中心配置kafka的生产者

spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries:10key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

3)在自媒体端文章上下架后发送消息

//发送消息,通知article端修改文章配置if(wmNews.getArticleId()!=null){Map<String,Object> map =newHashMap<>();
    map.put("articleId",wmNews.getArticleId());
    map.put("enable",dto.getEnable());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));}

常量类:

publicclassWmNewsMessageConstants{publicstaticfinalStringWM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";}

4)在article端的nacos配置中心配置kafka的消费者

spring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

5)在article端编写监听,接收数据

packagecom.heima.article.listener;importcom.alibaba.fastjson.JSON;importcom.heima.article.service.ApArticleConfigService;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.StringUtils;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.util.Map;@Component@Slf4jpublicclassArtilceIsDownListener{@AutowiredprivateApArticleConfigService apArticleConfigService;@KafkaListener(topics =WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)publicvoidonMessage(String message){if(StringUtils.isNotBlank(message)){Map map =JSON.parseObject(message,Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}}

6)修改ap_article_config表的数据

新建ApArticleConfigService

packagecom.heima.article.service;importcom.baomidou.mybatisplus.extension.service.IService;importcom.heima.model.article.pojos.ApArticleConfig;importjava.util.Map;publicinterfaceApArticleConfigServiceextendsIService<ApArticleConfig>{/**
     * 修改文章配置
     * @param map
     */publicvoidupdateByMap(Map map);}

实现类:

packagecom.heima.article.service.impl;importcom.baomidou.mybatisplus.core.toolkit.Wrappers;importcom.baomidou.mybatisplus.extension.service.impl.ServiceImpl;importcom.heima.article.mapper.ApArticleConfigMapper;importcom.heima.article.service.ApArticleConfigService;importcom.heima.model.article.pojos.ApArticleConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importjava.util.Map;@Service@Slf4j@TransactionalpublicclassApArticleConfigServiceImplextendsServiceImpl<ApArticleConfigMapper,ApArticleConfig>implementsApArticleConfigService{/**
     * 修改文章配置
     * @param map
     */@OverridepublicvoidupdateByMap(Map map){//0 下架 1 上架Object enable = map.get("enable");boolean isDown =true;if(enable.equals(1)){
            isDown =false;}//修改文章配置update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));}}
标签: 微服务 学习 kafka

本文转载自: https://blog.csdn.net/qq_48642405/article/details/140897719
版权归原作者 Jimmy Ding 所有, 如有侵权,请联系我们删除。

“黑马头条微服务学习day6-kafka及异步通知文章上下架”的评论:

还没有评论