文章目录
自媒体文章上下架
Kafka概述
入门案例
(1)创建kafka-demo项目,导入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>
(2)生产者发送消息
packagecom.heima.kafka.sample;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importjava.util.Properties;/**
* 生产者
*/publicclassProducerQuickStart{publicstaticvoidmain(String[] args){//1.kafka的配置信息Properties properties =newProperties();//kafka的连接地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer =newKafkaProducer<String,String>(properties);//封装发送的消息ProducerRecord<String,String> record =newProducerRecord<String,String>("itheima-topic","100001","hello kafka");//3.发送消息
producer.send(record);//4.关闭消息通道,必须关闭,否则消息发送不成功
producer.close();}}
(3)消费者接收消息
packagecom.heima.kafka.sample;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;/**
* 消费者
*/publicclassConsumerQuickStart{publicstaticvoidmain(String[] args){//1.添加kafka的配置信息Properties properties =newProperties();//kafka的连接地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");//消息的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String,String> consumer =newKafkaConsumer<String,String>(properties);//3.订阅主题
consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程一直处于监听状态while(true){//4.获取消息ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}
总结
- 生产者发送消息,同一消费者组内的多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
- 生产者发送消息,属于不同消费者组的多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)
分区
Kafka高可用设计
集群
发送类型
- 异步发送调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
//异步消息发送
producer.send(kvProducerRecord,newCallback(){@OverridepublicvoidonCompletion(RecordMetadata recordMetadata,Exception e){if(e !=null){System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}});
参数详解
消费者详解
1.提交当前偏移量(同步提交)
把
enable.auto.commit
设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        System.out.println(record.key());
        try {
            // Synchronously commit the latest polled offset; blocks and
            // retries internally until it succeeds or throws
            consumer.commitSync();
        } catch (CommitFailedException e) {
            System.out.println("记录提交失败的异常:"+e);
        }
    }
}
2.异步提交
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。
while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.value());System.out.println(record.key());}
consumer.commitAsync(newOffsetCommitCallback(){@OverridepublicvoidonComplete(Map<TopicPartition,OffsetAndMetadata> map,Exception e){if(e!=null){System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);}}});}
3.同步和异步组合提交
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。
举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            System.out.println(record.key());
        }
        // Fast, non-blocking commit on every poll iteration
        consumer.commitAsync();
    }
} catch (Exception e) { // fixed: removed stray '+' token that followed this brace (syntax error)
    e.printStackTrace();
    System.out.println("记录错误信息:"+e);
} finally {
    try {
        // Final synchronous commit: retries until success so no offset is lost on shutdown
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
SpringBoot集成Kafka
1.导入spring-kafka依赖信息
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency></dependencies>
2.在resources下创建文件application.yml
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
packagecom.heima.kafka.controller;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassHelloController{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")publicStringhello(){
kafkaTemplate.send("itcast-topic","黑马程序员");return"ok";}}
4.消息消费者
packagecom.heima.kafka.listener;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importorg.springframework.util.StringUtils;@ComponentpublicclassHelloListener{@KafkaListener(topics ="itcast-topic")publicvoidonMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}}
传递为消息对象
- 发送消息
@GetMapping("/hello")publicStringhello(){User user =newUser();
user.setUsername("xiaowang");
user.setAge(18);
kafkaTemplate.send("user-topic",JSON.toJSONString(user));return"ok";}
- 接收消息
packagecom.heima.kafka.listener;importcom.alibaba.fastjson.JSON;importcom.heima.kafka.pojo.User;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importorg.springframework.util.StringUtils;@ComponentpublicclassHelloListener{@KafkaListener(topics ="user-topic")publicvoidonMessage(String message){if(!StringUtils.isEmpty(message)){User user =JSON.parseObject(message,User.class);System.out.println(user);}}}
文章上下架功能实现
DTO
@DatapublicclassWmNewsDto{privateInteger id;/**
* 是否上架 0 下架 1 上架
*/privateShort enable;}
1)接口定义
在heima-leadnews-wemedia工程下的WmNewsController新增方法
@PostMapping("/down_or_up")publicResponseResultdownOrUp(@RequestBodyWmNewsDto dto){returnnull;}
在WmNewsDto中新增enable属性 ,完整的代码如下:
packagecom.heima.model.wemedia.dtos;importlombok.Data;importjava.util.Date;importjava.util.List;@DatapublicclassWmNewsDto{privateInteger id;/**
* 标题
*/privateString title;/**
* 频道id
*/privateInteger channelId;/**
* 标签
*/privateString labels;/**
* 发布时间
*/privateDate publishTime;/**
* 文章内容
*/privateString content;/**
* 文章封面类型 0 无图 1 单图 3 多图 -1 自动
*/privateShort type;/**
* 提交时间
*/privateDate submitedTime;/**
* 状态 提交为1 草稿为0
*/privateShort status;/**
* 封面图片列表 多张图以逗号隔开
*/privateList<String> images;/**
* 上下架 0 下架 1 上架
*/privateShort enable;}
2)业务层编写
在WmNewsService新增方法
/**
* 文章的上下架
* @param dto
* @return
*/publicResponseResultdownOrUp(WmNewsDto dto);
实现方法
/**
* 文章的上下架
* @param dto
* @return
*/@OverridepublicResponseResultdownOrUp(WmNewsDto dto){//1.检查参数if(dto.getId()==null){returnResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.查询文章WmNews wmNews =getById(dto.getId());if(wmNews ==null){returnResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");}//3.判断文章是否已发布if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){returnResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");}//4.修改文章enableif(dto.getEnable()!=null&& dto.getEnable()>-1&& dto.getEnable()<2){update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable()).eq(WmNews::getId,wmNews.getId()));}returnResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}
3)控制器
@PostMapping("/down_or_up")publicResponseResultdownOrUp(@RequestBodyWmNewsDto dto){return wmNewsService.downOrUp(dto);}
4)测试
1)在heima-leadnews-common模块下导入kafka依赖
<!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>
2)在自媒体端的nacos配置中心配置kafka的生产者
spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
3)在自媒体端文章上下架后发送消息
// Notify the article service so it can sync the new enable state,
// but only for articles that were actually published (have an articleId)
if (wmNews.getArticleId() != null) {
    Map<String, Object> payload = new HashMap<>();
    payload.put("articleId", wmNews.getArticleId());
    payload.put("enable", dto.getEnable());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC, JSON.toJSONString(payload));
}
常量类:
/**
 * Topic names shared between the wemedia producer and the article consumer.
 */
public class WmNewsMessageConstants {

    /** Topic carrying article publish/unpublish (up/down) events. */
    public static final String WM_NEWS_UP_OR_DOWN_TOPIC = "wm.news.up.or.down.topic";
}
4)在article端的nacos配置中心配置kafka的消费者
spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
5)在article端编写监听,接收数据
packagecom.heima.article.listener;importcom.alibaba.fastjson.JSON;importcom.heima.article.service.ApArticleConfigService;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.StringUtils;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.util.Map;@Component@Slf4jpublicclassArtilceIsDownListener{@AutowiredprivateApArticleConfigService apArticleConfigService;@KafkaListener(topics =WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)publicvoidonMessage(String message){if(StringUtils.isNotBlank(message)){Map map =JSON.parseObject(message,Map.class);
apArticleConfigService.updateByMap(map);
log.info("article端文章配置修改,articleId={}",map.get("articleId"));}}}
6)修改ap_article_config表的数据
新建ApArticleConfigService
packagecom.heima.article.service;importcom.baomidou.mybatisplus.extension.service.IService;importcom.heima.model.article.pojos.ApArticleConfig;importjava.util.Map;publicinterfaceApArticleConfigServiceextendsIService<ApArticleConfig>{/**
* 修改文章配置
* @param map
*/publicvoidupdateByMap(Map map);}
实现类:
packagecom.heima.article.service.impl;importcom.baomidou.mybatisplus.core.toolkit.Wrappers;importcom.baomidou.mybatisplus.extension.service.impl.ServiceImpl;importcom.heima.article.mapper.ApArticleConfigMapper;importcom.heima.article.service.ApArticleConfigService;importcom.heima.model.article.pojos.ApArticleConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importjava.util.Map;@Service@Slf4j@TransactionalpublicclassApArticleConfigServiceImplextendsServiceImpl<ApArticleConfigMapper,ApArticleConfig>implementsApArticleConfigService{/**
* 修改文章配置
* @param map
*/@OverridepublicvoidupdateByMap(Map map){//0 下架 1 上架Object enable = map.get("enable");boolean isDown =true;if(enable.equals(1)){
isDown =false;}//修改文章配置update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));}}
版权归原作者 Jimmy Ding 所有, 如有侵权,请联系我们删除。