0


flink之addSource & fromSource 、addSink & SinkTo

一、addSource & fromSource 、addSink & SinkTo

  1. 这两组算子区别在于:addSourceaddSink需要自己实现SourceFunction或者是SinkFunction,其中读取数据的逻辑,容错等都需要自己实现;fromSourceSinkTo,是flink提供的简易的读取和输出的算子,建议优先使用fromSourceSinkTo,并结合flink官方文档;

二、filesystem source算子

1.readTextFile(filePath: String, charsetName: String):底层调用的是
readFile(format**,filePath,FileProcessingMode.PROCESS_ONCE,-1,**typeInfo)

2.readFile(**FileInputFormat<****OUT****> inputFormat,String filePath,FileProcessingMode watchType,longinterval****,FilePathFilter filter)**

①FileInputFormat<OUT> inputFormat:定义读取文件的类,具体可以看有哪些实现类,根据需要读取文件的类型定,也可以自定义;

②String filePath:文件路径

③FileProcessingMode watchType:定义读取文件的模式,读一次:FileProcessingMode.
PROCESS_ONCE**、读多次:FileProcessingMode
.PROCESS_CONTINUOUSLY

④long interval:读取文件的的时间间隔,batch模式设置为-1,单位毫秒

⑤FilePathFilter filter:过滤文件,标记为
@Deprecated

以上两种source底层源码并行度都为1,而且都是调用的addSource传入的SourceFunction;说个题外话,在1.14以前flink Kafka都是使用的是addSource,实现的是ParalismSourceFunction以及一些容错的类,1.14发布以后采用的fromSource,使用的架构是分片(Splits)分片枚举器(SplitEnumerator)
以及
源阅读器(SourceReader)

3.FileSource

A
·batch模式

val fileSource: FileSource[String] = FileSource

.forBulkFileFormat()

.build()

B.stream模式

依赖:

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-files</artifactId>

<version>1.12.5</version>

</dependency>

代码:

val fileSource: FileSource[String] = FileSource

.forRecordStreamFormat(new TextLineFormat(), new Path(""))

.monitorContinuously(Duration.ofSeconds(1000))

.build()

这种方式可以并行读取文件,并且也做好了容错等,但是具体的逻辑请看flink官方代码,
需要注意的是如果使用的是
TextLineFormat,因为其父类
SimpleStreamFormat的isSplittable方法返回的是false所以即使fromSource是多并行度的是,但是仍然不可以并行读取文件;但是可以但是可以重写StreamFormat,下面是一个参照TextLineFormat简单的实现

  1. import org.apache.flink.api.common.typeinfo.TypeInformation;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.configuration.Configuration;
  4. import org.apache.flink.connector.file.src.reader.StreamFormat;
  5. import org.apache.flink.connector.file.src.reader.TextLineFormat;
  6. import org.apache.flink.core.fs.FSDataInputStream;
  7. import javax.annotation.Nullable;
  8. import java.io.BufferedReader;
  9. import java.io.IOException;
  10. import java.io.InputStreamReader;
  11. public class MyStreamFormat implements StreamFormat<String> {
  12. private static final long serialVersionUID = 1L;
  13. public static final String DEFAULT_CHARSET_NAME = "UTF-8";
  14. private final String charsetName;
  15. public MyStreamFormat() {
  16. this(DEFAULT_CHARSET_NAME);
  17. }
  18. public MyStreamFormat(String charsetName) {
  19. this.charsetName = charsetName;
  20. }
  21. @Override
  22. public Reader createReader(Configuration config, FSDataInputStream stream, long fileLen, long splitEnd) throws IOException {
  23. final BufferedReader reader =
  24. new BufferedReader(new InputStreamReader(stream, charsetName));
  25. return new MyStreamFormat.Reader(reader);
  26. }
  27. @Override
  28. public Reader restoreReader(Configuration config, FSDataInputStream stream, long restoredOffset, long fileLen, long splitEnd) throws IOException {
  29. stream.seek(restoredOffset);
  30. return createReader(config, stream,fileLen,splitEnd);
  31. }
  32. @Override
  33. public boolean isSplittable() {
  34. return true;
  35. }
  36. @Override
  37. public TypeInformation<String> getProducedType() {
  38. return Types.STRING;
  39. }
  40. public static final class Reader implements StreamFormat.Reader<String> {
  41. private final BufferedReader reader;
  42. Reader(final BufferedReader reader) {
  43. this.reader = reader;
  44. }
  45. @Nullable
  46. @Override
  47. public String read() throws IOException {
  48. return reader.readLine();
  49. }
  50. @Override
  51. public void close() throws IOException {
  52. reader.close();
  53. }
  54. }
  55. }

二、filesystem sink****算子

StreamFileSink,1.14版本之后是FileSink,这里以前者为例

两种写入模式:forRowFormat、forBulkFormat

1.forRowFormat、forBulkFormat的构造

①forRowFormat(final Path basePath, final Encoder<IN> encoder)

行模式下,自定义内容限定于文件内部,想对文件进行压缩等操作,则很难办到;

//这个类只有一个方法

public interface Encoder<IN> extends Serializable {

  1. void encode(IN element, OutputStream stream) throws IOException;

}

②forBulkFormat( final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory)

列模式下,不仅可以对文件内部操作,也可以轻松做到对文件压缩等操作;

public class Mybucket<T> implements BulkWriter<T>{

  1. FSDataOutputStream fsDataOutputStream=null;
  2. GzipCompressorOutputStream gzout=null;
  3. @Override
  4. //每条数据通过流写入文件
  5. public void addElement(T element) throws IOException {
  6. gzout.write(element.toString().getBytes());
  7. }
  8. //刷新流,如果考虑效率问题这里可以不刷,到flinish()方法刷也行
  9. @Override
  10. public void flush() throws IOException {
  11. gzout.flush;
  12. }
  13. //关闭流
  14. //**注意:此方法不能关闭Factory传入的流,这是框架完成的事!!!**
  15. @Override
  16. public void finish() throws IOException {
  17. gzout.close;
  18. }
  19. //创建一个writer
  20. //**这里将类单独写也行,无非就是目前的外部类多一个构造方法**
  21. class MyFactory implements Factory<T>{
  22. @Override
  23. public Mybucket<T> create(FSDataOutputStream out) throws IOException {
  24. fsDataOutputStream=out;
  25. GzipCompressorOutputStream gzipOutput = new GzipCompressorOutputStream(output);
  26. return Mybucket.this;
  27. }}}

2.withBucketAssigner():

①解释:指定分桶策略,所谓桶,即数据应该去往哪个文件夹;
行模式和列模式是通用的;

②参数:BucketAssigner,其实现类有三个,:

BasePathBucketAssigner:

//直接再给定的路径下生成文件,不会生成文件夹

public String getBucketId(T element, BucketAssigner.Context context) {return "";}

DateTimeBucketAssigner:

//日期格式( 即 桶大小)和时区都可以手动配置。(见其构造方法)

//生成文件夹:yyyy-MM-dd--HH,所以是按小时滚动桶的

public String getBucketId(IN element, BucketAssigner.Context context) {

  1. if (dateTimeFormatter == null) {
  2. dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
  3. }
  4. return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
  5. }

自定义BucketAssigner:这个需要根据自己的业务逻辑去看继承哪个,如果只是按数据分桶那可以直接继承BasePathBucketAssigner重写getBucketId,如果是需要自定义时间可以继承DateTimeBucketAssigner重写getBucketId,如果业务逻辑进一步复杂那就重写BucketAssigner,下面简单简绍BucketAssigner的两个方法:

//决定了每条数据应该去往哪个文件夹,没有这个文件夹会自动创建,最终数据写入路径是该方法返回值+给定的路径

//context可以获取一些时间

BucketID getBucketId(IN element, BucketAssigner.Context context);

//序列化/反序列化getBucketId返回值

SimpleVersionedSerializer<BucketID> getSerializer();

**3.**withRollingPolicy:

①解释:
RollingPolicy
定义了何时关闭给定的 In-progress Part 文件,并将其转换为 Pending 状态,然后在转换为 Finished 状态。 Finished 状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。 在
STREAMING
模式下,滚动策略结合 Checkpoint 间隔(到下一个 Checkpoint 成功时,文件的 Pending 状态才转换为 Finished 状态)共同控制 Part 文件对下游 readers 是否可见以及这些文件的大小和数量。在
BATCH
模式下,Part 文件在 Job 最后对下游才变得可见,滚动策略只控制最大的 Part 文件大小。其中列模式(forBulkFormat)只能使用CheckpointRollingPolicy

②参数

RollingPolicy(所有RollingPolicy的父类)

public interface RollingPolicy<IN, BucketID> extends Serializable {

  1. //Determines if the in-progress part file for a bucket should roll on every checkpoint.(if true in-progress move to pending)
  2. //文件从pending状态到finished状态与此毫不相干
  3. boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException;
  4. //Determines if the in-progress part file for a bucket should roll based on its current state, e.g. its size.(if true in-progress move to pending)
  5. boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element)
  6. throws IOException;
  7. // Determines if the in-progress part file for a bucket should roll based on a time condition.(if true in-progress move to pending)
  8. boolean shouldRollOnProcessingTime(
  9. final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException;

}

CheckpointRollingPolicy

列模式虽然只能使用抽象类CheckpointRollingPolicy(它是RollingPolicy的一个实现,重写了shouldRollOnCheckpoint返回为true),CheckpointRollingPolicy只有一个子类OnCheckpointRollingPolicy(此类中shouldRollOnEvent、shouldRollOnProcessingTime返回为false);如果在列模式下,不想按照checkpoint进行滚动文件,那么可以试试继承CheckpointRollingPolicy后重写了shouldRollOnCheckpoint返回为false;

DefaultRollingPolicy,采用builder架构

DefaultRollingPolicy.builder()

  1. //设置一个文件最大开启时间,超过时间则滚动
  2. .withRolloverInterval(Duration.ofMinutes(15))
  3. //设置一个文件无数据写入的时间,超过时间则滚动
  4. .withInactivityInterval(Duration.ofMinutes(5))
  5. //设置一个文件最大的容量,超过容量则滚动
  6. .withMaxPartSize(MemorySize.ofMebiBytes(1024))
  7. .build()

**4.**withOutputFileConfig:

①解释:为生成的文件添加前缀后缀;
行模式和列模式是通用的;

.withOutputFileConfig(OutputFileConfig

  1. .builder()
  2. .withPartPrefix("gouba-")
  3. .withPartSuffix(".gz")
  4. .build())

**5.**enableCompact:

①解释:合并生成的finished文件;
行模式和列模式是通用的;

②参数:FileCompactStrategy,FileCompactor

FileCompactStrategy

FileCompactStrategy.Builder.newBuilder()

  1. //相隔几个checkpoint做一次合并,默认1
  2. .enableCompactionOnCheckpoint(3)
  3. //合并文件使用多少个线程,默认1
  4. .setNumCompactThreads(4)
  5. .build()

FileCompactor

※ IdenticalFileCompactor:直接复制一个文件的内容,到另一个文件,一次只能复制一个文件。

※ ConcatFileCompactor:可以自定义两个文件直接的分割符,由构造方法传入。

※ RecordWiseFileCompactor:自定义内容比较多

**6.**withBucketCheckInterval(mills)

解释:根据RollingPolicy检查是否应该关闭in-progress文件,以系统时间0秒时刻开始算的。

标签: flink

本文转载自: https://blog.csdn.net/m0_64640191/article/details/129859858
版权归原作者 客舟听雨2 所有, 如有侵权,请联系我们删除。

“flink之addSource & fromSource 、addSink & SinkTo”的评论:

还没有评论