Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Here, flink.version and scala.binary.version must both be replaced with the versions you actually use.
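For example, both placeholders can be defined in a <properties> block. The version numbers below are only an illustration (a Flink 1.12.x release with Scala 2.12 binaries, a combination in which the FlinkKafkaConsumer/FlinkKafkaProducer APIs used later are available); substitute whatever your cluster actually runs:
<properties>
    <flink.version>1.12.7</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>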
Generating the mock data:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class DataGenerator {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Generate mock data: the numbers 0-999 as "key-N,value-N" strings
        DataStream<String> input = env.generateSequence(0, 999)
                .map(Object::toString)
                .map(s -> "key-" + s + "," + "value-" + s);
        // Kafka producer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // Write the data to Kafka
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "my-topic",
                new SimpleStringSchema(),
                properties
        );
        input.addSink(producer);
        env.execute("DataGenerator");
    }
}
This program uses Flink's generateSequence() method to produce the 1,000 numbers from 0 to 999 as mock data, converts each number to a string and concatenates it into a key-value pair, and then uses Flink's Kafka producer to write the records to the Kafka topic my-topic.
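To confirm the generator actually worked, the console consumer that ships with any Kafka distribution can read the topic back. The broker address and the output shown below are illustrative, and record order across partitions is not guaranteed:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
key-0,value-0
key-1,value-1
...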
Complete Flink code example:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class FlinkKafkaExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing for fault tolerance
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Read the Kafka-related configuration from the command-line arguments
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        Properties properties = parameterTool.getProperties();
        // Read data from Kafka
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "my-topic",
                new SimpleStringSchema(),
                properties
        );
        DataStream<String> input = env.addSource(consumer);
        // Parse each record into a key-value pair. The explicit returns() call is
        // required because the lambda's output type is erased at compile time.
        DataStream<KeyValue> keyValueStream = input
                .flatMap((FlatMapFunction<String, KeyValue>) (s, collector) -> {
                    String[] parts = s.split(",");
                    if (parts.length == 2) { // skip malformed records
                        collector.collect(new KeyValue(parts[0], parts[1]));
                    }
                })
                .returns(KeyValue.class);
        // Group by key and count the records for each key in each 5-second window
        DataStream<String> result = keyValueStream
                .keyBy(KeyValue::getKey)
                .timeWindow(Time.seconds(5))
                .process(new CountProcessWindowFunction());
        // Print the results
        result.print();
        env.execute("FlinkKafkaExample");
    }
    private static class KeyValue {
        private final String key;
        private final String value;

        public KeyValue(String key, String value) {
            this.key = key;
            this.value = value;
        }

        public String getKey() {
            return key;
        }

        public String getValue() {
            return value;
        }
    }
    private static class CountProcessWindowFunction
            extends ProcessWindowFunction<KeyValue, String, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<KeyValue> elements, Collector<String> out) {
            // Count the records buffered for this key in this window
            int count = 0;
            for (KeyValue ignored : elements) {
                count++;
            }
            out.collect("Key: " + key + ", Count: " + count
                    + ", Window: " + context.window().getStart() + "-" + context.window().getEnd());
        }
    }
}
This program enables checkpointing with Flink's enableCheckpointing() method, setting the checkpoint interval and mode. It uses Flink's Kafka consumer to read data from the Kafka topic my-topic, parses each record into a KeyValue object containing the key and the value, then keys the stream by key and counts the records for each key within 5-second windows, emitting one result line per key and window.
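Because the Kafka settings are read with ParameterTool.fromArgs(), they are passed as --key value pairs on the command line; bootstrap.servers and group.id are the two properties the Kafka consumer minimally needs. For example (the jar path and group id below are placeholders):
$ flink run -c FlinkKafkaExample target/flink-kafka-example.jar --bootstrap.servers localhost:9092 --group.id flink-example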