0


Kafka Streams 开发单词计数应用

1、添加依赖(IDEA工具操作)

<dependency>
  1. <groupId>org.apache.kafka</groupId>
  2. <artifactId>kafka-streams</artifactId>
  3. <version>2.0.0</version>
  4. </dependency>

2、编写代码(IDEA工具操作)

根据上述业务流程分析得出,单词数据通过自定义处处理器接收并执行相应业务计算,因此创建LogProcessor类,并且继承StreamsAPI中的P ocessor接口,在Processor接口中,定义了以下3个方法。

(1)init(ProcessorContextprocessorContext):初如治化上下文对象。

(2)process(Key,Value):每接收到一条消息时,都会调用该方法处理并更新状态进行存储。

(3)close():关闭处理器,这里可以做一些资源清理理工作。 Kafka Streams单词计数详细代码如下所示。

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorContext;

import java.util.HashMap;

public class LogProcessor implements Processor<byte[],byte[]> {

  1. private ProcessorContext processorContext;
  2. public void init(ProcessorContext processorContext) {
  3. this.processorContext = processorContext;
  4. }
  5. public void process(byte[] key, byte[] value) {
  6. String inputOri = new String(value);
  7. HashMap <String,Integer>map = new HashMap<String, Integer>();
  8. int times = 1;
  9. if (inputOri.contains("")){
  10. //截取字段
  11. String[] words = inputOri.split(" ");
  12. for (String word : words){
  13. if (map.containsKey(word)){
  14. map.put(word,map.get(word)+1);
  15. }else {
  16. map.put(word,times);
  17. }
  18. }
  19. }
  20. inputOri = map.toString();
  21. processorContext.forward(key,inputOri.getBytes());
  22. }
  23. public void close() {
  24. }

}

单词计数的业务功能开发完之后,Kafka Streams 需要编写一个运行主程序的类App,来测试LogProcessor业务程序,具体代码如下:

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.Topology;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorSupplier;

import java.util.Properties;

public class App {

  1. public static void main(String[] args) {
  2. //声明来源主题
  3. String fromTopic = "test1";
  4. //声明目标主题
  5. String toTopic = "test2";
  6. //设置参数信息
  7. Properties props = new Properties();
  8. props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");
  9. props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
  10. "master:9092,slave1:9092,slave2:9092");
  11. //实例化StreamsConfig对象
  12. StreamsConfig config = new StreamsConfig(props);
  13. //构建拓扑结构
  14. Topology topology = new Topology();
  15. //添加原处理节点,为原处理节点指定名称和他订阅的主题
  16. topology.addSource("SOURCE",fromTopic)
  17. //添加自定义处理节点,指定处理器类和上一节点的名称
  18. .addProcessor("PROCESSOR", new ProcessorSupplier() {
  19. public Processor get() {
  20. return new LogProcessor();
  21. }
  22. },"SOURCE")
  23. //添加目标处理节点,需要指定目标处理节点和上一节点的名称
  24. .addSink("SINK",toTopic,"PROCESSOR");
  25. //实例化KafkaStreams对象
  26. KafkaStreams streams = new KafkaStreams(topology,config);
  27. streams.start();
  28. }

}

3、执行测试

代码编写完后,在master节点创建test1和test2主题,命令如下:

创建来源主题

*kafka-topics.sh --create *

**--topic test1 **

**--partitions 3 **

**--replication-factor 2 **

--zookeeper master:2181,slave1:2181,slave2:2181

创建目标主题

*kafka-topics.sh --create *

**--topic test2 **

**--partitions 3 **

**--replication-factor 2 **

--zookeeper master:2181,slave1:2181,slave2:2181

成功创建好目标主题后,分别在master节点和slave1节点启动生产者服务和消费者服务。

启动生产者服务命令如下:

*kafka-console-procuder.sh *

**--broker-list master:9092,slave1:9092,slave2:9092 **

--topic test1

启动消费者服务命令如下:

*kafka-console-consumer.sh *

**--from-beginning **

**--topic test2 **

--bootstrap-server master:9092,slave1:9092,slave2:9092

在生产者服务节点master输入数据:

到IDEA中运行创建好的App文件中的代码,运行结果如下:

以上两张图可以看到Kafka Streams所需的测试环境已经配置完成了。

标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/m0_59839948/article/details/125219089
版权归原作者 鄙人阿彬 所有, 如有侵权,请联系我们删除。

“Kafka Streams 开发单词计数应用”的评论:

还没有评论