A Java Example of Integrating with Flink

Maven dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Here, flink.version and scala.binary.version must both be replaced with the versions you actually use.
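For example, a properties block in the pom.xml might pin them as follows. The exact values are an assumption; pick a Flink release that still publishes the Scala-suffixed artifacts used above (the 1.14 line is the last one that does):

<properties>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>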

Generating mock data:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class DataGenerator {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Generate mock data: the numbers 0..999, rendered as "key-N,value-N" strings
        DataStream<String> input = env.generateSequence(0, 999)
                .map(Object::toString)
                .map(s -> "key-" + s + "," + "value-" + s);

        // Kafka producer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Write the data to Kafka
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "my-topic",
                new SimpleStringSchema(),
                properties
        );
        input.addSink(producer);

        env.execute("DataGenerator");
    }
}

This program uses Flink's generateSequence() method to produce 1000 numbers from 0 to 999 as mock data, converts them to strings, concatenates each into a "key-N,value-N" pair, and then writes the data to the Kafka topic my-topic using Flink's Kafka producer.
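To check that the records actually arrived, you can read the topic back with a plain Kafka consumer. The following is a minimal sketch using the standard kafka-clients API (which flink-connector-kafka already pulls in transitively); the broker address is an assumption matching the producer above, and the group id is arbitrary:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TopicChecker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "topic-checker");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            // Poll once and print whatever has been produced so far
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value()); // e.g. "key-0,value-0"
            }
        }
    }
}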

Complete Flink code example:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FlinkKafkaExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing for fault tolerance
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
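        // Note (assumption, Flink 1.13+ API): for durable recovery you would typically
        // also configure a state backend and a checkpoint directory, e.g.:
        //   env.setStateBackend(new HashMapStateBackend());
        //   env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");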

        // Read the Kafka-related configuration from the command-line arguments
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        Properties properties = parameterTool.getProperties();

        // Read data from Kafka
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "my-topic",
                new SimpleStringSchema(),
                properties
        );
        DataStream<String> input = env.addSource(consumer);

        // Parse each record into a key/value pair
        DataStream<KeyValue> keyValueStream = input
                .flatMap((FlatMapFunction<String, KeyValue>) (s, collector) -> {
                    String[] parts = s.split(",");
                    if (parts.length == 2) { // ignore malformed records
                        collector.collect(new KeyValue(parts[0], parts[1]));
                    }
                })
                // The lambda's output type is erased at compile time,
                // so it must be declared explicitly for Flink
                .returns(KeyValue.class);

        // Key by the record key and count the records per key in each 5-second window
        // (an explicit processing-time assigner is used instead of the deprecated
        // timeWindow(), whose semantics depend on the environment's time characteristic)
        DataStream<String> result = keyValueStream
                .keyBy(KeyValue::getKey)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new CountProcessWindowFunction());

        // Print the results
        result.print();

        env.execute("FlinkKafkaExample");
    }

    private static class KeyValue {
        private final String key;
        private final String value;

        public KeyValue(String key, String value) {
            this.key = key;
            this.value = value;
        }

        public String getKey() {
            return key;
        }

        public String getValue() {
            return value;
        }
    }

    private static class CountProcessWindowFunction extends ProcessWindowFunction<KeyValue, String, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<KeyValue> elements, Collector<String> out) {
            // Count the records that fell into this key's window
            int count = 0;
            for (KeyValue element : elements) {
                count++;
            }
            out.collect("Key: " + key + ", Count: " + count
                    + ", Window: " + context.window().getStart() + "-" + context.window().getEnd());
        }
    }
}

This program turns on checkpointing via Flink's enableCheckpointing() method, setting the checkpoint interval and mode. It uses Flink's Kafka consumer to read data from the Kafka topic my-topic, parses each record into a KeyValue object holding the key and the value, then keys the stream by key, counts the records for each key over 5-second processing-time tumbling windows, and prints the results.
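Because the Kafka settings come from ParameterTool.fromArgs(), they are supplied as --key value pairs when the job is launched; at minimum the consumer needs bootstrap.servers and a group.id. A minimal sketch of how that parsing behaves (the argument values below are assumptions for illustration):

import org.apache.flink.api.java.utils.ParameterTool;

import java.util.Properties;

public class ArgsDemo {
    public static void main(String[] args) {
        // Equivalent to launching the job with:
        //   --bootstrap.servers localhost:9092 --group.id flink-example
        String[] demoArgs = {"--bootstrap.servers", "localhost:9092", "--group.id", "flink-example"};
        Properties props = ParameterTool.fromArgs(demoArgs).getProperties();
        System.out.println(props.getProperty("bootstrap.servers")); // localhost:9092
        System.out.println(props.getProperty("group.id"));          // flink-example
    }
}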
