0


[flink 实时流基础] 输出算子(Sink)

学习笔记
Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。
image.png


文章目录

连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,
stream.sinkTo(…)
当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/
image.png

我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
image.png
除此以外,就需要用户自定义实现sink连接器了。

输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
publicclassSinkFile{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);// 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000,CheckpointingMode.EXACTLY_ONCE);DataGeneratorSource<String> dataGeneratorSource =newDataGeneratorSource<>(newGeneratorFunction<Long,String>(){@OverridepublicStringmap(Long value)throwsException{return"Number:"+ value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource,WatermarkStrategy.noWatermarks(),"data-generator");// 输出到文件系统FileSink<String> fieSink =FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(newPath("f:/tmp"),newSimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("atguigu-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(newDateTimeBucketAssigner<>("yyyy-MM-dd HH",ZoneId.systemDefault()))// 文件滚动策略:  1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(newMemorySize(1024*1024)).build()).build();
        dataGen.sinkTo(fieSink);

        env.execute();}}

输出到 Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码

publicclassSinkKafka{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);// 如果是精准一次,必须开启checkpoint(后续章节介绍)
        env.enableCheckpointing(2000,CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102",7777);/**
         * Kafka Sink:
         * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
         * 1、开启checkpoint(后续介绍)
         * 2、设置事务前缀
         * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
         */KafkaSink<String> kafkaSink =KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(newSimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000+"").build();

        sensorDS.sinkTo(kafkaSink);

        env.execute();}}

自定义序列化器,实现带key的record:

publicclassSinkKafkaWithKey{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000,CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102",7777);/**
         * 如果要指定写入kafka的key,可以自定义序列化器:
         * 1、实现 一个接口,重写 序列化 方法
         * 2、指定key,转成 字节数组
         * 3、指定value,转成 字节数组
         * 4、返回一个 ProducerRecord对象,把key、value放进去
         */KafkaSink<String> kafkaSink =KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,
        hadoop104:9092").setRecordSerializer(newKafkaRecordSerializationSchema<String>(){@Nullable@OverridepublicProducerRecord<byte[],byte[]>serialize(String element,KafkaSinkContext context,Long timestamp){String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);returnnewProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("atguigu-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000+"").build();

        sensorDS.sinkTo(kafkaSink);

        env.execute();}}

输出到 mysql

写入数据的MySQL的测试步骤如下。
(1)添加依赖
添加MySQL驱动:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency>

官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径:

<repositories><repository><id>apache-snapshots</id><name>apache snapshots</name><url>https://repository.apache.org/content/repositories/snapshots/</url></repository></repositories>

添加依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.17-SNAPSHOT</version></dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加如下标红内容:

<mirror><id>aliyunmaven</id><mirrorOf>*,!apache-snapshots</mirrorOf><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url></mirror>

(2)启动MySQL,在test库下建表ws

mysql>
CREATE TABLE ws (id varchar(100) NOT NULL,
ts bigint(20) DEFAULT NULL,
vc int(11) DEFAULT NULL,
PRIMARY KEY (id))ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的示例代码

publicclassSinkMySQL{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102",7777).map(newWaterSensorMapFunction());/**
     * TODO 写入mysql
     * 1、只能用老的sink写法: addsink
     * 2、JDBCSink的4个参数:
     *    第一个参数: 执行的sql,一般就是 insert into
     *    第二个参数: 预编译sql, 对占位符填充值
     *    第三个参数: 执行选项 ---》 攒批、重试
     *    第四个参数: 连接选项 ---》 url、用户名、密码
     */SinkFunction<WaterSensor> jdbcSink =JdbcSink.sink("insert into ws values(?,?,?)",newJdbcStatementBuilder<WaterSensor>(){@Overridepublicvoidaccept(PreparedStatement preparedStatement,WaterSensor waterSensor)throwsSQLException{//每收到一条WaterSensor,如何去填充占位符
            preparedStatement.setString(1, waterSensor.getId());
            preparedStatement.setLong(2, waterSensor.getTs());
            preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3)// 重试次数.withBatchSize(100)// 批次的大小:条数.withBatchIntervalMs(3000)// 批次的时间.build(),newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("000000").withConnectionCheckTimeoutSeconds(60)// 重试的超时时间.build());

sensorDS.addSink(jdbcSink);

env.execute();}}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

自定义 sink

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction());
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

标签: flink 大数据

本文转载自: https://blog.csdn.net/qq_45704048/article/details/137208559
版权归原作者 程序员三木 所有, 如有侵权,请联系我们删除。

“[flink 实时流基础] 输出算子(Sink)”的评论:

还没有评论