在Flink中提供了
StreamingFileSink
用以将数据流输出到文件系统.
这里结合代码介绍如何使用
FileSink
.
首先
FileSink
有两种模式
forRowFormat
和
forBulkFormat
publicstatic<IN>DefaultRowFormatBuilder<IN>forRowFormat(finalPath basePath,finalEncoder<IN> encoder){returnnewDefaultRowFormatBuilder<>(basePath, encoder,newDateTimeBucketAssigner<>());}publicstatic<IN>DefaultBulkFormatBuilder<IN>forBulkFormat(finalPath basePath,finalBulkWriter.Factory<IN> bulkWriterFactory){returnnewDefaultBulkFormatBuilder<>(
basePath, bulkWriterFactory,newDateTimeBucketAssigner<>());}
二者的区别是
forRowFormat
是一行一行的处理数据,而
forBulkFormat
则是可以一次处理多条数据,而多条处理的好处就是可以帮助生成列式存储的文件如
ParquetFile
和
ORCFile
,而
forRowFormat
则做不到这点,关于列式存储和行式存储的区别可通过数据存储格式这篇文章简单做一个了解.
下面以
forRowFormat
作为示例演示一下代码
importorg.apache.flink.api.common.serialization.SimpleStringEncoder;importorg.apache.flink.configuration.MemorySize;importorg.apache.flink.connector.file.sink.FileSink;importorg.apache.flink.core.fs.Path;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;importorg.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;importorg.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;importjava.time.Duration;/**
* @Author: J
* @Version: 1.0
* @CreateTime: 2023/6/27
* @Description: 测试
**/publicclassFlinkFileSink{publicstaticvoidmain(String[] args)throwsException{// 构建流环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1
env.setParallelism(1);// 这里是生成数据流,CustomizeSource这个类是自定义数据源(为了方便测试)DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(newCustomizeSource());// 现将数据转换成字符串形式SingleOutputStreamOperator<String> map = dataStreamSource.map(bean -> bean.toString());// 构造FileSink对象,这里使用的RowFormat,即行处理类型的FileSink<String> fileSink =FileSink// 配置文件输出路径及编码格式.forRowFormat(newPath("/Users/xxx/data/testData/"),newSimpleStringEncoder<String>("UTF-8"))// 设置文件滚动策略(文件切换).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(180))// 设置间隔时长180秒进行文件切换.withInactivityInterval(Duration.ofSeconds(20))// 文件20秒没有数据写入进行文件切换.withMaxPartSize(MemorySize.ofMebiBytes(1))// 设置文件大小1MB进行文件切换.build())// 分桶策略(划分子文件夹).withBucketAssigner(newDateTimeBucketAssigner<String>())// 按照yyyy-mm-dd--h进行分桶//设置分桶检查时间间隔为100毫秒.withBucketCheckInterval(100)// 输出文件文件名相关配置.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("test_")// 文件前缀.withPartSuffix(".txt")// 文件后缀.build()).build();// 输出到文件
map.print();
map.sinkTo(fileSink);
env.execute();}}
代码内容这里就不详细说明了,注释已经写得很清楚了.有一点要注意使用
FileSink
的时候我们要加上对应的
pom
依赖.我这里使用Flink版本是
1.15.3
<!-- File connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>
这里我们先看一下生成的结果文件
-rw-r--r-- 1 xxx staff 1.0M 6 27 14:43 .test_-eb905337-488d-46f1-8177-86fbb46f778f-0.txt.inprogress.91e49c89-cc79-44f5-940d-ded2770b61a1
-rw-r--r-- 1 xxx staff 1.0M 6 27 14:44 .test_-eb905337-488d-46f1-8177-86fbb46f778f-1.txt.inprogress.c548bd30-8583-48d5-91d2-2e11a7dca2cd
-rw-r--r-- 1 xxx staff 1.0M 6 27 14:45 .test_-eb905337-488d-46f1-8177-86fbb46f778f-2.txt.inprogress.a041dba1-8f37-4307-82da-682c48b0796b
-rw-r--r-- 1 xxx staff 280K 6 27 14:45 .test_-eb905337-488d-46f1-8177-86fbb46f778f-3.txt.inprogress.e05d1759-0a38-4a25-bcd0-1216ce6dda59
这里有必要说明一下由于我使用的是Mac在生成文件的时候会出现一个小问题,上面的那种文件会隐藏起来,直接点开文件夹是看不到的可以通过
command + shift + .
来显示隐藏文件,或者像我这种直接通过终端
ll -a
来查看,windows没有发现这个问题.
可以看到除了最后一个文件,其他的文件大小基本都是
1MB
,最后一个是因为写入的数据大小还没有满足
1MB
,并且写入时间也没有满足滚动条件,所以还在持续写入中.
而且通过文件名我们可以看到所有文件中都带有
inprogress
这个状态,这是因为我们没有开启
checkpoint
,这里先说一下
FileSink
写入文件时的三个文件状态,官网原图如下:
这三种状态分别是
inprogress
、
pending
和
finished
,对应的就是处理中、挂起和完成,官网中同时也说明了
FileSink
必须和
checkpoint
配合使用,不然文件的状态只会出现
inprogress
和
pending
,原文内容如下:
下面我们在看一下加入
checkpoint
的代码和结果文件
代码如下
importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.serialization.SimpleStringEncoder;importorg.apache.flink.configuration.MemorySize;importorg.apache.flink.connector.file.sink.FileSink;importorg.apache.flink.core.fs.Path;importorg.apache.flink.streaming.api.CheckpointingMode;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.CheckpointConfig;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;importorg.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;importorg.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;importjava.time.Duration;/**
* @Author: J
* @Version: 1.0
* @CreateTime: 2023/6/27
* @Description: 测试
**/publicclassFlinkFileSink{publicstaticvoidmain(String[] args)throwsException{// 构建流环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1
env.setParallelism(1);// 这里是生成数据流,CustomizeSource这个类是自定义数据源(为了方便测试)DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(newCustomizeSource());// 现将数据转换成字符串形式SingleOutputStreamOperator<String> map = dataStreamSource.map(bean -> bean.toString());// 每20秒作为checkpoint的一个周期
env.enableCheckpointing(20000);// 两次checkpoint间隔最少是10秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);// 程序取消或者停止时不删除checkpoint
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// checkpoint必须在60秒结束,否则将丢弃
env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只能有一个checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置EXACTLY_ONCE语义,默认就是这个
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// checkpoint存储位置
env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 设置执行模型为Streaming方式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 构造FileSink对象,这里使用的RowFormat,即行处理类型的FileSink<String> fileSink =FileSink// 配置文件输出路径及编码格式.forRowFormat(newPath("/Users/xxx/data/testData/"),newSimpleStringEncoder<String>("UTF-8"))// 设置文件滚动策略(文件切换).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(180))// 设置间隔时长180秒进行文件切换.withInactivityInterval(Duration.ofSeconds(20))// 文件20秒没有数据写入进行文件切换.withMaxPartSize(MemorySize.ofMebiBytes(1))// 设置文件大小1MB进行文件切换.build())// 分桶策略(划分子文件夹).withBucketAssigner(newDateTimeBucketAssigner<String>())// 按照yyyy-mm-dd--h进行分桶//设置分桶检查时间间隔为100毫秒.withBucketCheckInterval(100)// 输出文件文件名相关配置.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("test_")// 文件前缀.withPartSuffix(".txt")// 文件后缀.build()).build();// 输出到文件
map.print();
map.sinkTo(fileSink);
env.execute();}}
看一下结果文件:
-rw-r--r-- 1 xxx staff 761K 6 27 15:13 .test_-96ccd42e-716d-4ee0-835e-342618914e7d-2.txt.inprogress.aa5fccaa-f99f-4059-93e7-6d3c548a66b3
-rw-r--r-- 1 xxx staff 1.0M 6 27 15:11 test_-96ccd42e-716d-4ee0-835e-342618914e7d-0.txt
-rw-r--r-- 1 xxx staff 1.0M 6 27 15:12 test_-96ccd42e-716d-4ee0-835e-342618914e7d-1.txt
可以看到已经完成的文件状态中已经没有
inprogress
和其他的后缀了,而正在写入的文件则是处于
inprogress
状态.
版权归原作者 飞天小老头 所有, 如有侵权,请联系我们删除。