0


flink之addSource & fromSource 、addSink & SinkTo

一、addSource & fromSource 、addSink & SinkTo

     这两组算子区别在于:addSource和addSink需要自己实现SourceFunction或者是SinkFunction,其中读取数据的逻辑,容错等都需要自己实现;fromSource和SinkTo,是flink提供的简易的读取和输出的算子,建议优先使用fromSource和SinkTo,并结合flink官方文档; 

二、filesystem source算子

1.readTextFile(filePath: String, charsetName: String):底层调用的是
readFile(format**,filePath,FileProcessingMode.PROCESS_ONCE,-1,**typeInfo)

2.readFile(**FileInputFormat<****OUT****> inputFormat,String filePath,FileProcessingMode watchType,longinterval****,FilePathFilter filter)**

①FileInputFormat<OUT> inputFormat:定义读取文件的类,具体可以看有哪些实现类,根据需要读取文件的类型定,也可以自定义;

②String filePath:文件路径

③FileProcessingMode watchType:定义读取文件的模式,读一次:FileProcessingMode.
PROCESS_ONCE**、读多次:FileProcessingMode
.PROCESS_CONTINUOUSLY

④long interval:读取文件的的时间间隔,batch模式设置为-1,单位毫秒

⑤FilePathFilter filter:过滤文件,标记为
@Deprecated

以上两种source底层源码并行度都为1,而且都是调用的addSource传入的SourceFunction;说个题外话,在1.14以前flink Kafka都是使用的是addSource,实现的是ParalismSourceFunction以及一些容错的类,1.14发布以后采用的fromSource,使用的架构是分片(Splits)分片枚举器(SplitEnumerator)
以及
源阅读器(SourceReader)

3.FileSource

A
·batch模式

val fileSource: FileSource[String] = FileSource

.forBulkFileFormat()

.build()

B.stream模式

依赖:

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-files</artifactId>

<version>1.12.5</version>

</dependency>

代码:

val fileSource: FileSource[String] = FileSource

.forRecordStreamFormat(new TextLineFormat(), new Path(""))

.monitorContinuously(Duration.ofSeconds(1000))

.build()

这种方式可以并行读取文件,并且也做好了容错等,但是具体的逻辑请看flink官方代码,
需要注意的是如果使用的是
TextLineFormat,因为其父类
SimpleStreamFormat的isSplittable方法返回的是false所以即使fromSource是多并行度的是,但是仍然不可以并行读取文件;但是可以但是可以重写StreamFormat,下面是一个参照TextLineFormat简单的实现

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.FSDataInputStream;

import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class MyStreamFormat implements StreamFormat<String> {
    private static final long serialVersionUID = 1L;

    public static final String DEFAULT_CHARSET_NAME = "UTF-8";

    private final String charsetName;

    public MyStreamFormat() {
        this(DEFAULT_CHARSET_NAME);
    }

    public MyStreamFormat(String charsetName) {
        this.charsetName = charsetName;
    }

    @Override
    public Reader createReader(Configuration config, FSDataInputStream stream, long fileLen, long splitEnd) throws IOException {
        final BufferedReader reader =
                new BufferedReader(new InputStreamReader(stream, charsetName));
        return new MyStreamFormat.Reader(reader);
    }

    @Override
    public Reader restoreReader(Configuration config, FSDataInputStream stream, long restoredOffset, long fileLen, long splitEnd) throws IOException {
        stream.seek(restoredOffset);
        return createReader(config, stream,fileLen,splitEnd);
    }

    @Override
    public boolean isSplittable() {
        return true;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    public static final class Reader implements StreamFormat.Reader<String> {

        private final BufferedReader reader;

        Reader(final BufferedReader reader) {
            this.reader = reader;
        }

        @Nullable
        @Override
        public String read() throws IOException {
            return reader.readLine();
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
}

二、filesystem sink****算子

StreamFileSink,1.14版本之后是FileSink,这里以前者为例

两种写入模式:forRowFormat、forBulkFormat

1.forRowFormat、forBulkFormat的构造

①forRowFormat(final Path basePath, final Encoder<IN> encoder)

行模式下,自定义内容限定于文件内部,想对文件进行压缩等操作,则很难办到;

//这个类只有一个方法

public interface Encoder<IN> extends Serializable {

void encode(IN element, OutputStream stream) throws IOException;

}

②forBulkFormat( final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory)

列模式下,不仅可以对文件内部操作,也可以轻松做到对文件压缩等操作;

public class Mybucket<T> implements BulkWriter<T>{

FSDataOutputStream fsDataOutputStream=null;

GzipCompressorOutputStream gzout=null;

@Override

//每条数据通过流写入文件

public void addElement(T element) throws IOException {

     gzout.write(element.toString().getBytes());

}

//刷新流,如果考虑效率问题这里可以不刷,到flinish()方法刷也行

@Override

public void flush() throws IOException {

     gzout.flush;

}

//关闭流

//**注意:此方法不能关闭Factory传入的流,这是框架完成的事!!!**

@Override

public void finish() throws IOException {

     gzout.close;

}



//创建一个writer

//**这里将类单独写也行,无非就是目前的外部类多一个构造方法**

class MyFactory implements Factory<T>{

        @Override

        public Mybucket<T> create(FSDataOutputStream out) throws IOException {

            fsDataOutputStream=out;

            GzipCompressorOutputStream gzipOutput = new GzipCompressorOutputStream(output);

            return Mybucket.this;

        }}}

2.withBucketAssigner():

①解释:指定分桶策略,所谓桶,即数据应该去往哪个文件夹;
行模式和列模式是通用的;

②参数:BucketAssigner,其实现类有三个,:

BasePathBucketAssigner:

//直接再给定的路径下生成文件,不会生成文件夹

public String getBucketId(T element, BucketAssigner.Context context) {return "";}

DateTimeBucketAssigner:

//日期格式( 即 桶大小)和时区都可以手动配置。(见其构造方法)

//生成文件夹:yyyy-MM-dd--HH,所以是按小时滚动桶的

public String getBucketId(IN element, BucketAssigner.Context context) {

    if (dateTimeFormatter == null) {

        dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);

    }

    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));

}

自定义BucketAssigner:这个需要根据自己的业务逻辑去看继承哪个,如果只是按数据分桶那可以直接继承BasePathBucketAssigner重写getBucketId,如果是需要自定义时间可以继承DateTimeBucketAssigner重写getBucketId,如果业务逻辑进一步复杂那就重写BucketAssigner,下面简单简绍BucketAssigner的两个方法:

//决定了每条数据应该去往哪个文件夹,没有这个文件夹会自动创建,最终数据写入路径是该方法返回值+给定的路径

//context可以获取一些时间

BucketID getBucketId(IN element, BucketAssigner.Context context);

//序列化/反序列化getBucketId返回值

SimpleVersionedSerializer<BucketID> getSerializer();

**3.**withRollingPolicy:

①解释:
RollingPolicy
定义了何时关闭给定的 In-progress Part 文件,并将其转换为 Pending 状态,然后在转换为 Finished 状态。 Finished 状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。 在
STREAMING
模式下,滚动策略结合 Checkpoint 间隔(到下一个 Checkpoint 成功时,文件的 Pending 状态才转换为 Finished 状态)共同控制 Part 文件对下游 readers 是否可见以及这些文件的大小和数量。在
BATCH
模式下,Part 文件在 Job 最后对下游才变得可见,滚动策略只控制最大的 Part 文件大小。其中列模式(forBulkFormat)只能使用CheckpointRollingPolicy

②参数

RollingPolicy(所有RollingPolicy的父类)

public interface RollingPolicy<IN, BucketID> extends Serializable {

//Determines if the in-progress part file for a bucket should roll on every checkpoint.(if true in-progress move to pending)

//文件从pending状态到finished状态与此毫不相干

boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException;

//Determines if the in-progress part file for a bucket should roll based on its current state, e.g. its size.(if true in-progress move to pending)

boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element)

        throws IOException;

// Determines if the in-progress part file for a bucket should roll based on a time condition.(if true in-progress move to pending)

boolean shouldRollOnProcessingTime(

        final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException;

}

CheckpointRollingPolicy

列模式虽然只能使用抽象类CheckpointRollingPolicy(它是RollingPolicy的一个实现,重写了shouldRollOnCheckpoint返回为true),CheckpointRollingPolicy只有一个子类OnCheckpointRollingPolicy(此类中shouldRollOnEvent、shouldRollOnProcessingTime返回为false);如果在列模式下,不想按照checkpoint进行滚动文件,那么可以试试继承CheckpointRollingPolicy后重写了shouldRollOnCheckpoint返回为false;

DefaultRollingPolicy,采用builder架构

DefaultRollingPolicy.builder()

      //设置一个文件最大开启时间,超过时间则滚动

      .withRolloverInterval(Duration.ofMinutes(15))

      //设置一个文件无数据写入的时间,超过时间则滚动

      .withInactivityInterval(Duration.ofMinutes(5))

      //设置一个文件最大的容量,超过容量则滚动

      .withMaxPartSize(MemorySize.ofMebiBytes(1024))

      .build()

**4.**withOutputFileConfig:

①解释:为生成的文件添加前缀后缀;
行模式和列模式是通用的;

.withOutputFileConfig(OutputFileConfig

  .builder()

  .withPartPrefix("gouba-")

  .withPartSuffix(".gz")

  .build())

**5.**enableCompact:

①解释:合并生成的finished文件;
行模式和列模式是通用的;

②参数:FileCompactStrategy,FileCompactor

FileCompactStrategy

FileCompactStrategy.Builder.newBuilder()

              //相隔几个checkpoint做一次合并,默认1

      .enableCompactionOnCheckpoint(3)

     //合并文件使用多少个线程,默认1

      .setNumCompactThreads(4)

      .build()

FileCompactor

※ IdenticalFileCompactor:直接复制一个文件的内容,到另一个文件,一次只能复制一个文件。

※ ConcatFileCompactor:可以自定义两个文件直接的分割符,由构造方法传入。

※ RecordWiseFileCompactor:自定义内容比较多

**6.**withBucketCheckInterval(mills)

解释:根据RollingPolicy检查是否应该关闭in-progress文件,以系统时间0秒时刻开始算的。

标签: flink

本文转载自: https://blog.csdn.net/m0_64640191/article/details/129859858
版权归原作者 客舟听雨2 所有, 如有侵权,请联系我们删除。

“flink之addSource & fromSource 、addSink & SinkTo”的评论:

还没有评论