点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
- Flink Sink JDBC
- Flink Sink Kafka
注意事项
DataSetAPI 和 DataStream API一样有三个部分组成,各部分的作用对应一致,此处不再赘述。
FlinkDataSet
在 Apache Flink 中,DataSet API 是 Flink 批处理的核心接口,它主要用于处理静态数据集。虽然 Flink 的 DataStream API 被广泛用于流式数据处理,但 DataSet API 适用于大规模批处理场景,如数据清洗、ETL、分析等。虽然近年来 Flink 更多地向流处理方向发展,但批处理仍然是数据处理中的一个重要场景。
DataSource
对DataSet批处理而言,较为频繁的操作是读取HDFS中的文件数据,因为这里主要介绍两个 DataSource 组件:
- 基于集合:fromCollection 主要是为了方便测试
- 基于文件:readTextFile,基于HDFS中的数据进行计算分析
基本概念
Flink 的 DataSet API 是一个功能强大的批处理 API,专为处理静态、离线数据集设计。DataSet 中的数据是有限的,处理时系统会先等待整个数据集加载完毕。DataSet 可以通过多种方式创建,例如从文件、数据库、集合等加载数据,然后通过一系列转换操作(如 map、filter、join 等)进行处理。
核心特性
- 支持丰富的转换操作。
- 提供多种输入输出数据源。
- 支持复杂的数据类型,包括基本类型、元组、POJO、列表等。
- 支持优化计划,例如通过 cost-based optimizer 来优化查询执行计划。
DataSet 创建
在 Flink 中,可以通过多种方式创建 DataSet。以下是常见的数据源:
从本地文件读取
ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();DataSet<String> text = env.readTextFile("path/to/file");
从 CSV 文件读取
DataSet<Tuple3<Integer,String,Double>> csvData = env.readCsvFile("path/to/file.csv").types(Integer.class,String.class,Double.class);
从集合中创建
List<Tuple2<String,Integer>> data =Arrays.asList(newTuple2<>("Alice",1),newTuple2<>("Bob",2));DataSet<Tuple2<String,Integer>> dataSet = env.fromCollection(data);
从数据库中读取
可以通过自定义的输入格式(如 JDBC 输入格式)从数据库中读取数据,虽然 Flink 本身并没有内置 JDBC 源的批处理 API,但可以通过自定义实现。
DataSet 的转换操作(Transformation)
Flink 的 DataSet API 提供了丰富的转换操作,可以对数据进行各种变换,以下是常用的转换操作:
Map
将 DataSet 中的每一条记录进行映射操作,生成新的 DataSet。
DataSet<Integer> numbers = env.fromElements(1,2,3,4,5);DataSet<Integer> squaredNumbers = numbers.map(n -> n * n);
Filter
过滤掉不满足条件的记录。
DataSet<Integer> evenNumbers = numbers.filter(n -> n %2==0);
FlatMap
类似于 map,但允许一条记录生成多条输出记录。
DataSet<String> lines = env.fromElements("hello world","flink is great");DataSet<String> words = lines.flatMap((line, collector)->{for(String word : line.split(" ")){
collector.collect(word);}});
Reduce
将数据集根据某种聚合逻辑进行合并
DataSet<Integer> sum = numbers.reduce((n1, n2)-> n1 + n2);
GroupBy 和 Reduce
对数据集进行分组,然后在每个组上执行聚合操作
DataSet<Tuple2<String,Integer>> wordCounts = words
.map(word ->newTuple2<>(word,1)).groupBy(0).reduce((t1, t2)->newTuple2<>(t1.f0, t1.f1 + t2.f1));
Join
类似于 SQL 中的连接操作,连接两个 DataSet。
DataSet<Tuple2<Integer,String>> persons = env.fromElements(newTuple2<>(1,"Alice"),newTuple2<>(2,"Bob"));DataSet<Tuple2<Integer,String>> cities = env.fromElements(newTuple2<>(1,"Berlin"),newTuple2<>(2,"Paris"));DataSet<Tuple2<String,String>> personWithCities = persons.join(cities).where(0).equalTo(0).with((p, c)->newTuple2<>(p.f1, c.f1));
DataSet 输出
DataSet API 提供多种方式将数据写出到外部系统:
写入文件
wordCounts.writeAsCsv("output/wordcounts.csv","\n",",");
写入数据库
虽然 DataSet API 没有直接提供 JDBC Sink,可以通过自定义 Sink 实现写入数据库功能。
打印控制台
wordCounts.print();
批处理的优化
DataSet API 提供了优化机制,通过成本模型和执行计划的分析来优化任务执行。在 Flink 内部,编译器会根据任务定义的转换操作生成一个优化的执行计划,这个过程类似于 SQL 查询优化器的工作原理。
- DataSet 的分区:Flink 可以根据数据集的分区进行优化。例如,通过 partitionByHash 或 partitionByRange 来手动控制数据的分布方式。
- DataSet 的缓存:可以通过 rebalance()、hashPartition() 等方法来均衡数据负载,以提高并行度和计算效率。
DataSet API 的容错机制
Flink 的 DataSet API 提供了容错机制,支持在发生故障时重新执行失败的任务。虽然 DataSet API 没有像 DataStream 那样依赖于 Checkpoint 机制,但其批处理特性允许任务从头开始重新执行,确保数据处理的正确性。
DataSet 与 DataStream 的对比
DataSet API 与 DataStream API 之间有一些重要的区别:
DataSet API 的未来
需要注意的是,Flink 的官方路线图中已经不再优先开发 DataSet API 的新特性,未来的主要开发将集中在 DataStream API,甚至批处理功能都将通过 DataStream API 来实现。
因此,如果可能,建议新项目尽量使用 DataStream API 来替代 DataSet API。
特别是 Flink 的 Table API 和 SQL API 也适用于批处理和流处理,这些高层 API 提供了更简洁的语法和更强的优化能力。
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。