1、添加依赖(IDEA工具操作)
<dependency> <groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
2、编写代码(IDEA工具操作)
根据上述业务流程分析得出,单词数据通过自定义处处理器接收并执行相应业务计算,因此创建LogProcessor类,并且继承StreamsAPI中的P ocessor接口,在Processor接口中,定义了以下3个方法。
(1)init(ProcessorContextprocessorContext):初如治化上下文对象。
(2)process(Key,Value):每接收到一条消息时,都会调用该方法处理并更新状态进行存储。
(3)close():关闭处理器,这里可以做一些资源清理理工作。 Kafka Streams单词计数详细代码如下所示。
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.util.HashMap;
public class LogProcessor implements Processor<byte[],byte[]> {
private ProcessorContext processorContext;
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
}
public void process(byte[] key, byte[] value) {
String inputOri = new String(value);
HashMap <String,Integer>map = new HashMap<String, Integer>();
int times = 1;
if (inputOri.contains("")){
//截取字段
String[] words = inputOri.split(" ");
for (String word : words){
if (map.containsKey(word)){
map.put(word,map.get(word)+1);
}else {
map.put(word,times);
}
}
}
inputOri = map.toString();
processorContext.forward(key,inputOri.getBytes());
}
public void close() {
}
}
单词计数的业务功能开发完之后,Kafka Streams 需要编写一个运行主程序的类App,来测试LogProcessor业务程序,具体代码如下:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;
public class App {
public static void main(String[] args) {
//声明来源主题
String fromTopic = "test1";
//声明目标主题
String toTopic = "test2";
//设置参数信息
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"master:9092,slave1:9092,slave2:9092");
//实例化StreamsConfig对象
StreamsConfig config = new StreamsConfig(props);
//构建拓扑结构
Topology topology = new Topology();
//添加原处理节点,为原处理节点指定名称和他订阅的主题
topology.addSource("SOURCE",fromTopic)
//添加自定义处理节点,指定处理器类和上一节点的名称
.addProcessor("PROCESSOR", new ProcessorSupplier() {
public Processor get() {
return new LogProcessor();
}
},"SOURCE")
//添加目标处理节点,需要指定目标处理节点和上一节点的名称
.addSink("SINK",toTopic,"PROCESSOR");
//实例化KafkaStreams对象
KafkaStreams streams = new KafkaStreams(topology,config);
streams.start();
}
}
3、执行测试
代码编写完后,在master节点创建test1和test2主题,命令如下:
创建来源主题
*kafka-topics.sh --create *
**--topic test1 **
**--partitions 3 **
**--replication-factor 2 **
--zookeeper master:2181,slave1:2181,slave2:2181
创建目标主题
*kafka-topics.sh --create *
**--topic test2 **
**--partitions 3 **
**--replication-factor 2 **
--zookeeper master:2181,slave1:2181,slave2:2181
成功创建好目标主题后,分别在master节点和slave1节点启动生产者服务和消费者服务。
启动生产者服务命令如下:
*kafka-console-procuder.sh *
**--broker-list master:9092,slave1:9092,slave2:9092 **
--topic test1
启动消费者服务命令如下:
*kafka-console-consumer.sh *
**--from-beginning **
**--topic test2 **
--bootstrap-server master:9092,slave1:9092,slave2:9092
在生产者服务节点master输入数据:
到IDEA中运行创建好的App文件中的代码,运行结果如下:
以上两张图可以看到Kafka Streams所需的测试环境已经配置完成了。
版权归原作者 鄙人阿彬 所有, 如有侵权,请联系我们删除。