0


flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink

Flink学习笔记

前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的!

Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王!”明天周日除了复习前面知识点之外,也要继续努力学习接下来的知识点,继续加油!

文章目录

二、Flink 流批一体 API 开发

4. 数据输出 Sink

4.1 print 打印

打印是最简单的一个Sink,通常是用来做实验和测试时使用。

实例:socket 数据源,查看进程编号最终输出 sink 之 print 打印

packagecn.itcast.day06.sink;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
 * @author lql
 * @time 2024-02-17 22:27:48
 * @description TODO:print
 */publicclassPrintSinkDemo{publicstaticvoidmain(String[] args)throwsException{//local模式默认的并行度是当前机器的逻辑核的数量Configuration configuration =newConfiguration();StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);int parallelism0 = env.getParallelism();System.out.println("执行环境默认的并行度:"+ parallelism0);// socket 数据源DataStreamSource<String> lines = env.socketTextStream("node1",9999);// 获取 lines 数据源并行度int parallelism = lines.getParallelism();System.out.println("SocketSource的并行度:"+ parallelism);

        lines.print();
        lines.addSink(newMyPrintSink()).name("my-print-sink");
        env.execute();}privatestaticclassMyPrintSinkextendsRichSinkFunction<String>{// 这一处定义很重要,不然 indexOfThisSubtask 只能在一个方法中使用!privateint indexOfThisSubtask;@Overridepublicvoidopen(Configuration parameters)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();}@Overridepublicvoidinvoke(String value,Context context)throwsException{System.out.println(indexOfThisSubtask +1+"> "+ value);}}}

结果:

执行环境默认的并行度:8
SocketSource的并行度:1
6> hadoop
1> hadoop
1> hadoop
7> hadoop

总结:

  • 打印输出,也是一种 sink
4.2 writeAsText 以文本格式输出

该方法是将数据以文本格式实时的写入到指定的目录中,本质上使用的是 TextOutputFormat 格式写入的。

实例:socket 数据源,将数据输出到文本 Text 中

packagecn.itcast.day06.sink;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.core.fs.FileSystem;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
 * @author lql
 * @time 2024-02-17 22:40:48
 * @description TODO:writeAsText
 */publicclassWriteSinkDemo{publicstaticvoidmain(String[] args)throwsException{//local模式默认的并行度是当前机器的逻辑核的数量Configuration configuration =newConfiguration();StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);int parallelism0 = env.getParallelism();System.out.println("执行环境默认的并行度:"+ parallelism0);DataStreamSource<String> lines = env.socketTextStream("node1",9999);//获取DataStream的并行度int parallelism = lines.getParallelism();System.out.println("SocketSource的并行度:"+ parallelism);

        lines.writeAsText("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output",FileSystem.WriteMode.OVERWRITE);
        env.execute();}}

结果:

output 文件夹下出现以数字命名的文件
内容为 socket 数据源输出,加上了 \n 换行符
目录中的文件名称是该 Sink 所在 subtask 的 Index + 1

总结:

  • 1- writeAsText 输出数据以小文件方式,文件命名为 subtask 的 Index + 1
  • 2- FileSystem.WriteMode 有两种,一种是 OVERWRITE,可以覆盖同名文件,一种是 NO_OVERWRITE,同名文件就报错
4.3 writeAsCsv 以 csv 格式输出

该方法是将数据以 csv 格式写入到指定的目录中,本质上使用的是 CsvOutputFormat 格式写入的。

实例:socket 数据源,将数据输出到文本 csv 中

packagecn.itcast.day06.sink;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.operators.DataSource;importorg.apache.flink.api.java.tuple.Tuple7;importorg.apache.flink.core.fs.FileSystem;/**
 * @author lql
 * @time 2024-02-17 22:52:12
 * @description TODO:将DataSet数据写入到csv文件中
 */publicclassCsvSink{publicstaticvoidmain(String[] args)throwsException{ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();//需先建立文件String filePath ="D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\user.csv";//添加数据Tuple7<Integer,String,Integer,Integer,String,String,Long> row =newTuple7<>(15,"zhangsan",40,1,"CN","2020-09-08 00:00:00",1599494400000L);//转换为dataSet,利用 数据源中 fromElements 可以接受 [列表或元组] 的属性DataSource<Tuple7<Integer,String,Integer,Integer,String,String,Long>> dataSet =(DataSource<Tuple7<Integer,String,Integer,Integer,String,String,Long>>) env.fromElements(row);//将内容写入到File中,如果文件已存在,将会被复盖
        dataSet.writeAsCsv(filePath,FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        env.execute();}}

结果:

在指定文件中,生成了 csv 数据

总结:

  • 1- 首先需要定义数据存放的位置,精确到 .scv
  • 2- 最终需要将并行度设置为 1,才能生成一个完整的文件
4.4 writeUsingOutputFormat 指定格式输出

该方法是将数据已指定的格式写入到指定目录中

packagecn.itcast.day06.sink;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.io.TextOutputFormat;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.core.fs.Path;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
 * @author lql
 * @time 2024-02-17 23:03:24
 * @description TODO:将数据已指定的格式写入到指定目录中
 */publicclass writeUsingOutputFormatSink {publicstaticvoidmain(String[] args)throwsException{//1:获取流处理运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//调用env的fromElements创建一个非并行的DataStreamSourceDataStreamSource<String> words = env.fromElements("hadoop","spark","flink","hbase","flink","spark");// 对拆分后的单词,每个单词记一次数SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = words.map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String word)throwsException{returnTuple2.of(word,1);}});SingleOutputStreamOperator<Tuple2<String,Integer>> result = wordAndOne.keyBy(0).sum(1);

        result.writeUsingOutputFormat(newTextOutputFormat<>(newPath("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\wordcount")));
        env.execute();}}

结果:

在指定目录下,生成 n(电脑并行度数量) 个文本文件

总结:

  • 1- writeAsText 和 writeAsCsv 方法底层都是调用了 writeUsingOutputFormat 方法
  • 2- 这种方法更加灵活
4.5 writeToSocket 输出到网络端口

该方法是将数据输出到指定的Socket网络地址端口。

实例:socket 数据源,node1:9999 写数据到 node1:8888

packagecn.itcast.day06.sink;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
 * @author lql
 * @time 2024-02-17 23:12:03
 * @description TODO:writeToSocket
 */publicclassWriteToSocketDemo{publicstaticvoidmain(String[] args)throwsException{//local模式默认的并行度是当前机器的逻辑核的数量Configuration configuration =newConfiguration();StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);int parallelism0 = env.getParallelism();System.out.println("执行环境默认的并行度:"+ parallelism0);DataStreamSource<String> lines = env.socketTextStream("node1",9999);//获取DataStream的并行度int parallelism = lines.getParallelism();System.out.println("SocketSource的并行度:"+ parallelism);// 第三个参数是数据输出的序列化格式 SerializationSchema
        lines.writeToSocket("node1",8888,newSimpleStringSchema());
        env.execute();}}

结果:

node1:8888 实时接收到 node1:9999 写入的数据

总结:

  • 端口号需要提前开启
4.6 基于本地集合的 Sink

数据分类集合输出

实例:数据打印输出,error 输出,可以输出到:Stdout,Stderr,采集为本地集合

packagecn.itcast.day06.sink;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
 * @author lql
 * @time 2024-02-17 23:18:12
 * @description TODO:数据可以输出到:Stdout,Stderr,采集为本地集合
 */publicclassCollectionDemo{publicstaticvoidmain(String[] args)throwsException{//1.envStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Tuple2<Integer,String>> dataSource = env.fromElements(Tuple2.of(1,"zhangsan"),Tuple2.of(2,"lisi"),Tuple2.of(3,"wangwu"),Tuple2.of(4,"zhaoliu"));//2.sink
        dataSource.print();
        dataSource.printToErr();

        env.execute();}}

结果:

黑色字体输出:
6> (3,wangwu)
7> (4,zhaoliu)
4> (1,zhangsan)
5> (2,lisi)

红色字体输出:
8> (3,wangwu)
7> (2,lisi)
1> (4,zhaoliu)
6> (1,zhangsan)

总结:

  • 1- printToErr 可以进行分类输出
  • 2- 并行度是1 能输出文件
  • 3- 并行度是n 能输出文件夹
标签: flink 笔记 大数据

本文转载自: https://blog.csdn.net/m0_60732994/article/details/136143651
版权归原作者 那就学有所成吧(˵¯͒¯͒˵) 所有, 如有侵权,请联系我们删除。

“flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink”的评论:

还没有评论