0


Kafka系列(四)

本文接kafka三,代码实践kafkaStream的应用,用来完成流式计算。

kafkastream

  1. 关于流式计算也就是实时处理,无时间概念边界的处理一些数据。想要更有性价比地和java程序进行结合,因此了解了kafka。但是本人阅读了kafka地官网,觉得可阅读性并不是很高,当然是个人认为,就是界面做的就不是很舒服。

简介

简介一下kafkaStream

Kafka Stream的特点
  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证(想要保证消息有序性就要设置一个分区)
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
关键概念
  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

Kstream

(1)数据结构类似于map,key-value键值对

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果流处理应用是要总结每个用户的价值,它将返回

  1. alice4

。因为第二条数据记录不会覆盖第一条,而是做了一个insert,累加。

代码实现

依赖
  1. <!-- kafkfa -->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. <exclusions>
  6. <exclusion>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka-clients</artifactId>
  9. </exclusion>
  10. </exclusions>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.kafka</groupId>
  14. <artifactId>kafka-clients</artifactId>
  15. </dependency>
  16. <dependency>
  17. <groupId>com.alibaba</groupId>
  18. <artifactId>fastjson</artifactId>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.apache.kafka</groupId>
  22. <artifactId>kafka-streams</artifactId>
  23. <exclusions>
  24. <exclusion>
  25. <artifactId>connect-json</artifactId>
  26. <groupId>org.apache.kafka</groupId>
  27. </exclusion>
  28. <exclusion>
  29. <groupId>org.apache.kafka</groupId>
  30. <artifactId>kafka-clients</artifactId>
  31. </exclusion>
  32. </exclusions>
  33. </dependency>
kafkaStream配置类

需要在nacos的配置里面配置hosts属性和group,本地等怎么配置都可以,只要能读取到就行。

  1. /**
  2. * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
  3. */
  4. @Setter
  5. @Getter
  6. @Configuration
  7. @EnableKafkaStreams
  8. @ConfigurationProperties(prefix="kafka")
  9. public class KafkaStreamConfig {
  10. private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
  11. private String hosts;
  12. private String group;
  13. @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  14. public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
  15. Map<String, Object> props = new HashMap<>();
  16. props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
  17. props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
  18. props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
  19. props.put(StreamsConfig.RETRIES_CONFIG, 10);
  20. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  21. props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  22. return new KafkaStreamsConfiguration(props);
  23. }
  24. }

这里生产者和消费者我就不再举例子了,直接举中间这个stream怎么写。

stream需要知道是谁发的,所以生产者和stream需要绑定一个相同的主题,而stream需要知道要给谁发送过去,消费者知道是谁发的,所以stream和消费者又有一个相同的主题。

streamhandler代码

具体的每一行代码的含义结合个人理解都在注释里面。

  1. package com.neu.article.stream;
  2. import com.alibaba.fastjson.JSON;
  3. import com.neu.base.constants.HotArticleConstants;
  4. import com.neu.base.model.mess.ArticleVisitStreamMess;
  5. import com.neu.base.model.mess.UpdateArticleMess;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.apache.commons.lang3.StringUtils;
  8. import org.apache.kafka.streams.KeyValue;
  9. import org.apache.kafka.streams.StreamsBuilder;
  10. import org.apache.kafka.streams.kstream.*;
  11. import org.springframework.context.annotation.Bean;
  12. import org.springframework.context.annotation.Configuration;
  13. import java.time.Duration;
  14. @Configuration
  15. @Slf4j
  16. public class HotArticleStreamHandler {
  17. @Bean
  18. public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
  19. //接收消息
  20. KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
  21. //聚合流式处理
  22. stream.map((key,value)->{
  23. UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
  24. //重置消息的key:1234343434(文章id) 和 value: likes:1 当前文章点赞一次
  25. //mess.getType().name():用于区分是点赞还是阅读 mess.getAdd():用于区分是加1还是减1
  26. return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
  27. })
  28. //按照文章id进行聚合
  29. .groupBy((key,value)->key)
  30. //时间窗口
  31. .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
  32. /**
  33. * 自行的完成聚合的计算
  34. */
  35. .aggregate(new Initializer<String>() {
  36. /**
  37. * 初始方法,返回值是消息的value->aggValue,聚合之后的value
  38. * @return
  39. */
  40. @Override
  41. public String apply() {
  42. return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
  43. }
  44. /**
  45. * 真正的聚合操作,返回值是消息的value
  46. */
  47. }, new Aggregator<String, String, String>() {
  48. /**
  49. *
  50. * @param key 消息的key :mess.getArticleId().toString()
  51. * @param value 消息的value likes:1
  52. * @param aggValue 初始化消息聚合后的一个值 COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
  53. * @return
  54. */
  55. @Override
  56. public String apply(String key, String value, String aggValue) {
  57. System.out.println(value);
  58. if(StringUtils.isBlank(value)){
  59. return aggValue;
  60. }
  61. String[] aggAry = aggValue.split(",");
  62. int col = 0,com=0,lik=0,vie=0;
  63. for (String agg : aggAry) {
  64. //agg遍历第一次的时候最开始为 COLLECTION:0
  65. String[] split = agg.split(":");//split[0] = COLLECTION split[1] = 0
  66. /**
  67. * 获得初始值,也是时间窗口内计算之后的值
  68. */
  69. switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
  70. case COLLECTION:
  71. col = Integer.parseInt(split[1]);
  72. break;
  73. case COMMENT:
  74. com = Integer.parseInt(split[1]);
  75. break;
  76. case LIKES:
  77. lik = Integer.parseInt(split[1]);
  78. break;
  79. case VIEWS:
  80. vie = Integer.parseInt(split[1]);
  81. break;
  82. }
  83. }
  84. /**
  85. * 累加操作 likes:1
  86. */
  87. String[] valAry = value.split(":");
  88. //valAry[0] = likes valAry[1] = 1
  89. switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
  90. case COLLECTION:
  91. col += Integer.parseInt(valAry[1]);
  92. break;
  93. case COMMENT:
  94. com += Integer.parseInt(valAry[1]);
  95. break;
  96. case LIKES:
  97. lik += Integer.parseInt(valAry[1]);
  98. break;
  99. case VIEWS:
  100. vie += Integer.parseInt(valAry[1]);
  101. break;
  102. }
  103. //返回值是有要求的,必须与初始化apply方法的返回值形式一致
  104. String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
  105. System.out.println("文章的id:"+key);
  106. System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
  107. return formatStr;
  108. }
  109. //Materialized.as("hot-atricle-stream-count-001"):用于指定六十处理的状态,字符串可以随便给,多个流处理的话不重复就行
  110. }, Materialized.as("hot-atricle-stream-count-001"))
  111. .toStream()
  112. .map((key,value)->{
  113. //key.key().toString():文章id,value:COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
  114. return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
  115. })
  116. //发送消息
  117. .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
  118. return stream;
  119. }
  120. /**
  121. * 格式化消息的value数据
  122. * @param articleId
  123. * @param value
  124. * @return
  125. */
  126. public String formatObj(String articleId,String value){
  127. ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
  128. mess.setArticleId(Long.valueOf(articleId));
  129. //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
  130. String[] valAry = value.split(",");
  131. for (String val : valAry) {
  132. String[] split = val.split(":");
  133. switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
  134. case COLLECTION:
  135. mess.setCollect(Integer.parseInt(split[1]));
  136. break;
  137. case COMMENT:
  138. mess.setComment(Integer.parseInt(split[1]));
  139. break;
  140. case LIKES:
  141. mess.setLike(Integer.parseInt(split[1]));
  142. break;
  143. case VIEWS:
  144. mess.setView(Integer.parseInt(split[1]));
  145. break;
  146. }
  147. }
  148. log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
  149. return JSON.toJSONString(mess);
  150. }
  151. }
标签: kafka 分布式

本文转载自: https://blog.csdn.net/Java_Zzm/article/details/135632892
版权归原作者 happystudy_neu 所有, 如有侵权,请联系我们删除。

“Kafka系列(四)”的评论:

还没有评论