0


【Kafka】Kafka Stream简单使用

一、实时流式计算

1. 概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算
在这里插入图片描述
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2. 应用场景

  • 日志分析: 网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
  • 大屏看板统计: 可以实时的查看网站注册数量,订单数量,购买数量,金额等。
  • 公交实时数据: 可以随时更新公交车方位,计算多久到达站牌等
  • 实时文章分值计算

比如应用较广的 头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐

3. Kafka Stream

近些年来,开源流处理领域涌现出了很多优秀框架。光是在 Apache 基金会孵化的项目,关于流处理的大数据框架就有十几个之多,比如早期的

 Apache Samza

Apache Storm

,以及这些年火爆的

Spark

以及

Flink

等。

3.1 Kafka Streams的特点
  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed joinaggregation
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

在这里插入图片描述

3.2 关键概念

一个最简单的

Streaming

的结构如下图所示:
在这里插入图片描述

从一个

Topic

中读取到数据,经过一些处理操作之后,写入到另一个

Topic

中,这就是一个最简单的

Streaming流式计算

。其中,

Source Topic

中的数据会源源不断的产生新数据。
那么,我们再在上面的结构之上扩展一下,假设定义了多个

Source Topic

Destination Topic

,那就构成如下图所示的较为复杂的拓扑结构:
在这里插入图片描述

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器
  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题在这里插入图片描述Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka的客户端API库,编写简单的java就可以实现流式处理。
3.3 KStream
KStream

:数据结构类似于

map

,如下图,

key-value

键值对

在这里插入图片描述

KStream

数据流(data stream),是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream

负责抽象的,就是数据流。与Kafka自身

topic

中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

二、测试kafkaStream

先看下简单的

kafkaStream

KStream

测试

需求分析:求单词个数(word count)
在这里插入图片描述

1.

pom.xml

引入依赖:

<!-- kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>

2. 配置文件

server:port:9991spring:application:name: kafka-demo
  kafka:bootstrap-servers: 192.168.200.130:9092producer:retries:10key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: lz4
    consumer:group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3. 编写生产者

ProducerQuickStart.java 
packagecom.kafka.sample;importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.producer.*;importjava.util.Properties;@Slf4jpublicclassProducerQuickStart{publicstaticvoidmain(String[] args){//1. kafka的配置信息Properties prop =newProperties();//kafka的链接信息
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//配置重试次数
        prop.put(ProducerConfig.RETRIES_CONFIG,5);//数据压缩
        prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");//ack配置  消息确认机制   默认ack=1,即只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应//        prop.put(ProducerConfig.ACKS_CONFIG,"all");消息key的序列化器
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2. 生产者对象KafkaProducer<String,String> producer =newKafkaProducer<String,String>(prop);//封装发送的消息ProducerRecord<String,String> producerRecord =newProducerRecord<String,String>("itcast-topic-input","key_001","hello kafka");//3. 发送消息for(int i =0; i <5; i++){
            producer.send(producerRecord);}//4. 关闭消息通道  必须关闭,否则消息发不出去
        producer.close();}}

4 编写kafkaStream流式处理

KafkaStreamQuickStart.java 
packagecom.kafka.sample;importorg.apache.kafka.common.serialization.Serdes;importorg.apache.kafka.streams.KafkaStreams;importorg.apache.kafka.streams.KeyValue;importorg.apache.kafka.streams.StreamsBuilder;importorg.apache.kafka.streams.StreamsConfig;importorg.apache.kafka.streams.kstream.KStream;importorg.apache.kafka.streams.kstream.TimeWindows;importorg.apache.kafka.streams.kstream.ValueMapper;importjava.time.Duration;importjava.util.Arrays;importjava.util.Properties;/**
 * 流式处理
 */publicclassKafkaStreamQuickStart{publicstaticvoidmain(String[] args){//kafka的配置信心Properties prop =newProperties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream 构建器StreamsBuilder streamsBuilder =newStreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建kafkaStream对象KafkaStreams kafkaStreams =newKafkaStreams(streamsBuilder.build(),prop);//开启流式计算
        kafkaStreams.start();}/**
     * 流式计算
     * 消息的内容:hello kafka  hello itcast
     * @param streamsBuilder
     */privatestaticvoidstreamProcessor(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String,String> stream = streamsBuilder.stream("itcast-topic-input");/**
         * 处理消息的value
         */
        stream.flatMapValues(newValueMapper<String,Iterable<String>>(){@OverridepublicIterable<String>apply(String value){returnArrays.asList(value.split(" "));}})//按照value进行聚合处理.groupBy((key,value)->value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);returnnewKeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}}

5. 编写消费者

ConsumerQuickStart.java 
packagecom.kafka.sample;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;publicclassConsumerQuickStart{publicstaticvoidmain(String[] args){//1. 添加kafka的配置信息Properties properties =newProperties();// 配置链接信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-2");//配置消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//2. 消费者对象KafkaConsumer<String,String> consumer =newKafkaConsumer<String,String>(properties);//3. 订阅主题
        consumer.subscribe(Collections.singletonList("itcast-topic-out"));//当前线程一直监听消息while(true){//4. 消费者拉取消息: 每秒拉取一次ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){System.out.println(record.key());System.out.println(record.value());}}}}

启动项目:

  1. 在远端(192.168.200.130:9092)启动docker中的kafka容器
  2. 启动消费者ConsumerQuickStartmain函数
  3. 启动kafkastreammian函数
  4. 启动生产者ProducerQuickStartmain函数

5. 控制台打印结果:

在这里插入图片描述
在这里插入图片描述

整个过程:
生产者向

kafka

中发送了5条

“hello kafka”

消息,topic均为

itcast-topic-input

。kafkastream监听这个topic,每10秒进行一次流式处理,将

“hello kakfa”

字符串分割,并统计每个单词出现的次数。然后转为

kstream

,发送消息到kafka中的

topic=itcast-topic-out”

。消费者监听

“itcast-topic-out”

的topic,消费消息。

三、Springboot整合kafkaStream

1. 配置文件新增

application.yml
server:port:9991spring:application:name: kafka-demo
  kafka:bootstrap-servers: 192.168.200.130:9092producer:retries:10key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: lz4
    consumer:group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# kafkaStream新增以下配置kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

2. 在微服务中新增配置类

KafkaStreamConfig.java 
packagecom.kafka.config;importlombok.Getter;importlombok.Setter;importorg.apache.kafka.common.serialization.Serdes;importorg.apache.kafka.streams.StreamsConfig;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.annotation.EnableKafkaStreams;importorg.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;importorg.springframework.kafka.config.KafkaStreamsConfiguration;importjava.util.HashMap;importjava.util.Map;/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */@Setter@Getter@Configuration@EnableKafkaStreams@ConfigurationProperties(prefix="kafka")publicclassKafkaStreamConfig{privatestaticfinalintMAX_MESSAGE_SIZE=16*1024*1024;privateString hosts;privateString group;@Bean(name =KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)publicKafkaStreamsConfigurationdefaultKafkaStreamsConfig(){Map<String,Object> props =newHashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG,this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG,10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());returnnewKafkaStreamsConfiguration(props);}}

3. 使用kafkaStream监听消息

KafkaStreamHelloListener.java
packagecom.kafka.stream;importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.streams.KeyValue;importorg.apache.kafka.streams.StreamsBuilder;importorg.apache.kafka.streams.kstream.KStream;importorg.apache.kafka.streams.kstream.TimeWindows;importorg.apache.kafka.streams.kstream.ValueMapper;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.time.Duration;importjava.util.Arrays;@Configuration@Slf4jpublicclassKafkaStreamHelloListener{@BeanpublicKStream<String,String>kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String,String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(newValueMapper<String,Iterable<String>>(){@OverridepublicIterable<String>apply(String value){returnArrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);returnnewKeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}}

测试:

启动

springboot

应用程序,运行之前的

ProducerQuickStart

来生产消息,约10秒后,看到

kafkaStream

消息的处理结果
在这里插入图片描述

**说明

kafkaStream

接收到消息并将多条消息进行了统一处理。**

参考(推荐阅读):

  1. https://cloud.tencent.com/developer/article/2100664
  2. https://www.cnblogs.com/tree1123/p/11457851.html
标签: java kafka 微服务

本文转载自: https://blog.csdn.net/zsx1713366249/article/details/132522600
版权归原作者 小星星* 所有, 如有侵权,请联系我们删除。

“【Kafka】Kafka Stream简单使用”的评论:

还没有评论