0


Kafka Streams 开发单词计数应用

1、添加依赖(IDEA工具操作)

<dependency>
        <groupId>org.apache.kafka</groupId>

        <artifactId>kafka-streams</artifactId>

        <version>2.0.0</version>

    </dependency>

2、编写代码(IDEA工具操作)

根据上述业务流程分析得出,单词数据通过自定义处处理器接收并执行相应业务计算,因此创建LogProcessor类,并且继承StreamsAPI中的P ocessor接口,在Processor接口中,定义了以下3个方法。

(1)init(ProcessorContextprocessorContext):初如治化上下文对象。

(2)process(Key,Value):每接收到一条消息时,都会调用该方法处理并更新状态进行存储。

(3)close():关闭处理器,这里可以做一些资源清理理工作。 Kafka Streams单词计数详细代码如下所示。

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorContext;

import java.util.HashMap;

public class LogProcessor implements Processor<byte[],byte[]> {

private ProcessorContext processorContext;

public void init(ProcessorContext processorContext) {

    this.processorContext = processorContext;

}

public void process(byte[] key, byte[] value) {

    String inputOri = new String(value);

    HashMap <String,Integer>map = new HashMap<String, Integer>();

    int times = 1;

    if (inputOri.contains("")){

        //截取字段

        String[] words = inputOri.split(" ");

        for (String word : words){

            if (map.containsKey(word)){

                map.put(word,map.get(word)+1);

            }else {

                map.put(word,times);

            }

        }

    }

    inputOri = map.toString();

    processorContext.forward(key,inputOri.getBytes());

}

public void close() {

}

}

单词计数的业务功能开发完之后,Kafka Streams 需要编写一个运行主程序的类App,来测试LogProcessor业务程序,具体代码如下:

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.Topology;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorSupplier;

import java.util.Properties;

public class App {

public static void main(String[] args) {

    //声明来源主题

    String fromTopic = "test1";

    //声明目标主题

    String toTopic = "test2";

    //设置参数信息

    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");

    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,

            "master:9092,slave1:9092,slave2:9092");

    //实例化StreamsConfig对象

    StreamsConfig config = new StreamsConfig(props);

    //构建拓扑结构

    Topology topology = new Topology();

    //添加原处理节点,为原处理节点指定名称和他订阅的主题

    topology.addSource("SOURCE",fromTopic)

            //添加自定义处理节点,指定处理器类和上一节点的名称

    .addProcessor("PROCESSOR", new ProcessorSupplier() {

        public Processor get() {

            return new LogProcessor();

        }

    },"SOURCE")

            //添加目标处理节点,需要指定目标处理节点和上一节点的名称

            .addSink("SINK",toTopic,"PROCESSOR");

    //实例化KafkaStreams对象

    KafkaStreams streams = new KafkaStreams(topology,config);

    streams.start();

}

}

3、执行测试

代码编写完后,在master节点创建test1和test2主题,命令如下:

创建来源主题

*kafka-topics.sh --create *

**--topic test1 **

**--partitions 3 **

**--replication-factor 2 **

--zookeeper master:2181,slave1:2181,slave2:2181

创建目标主题

*kafka-topics.sh --create *

**--topic test2 **

**--partitions 3 **

**--replication-factor 2 **

--zookeeper master:2181,slave1:2181,slave2:2181

成功创建好目标主题后,分别在master节点和slave1节点启动生产者服务和消费者服务。

启动生产者服务命令如下:

*kafka-console-procuder.sh *

**--broker-list master:9092,slave1:9092,slave2:9092 **

--topic test1

启动消费者服务命令如下:

*kafka-console-consumer.sh *

**--from-beginning **

**--topic test2 **

--bootstrap-server master:9092,slave1:9092,slave2:9092

在生产者服务节点master输入数据:

到IDEA中运行创建好的App文件中的代码,运行结果如下:

以上两张图可以看到Kafka Streams所需的测试环境已经配置完成了。

标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/m0_59839948/article/details/125219089
版权归原作者 鄙人阿彬 所有, 如有侵权,请联系我们删除。

“Kafka Streams 开发单词计数应用”的评论:

还没有评论