Flink学习笔记
前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的!
Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王!”明天周日除了复习前面知识点之外,也要继续努力学习接下来的知识点,继续加油!
文章目录
二、Flink 流批一体 API 开发
4. 数据输出 Sink
4.1 print 打印
打印是最简单的一个Sink,通常是用来做实验和测试时使用。
实例:socket 数据源,查看进程编号最终输出 sink 之 print 打印
packagecn.itcast.day06.sink;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
* @author lql
* @time 2024-02-17 22:27:48
* @description TODO:print
*/publicclassPrintSinkDemo{publicstaticvoidmain(String[] args)throwsException{//local模式默认的并行度是当前机器的逻辑核的数量Configuration configuration =newConfiguration();StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);int parallelism0 = env.getParallelism();System.out.println("执行环境默认的并行度:"+ parallelism0);// socket 数据源DataStreamSource<String> lines = env.socketTextStream("node1",9999);// 获取 lines 数据源并行度int parallelism = lines.getParallelism();System.out.println("SocketSource的并行度:"+ parallelism);
lines.print();
lines.addSink(newMyPrintSink()).name("my-print-sink");
env.execute();}privatestaticclassMyPrintSinkextendsRichSinkFunction<String>{// 这一处定义很重要,不然 indexOfThisSubtask 只能在一个方法中使用!privateint indexOfThisSubtask;@Overridepublicvoidopen(Configuration parameters)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();}@Overridepublicvoidinvoke(String value,Context context)throwsException{System.out.println(indexOfThisSubtask +1+"> "+ value);}}}
结果:
执行环境默认的并行度:8
SocketSource的并行度:1
6> hadoop
1> hadoop
1> hadoop
7> hadoop
总结:
- 打印输出,也是一种 sink
4.2 writeAsText 以文本格式输出
该方法是将数据以文本格式实时的写入到指定的目录中,本质上使用的是 TextOutputFormat 格式写入的。
实例:socket 数据源,将数据输出到文本 Text 中
packagecn.itcast.day06.sink;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.core.fs.FileSystem;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
* @author lql
* @time 2024-02-17 22:40:48
* @description TODO:writeAsText
*/publicclassWriteSinkDemo{publicstaticvoidmain(String[] args)throwsException{//local模式默认的并行度是当前机器的逻辑核的数量Configuration configuration =newConfiguration();StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);int parallelism0 = env.getParallelism();System.out.println("执行环境默认的并行度:"+ parallelism0);DataStreamSource<String> lines = env.socketTextStream("node1",9999);//获取DataStream的并行度int parallelism = lines.getParallelism();System.out.println("SocketSource的并行度:"+ parallelism);
lines.writeAsText("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output",FileSystem.WriteMode.OVERWRITE);
env.execute();}}
结果:
output 文件夹下出现以数字命名的文件
内容为 socket 数据源输出,加上了 \n 换行符
目录中的文件名称是该 Sink 所在 subtask 的 Index + 1
总结:
- 1- writeAsText 输出数据以小文件方式,文件命名为 subtask 的 Index + 1
- 2- FileSystem.WriteMode 有两种,一种是 OVERWRITE,可以覆盖同名文件,一种是 NO_OVERWRITE,同名文件就报错
4.3 writeAsCsv 以 csv 格式输出
该方法是将数据以 csv 格式写入到指定的目录中,本质上使用的是 CsvOutputFormat 格式写入的。
实例:socket 数据源,将数据输出到文本 csv 中
packagecn.itcast.day06.sink;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.operators.DataSource;importorg.apache.flink.api.java.tuple.Tuple7;importorg.apache.flink.core.fs.FileSystem;/**
* @author lql
* @time 2024-02-17 22:52:12
* @description TODO:将DataSet数据写入到csv文件中
*/publicclassCsvSink{publicstaticvoidmain(String[] args)throwsException{ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();//需先建立文件String filePath ="D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\user.csv";//添加数据Tuple7<Integer,String,Integer,Integer,String,String,Long> row =newTuple7<>(15,"zhangsan",40,1,"CN","2020-09-08 00:00:00",1599494400000L);//转换为dataSet,利用 数据源中 fromElements 可以接受 [列表或元组] 的属性DataSource<Tuple7<Integer,String,Integer,Integer,String,String,Long>> dataSet =(DataSource<Tuple7<Integer,String,Integer,Integer,String,String,Long>>) env.fromElements(row);//将内容写入到File中,如果文件已存在,将会被复盖
dataSet.writeAsCsv(filePath,FileSystem.WriteMode.OVERWRITE).setParallelism(1);
env.execute();}}
结果:
在指定文件中,生成了 csv 数据
总结:
- 1- 首先需要定义数据存放的位置,精确到 .scv
- 2- 最终需要将并行度设置为 1,才能生成一个完整的文件
4.4 writeUsingOutputFormat 指定格式输出
该方法是将数据已指定的格式写入到指定目录中
packagecn.itcast.day06.sink;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.io.TextOutputFormat;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.core.fs.Path;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
* @author lql
* @time 2024-02-17 23:03:24
* @description TODO:将数据已指定的格式写入到指定目录中
*/publicclass writeUsingOutputFormatSink {publicstaticvoidmain(String[] args)throwsException{//1:获取流处理运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//调用env的fromElements创建一个非并行的DataStreamSourceDataStreamSource<String> words = env.fromElements("hadoop","spark","flink","hbase","flink","spark");// 对拆分后的单词,每个单词记一次数SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = words.map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String word)throwsException{returnTuple2.of(word,1);}});SingleOutputStreamOperator<Tuple2<String,Integer>> result = wordAndOne.keyBy(0).sum(1);
result.writeUsingOutputFormat(newTextOutputFormat<>(newPath("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\wordcount")));
env.execute();}}
结果:
在指定目录下,生成 n(电脑并行度数量) 个文本文件
总结:
- 1- writeAsText 和 writeAsCsv 方法底层都是调用了 writeUsingOutputFormat 方法
- 2- 这种方法更加灵活
4.5 writeToSocket 输出到网络端口
该方法是将数据输出到指定的Socket网络地址端口。
实例:socket 数据源,node1:9999 写数据到 node1:8888
packagecn.itcast.day06.sink;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
* @author lql
* @time 2024-02-17 23:12:03
* @description TODO:writeToSocket
*/publicclassWriteToSocketDemo{publicstaticvoidmain(String[] args)throwsException{//local模式默认的并行度是当前机器的逻辑核的数量Configuration configuration =newConfiguration();StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);int parallelism0 = env.getParallelism();System.out.println("执行环境默认的并行度:"+ parallelism0);DataStreamSource<String> lines = env.socketTextStream("node1",9999);//获取DataStream的并行度int parallelism = lines.getParallelism();System.out.println("SocketSource的并行度:"+ parallelism);// 第三个参数是数据输出的序列化格式 SerializationSchema
lines.writeToSocket("node1",8888,newSimpleStringSchema());
env.execute();}}
结果:
node1:8888 实时接收到 node1:9999 写入的数据
总结:
- 端口号需要提前开启
4.6 基于本地集合的 Sink
数据分类集合输出
实例:数据打印输出,error 输出,可以输出到:Stdout,Stderr,采集为本地集合
packagecn.itcast.day06.sink;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
* @author lql
* @time 2024-02-17 23:18:12
* @description TODO:数据可以输出到:Stdout,Stderr,采集为本地集合
*/publicclassCollectionDemo{publicstaticvoidmain(String[] args)throwsException{//1.envStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Tuple2<Integer,String>> dataSource = env.fromElements(Tuple2.of(1,"zhangsan"),Tuple2.of(2,"lisi"),Tuple2.of(3,"wangwu"),Tuple2.of(4,"zhaoliu"));//2.sink
dataSource.print();
dataSource.printToErr();
env.execute();}}
结果:
黑色字体输出:
6> (3,wangwu)
7> (4,zhaoliu)
4> (1,zhangsan)
5> (2,lisi)
红色字体输出:
8> (3,wangwu)
7> (2,lisi)
1> (4,zhaoliu)
6> (1,zhangsan)
总结:
- 1- printToErr 可以进行分类输出
- 2- 并行度是1 能输出文件
- 3- 并行度是n 能输出文件夹
版权归原作者 那就学有所成吧(˵¯͒¯͒˵) 所有, 如有侵权,请联系我们删除。