Writing a Java example that integrates with Flink

Maven dependencies:

    <dependencies>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
    </dependencies>

Here, flink.version and scala.binary.version both need to be replaced with the actual version numbers you are using.
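These are typically defined once in the pom's properties section. For example (the version numbers below are illustrative placeholders, not from the original article; pick the ones matching your cluster):

    <properties>
      <!-- Placeholder versions: choose the Flink release and Scala binary
           version that match your environment -->
      <flink.version>1.14.6</flink.version>
      <scala.binary.version>2.12</scala.binary.version>
    </properties>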

Simulated data generation:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    import java.util.Properties;

    public class DataGenerator {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Generate simulated data: the numbers 0-999, turned into "key-N,value-N" strings
            DataStream<String> input = env.generateSequence(0, 999)
                    .map(Object::toString)
                    .map(s -> "key-" + s + "," + "value-" + s);

            // Kafka producer configuration
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");

            // Write the data to Kafka
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                    "my-topic",
                    new SimpleStringSchema(),
                    properties
            );
            input.addSink(producer);

            env.execute("DataGenerator");
        }
    }

This program uses Flink's generateSequence() method to produce 1,000 numbers from 0 to 999 as simulated data, converts each number to a string and concatenates it into a key-value pair, and then uses Flink's Kafka producer to write the records to the Kafka topic my-topic.
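Each record written to the topic therefore has the form key-N,value-N, for example key-0,value-0. As a quick sanity check, you can tail the topic with the console consumer that ships with Kafka (the script path depends on your installation):

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic my-topic --from-beginning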

Complete Flink code example:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;

    import java.util.Properties;

    public class FlinkKafkaExample {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Enable checkpointing for fault tolerance
            env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointTimeout(60000);

            // Read the Kafka-related configuration from the command-line arguments
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            Properties properties = parameterTool.getProperties();

            // Read data from Kafka
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                    "my-topic",
                    new SimpleStringSchema(),
                    properties
            );
            DataStream<String> input = env.addSource(consumer);

            // Parse each record into a key-value pair; returns(...) is required
            // because Flink cannot infer the lambda's output type on its own
            DataStream<KeyValue> keyValueStream = input
                    .flatMap((FlatMapFunction<String, KeyValue>) (s, collector) -> {
                        String[] parts = s.split(",");
                        collector.collect(new KeyValue(parts[0], parts[1]));
                    })
                    .returns(TypeInformation.of(KeyValue.class));

            // Group by key and count each key's records in tumbling 5-second
            // processing-time windows
            DataStream<String> result = keyValueStream
                    .keyBy(KeyValue::getKey)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .process(new CountProcessWindowFunction());

            // Print the results
            result.print();

            env.execute("FlinkKafkaExample");
        }

        private static class KeyValue {
            private final String key;
            private final String value;

            public KeyValue(String key, String value) {
                this.key = key;
                this.value = value;
            }

            public String getKey() {
                return key;
            }

            public String getValue() {
                return value;
            }
        }

        private static class CountProcessWindowFunction
                extends ProcessWindowFunction<KeyValue, String, String, TimeWindow> {
            @Override
            public void process(String key, Context context, Iterable<KeyValue> elements, Collector<String> out) {
                int count = 0;
                for (KeyValue ignored : elements) {
                    count++;
                }
                out.collect("Key: " + key + ", Count: " + count
                        + ", Window: [" + context.window().getStart() + ", " + context.window().getEnd() + ")");
            }
        }
    }

This program turns on checkpointing with Flink's enableCheckpointing() method, setting the checkpoint interval and mode. It uses Flink's Kafka consumer to read data from the Kafka topic my-topic, then parses each record into a KeyValue object containing the key and the value. The stream is keyed by key, the records per key are counted in 5-second windows, and the results are printed.
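Because the Kafka settings come from ParameterTool.fromArgs(), they are supplied as --key value pairs when the job is submitted. A possible invocation might look like the following (the jar name is a placeholder, not from the original article; per the Flink Kafka connector docs, the consumer needs at least bootstrap.servers and group.id):

    flink run -c FlinkKafkaExample flink-kafka-example.jar \
        --bootstrap.servers localhost:9092 \
        --group.id flink-example

As a design note, the CountProcessWindowFunction above buffers every element of a window in state just to count it. For a plain count, incremental aggregation is cheaper. Here is a minimal sketch of that variant, assuming the same KeyValue class and imports as above plus org.apache.flink.api.common.functions.AggregateFunction:

    DataStream<String> result = keyValueStream
            .keyBy(KeyValue::getKey)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .aggregate(
                    // Incrementally maintain a per-key count instead of buffering elements
                    new AggregateFunction<KeyValue, Long, Long>() {
                        @Override public Long createAccumulator() { return 0L; }
                        @Override public Long add(KeyValue value, Long acc) { return acc + 1; }
                        @Override public Long getResult(Long acc) { return acc; }
                        @Override public Long merge(Long a, Long b) { return a + b; }
                    },
                    // Receives only the final count for the window, not the elements
                    new ProcessWindowFunction<Long, String, String, TimeWindow>() {
                        @Override
                        public void process(String key, Context ctx, Iterable<Long> counts, Collector<String> out) {
                            out.collect("Key: " + key + ", Count: " + counts.iterator().next());
                        }
                    });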

Tags: java flink big data

Reposted from: https://blog.csdn.net/q7w8e9r4/article/details/129236500
Copyright belongs to the original author 抓饭不吃皮牙子. If there is any infringement, please contact us and we will remove it.
