

Flink Review Notes (2): Flink Unified Stream-Batch API Development - Source Operations

Flink Study Notes

Preface: Today is day two! I started on Flink's unified stream-batch development, focusing on how to ingest data from the various kinds of sources. I have found that learning to program takes categorized memorization: guess what an API does, then verify the guess by writing code, and in this way understand the point and deepen my grasp of the API.
Tips: I find learning Flink quite interesting. Progress is a little slow, but the data sources are now clear to me, and I believe everything will keep getting better from here!

II. Flink Unified Stream-Batch API Development

1. Input Data Sets (Data Source)

1.1 Predefined Sources
1.1.1 Sources Based on Local Collections
  • (1) env.fromElements()
// two kinds of input: individual elements, or a whole collection passed as one element
DataStreamSource<String> ds1 = env.fromElements("hadoop", "spark", "spark", "flink");

List<Tuple2<String, Long>> tuple2List = new ArrayList<>();
tuple2List.add(Tuple2.of("hadoop", 1L));
tuple2List.add(Tuple2.of("spark", 2L));
tuple2List.add(Tuple2.of("flink", 3L));
// note: the List is passed as a single element, so the whole list becomes one record
DataStreamSource<List<Tuple2<String, Long>>> ds2 = env.fromElements(tuple2List);

// output 1:
// 6> spark
// 4> hadoop
// 5> spark
// 7> flink

// output 2:
// 6> [(hadoop,1), (spark,2), (flink,3)]
  • (2) env.fromCollection()
// pass a List
DataStreamSource<String> ds3 = env.fromCollection(Arrays.asList("spark", "flink", "hadoop"));

// output 3:
// 8> hadoop
// 6> spark
// 7> flink

// fromParallelCollection: a parallel number-sequence source (closed interval [0, 10])
DataStreamSource<Long> parallelCollection = env.fromParallelCollection(
        new NumberSequenceIterator(0L, 10L),
        TypeInformation.of(Long.TYPE)).setParallelism(3);

// output (parallelCollection, unordered):
// 8> 8  2> 10  8> 7  6> 3  6> 5  3> 0  7> 6  1> 9  5> 2  5> 4  4> 1
  • (3) env.generateSequence()
// a number-sequence source (closed interval [1, 10]); deprecated in newer Flink versions in favor of fromSequence
DataStreamSource<Long> ds4 = env.generateSequence(1, 10);

// output 4 (unordered):
// 8> 8  3> 3  2> 2  5> 5  1> 1  1> 9  7> 7  6> 6  4> 4  2> 10
  • (4) env.fromSequence()
// a number-sequence source (closed interval [1, 10])
DataStreamSource<Long> ds5 = env.fromSequence(1, 10);

// output 5 (unordered):
// 1> 8  7> 6  6> 10  2> 5  3> 1  3> 2  8> 7  4> 9  5> 3  5> 4
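
The fragments above assume an env already exists. For completeness, here is a minimal self-contained sketch that wires the collection-based sources into one runnable job; the class name and package are my own choice, everything else mirrors the snippets above.

package cn.itcast.day02.source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class CollectionSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // fromElements: each argument becomes one record
        DataStreamSource<String> ds1 = env.fromElements("hadoop", "spark", "spark", "flink");
        // fromCollection: each element of the collection becomes one record
        DataStreamSource<String> ds3 = env.fromCollection(Arrays.asList("spark", "flink", "hadoop"));
        // fromSequence: closed interval [1, 10]
        DataStreamSource<Long> ds5 = env.fromSequence(1, 10);

        // prefix each sink's output so the three streams can be told apart
        ds1.print("ds1");
        ds3.print("ds3");
        ds5.print("ds5");
        env.execute();
    }
}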
1.1.2 File-Based Sources
  • (1) Reading a text file in batch fashion: env.readTextFile(path)
package cn.itcast.day02.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-12 23:47:53
 * @description TODO: read a file in batch fashion
 */
public class BatchFromFile {
    public static void main(String[] args) throws Exception {
        // configure the web UI port
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        // initialize a local environment with the web UI
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        // read the source file
        String path = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\wordcount.txt";
        DataStreamSource<String> lines = env.readTextFile(path);
        // parallelism of the source
        int parallelism = lines.getParallelism();
        System.out.println("Parallelism of the DataStream created by readTextFile: " + parallelism);
        lines.print();
        env.execute();
    }
}
  • (2) Reading a text file as a stream: env.readFile(). Detail: with PROCESS_CONTINUOUSLY, the file is re-read and re-printed only when its state changes.
package cn.itcast.day02.source;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

/**
 * @author lql
 * @time 2024-02-13 15:34:11
 * @description TODO: read a source as a stream, an unbounded stream
 */
public class StreamFromFile {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        String path = "./data/input/wordcount.txt";
        // new TextInputFormat(null): text input format; null means the default UTF-8 charset
        // FileProcessingMode.PROCESS_ONCE: process the file exactly once
        // 2000 ms is the monitoring interval
        DataStreamSource<String> lines1 = env.readFile(new TextInputFormat(null), path, FileProcessingMode.PROCESS_ONCE, 2000);
        // FileProcessingMode.PROCESS_CONTINUOUSLY: keep monitoring the file, never stops
        DataStreamSource<String> lines2 = env.readFile(new TextInputFormat(null), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);
        // inspect the parallelism
        System.out.println("Parallelism of lines1: " + lines1.getParallelism());
        System.out.println("Parallelism of lines2: " + lines2.getParallelism());
        //lines1.print();
        lines2.print();
        env.execute();
    }
}
1.1.3 Socket-Based Sources
  • Observation: the socket source's parallelism is 1 (a single-parallelism source).
  • Detail: start the socket server on the VM with nc -lk 8888.
package cn.itcast.day02.source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-13 16:00:47
 * @description TODO: socket-based source
 */
public class StreamSocketSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism0 = env.getParallelism();
        System.out.println("Default parallelism of the environment: " + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("192.168.88.161", 8888);
        int parallelism1 = lines.getParallelism();
        System.out.println("Parallelism of the socket source: " + parallelism1);
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        int parallelism2 = words.getParallelism();
        System.out.println("Parallelism of the DataStream after flatMap: " + parallelism2);
        words.print();
        env.execute();
    }
}
1.2 Custom Sources
1.2.1 Sources Based on Random Generation
  • (1) Implementing the SourceFunction interface. Example: a custom source that generates one order per second (order ID, user ID, order amount, timestamp). Requirements: a random order ID (UUID), a user ID in 0-2, an order amount in 0-100, and the current system time as the timestamp.
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-13 16:21:31
 * @description TODO
 */
public class CustomerSourceWithoutParallelDemo {
    /**
     * Custom Java bean:
     *     @Data: generates getters, setters, toString, equals and hashCode.
     *     @AllArgsConstructor: generates a constructor taking all fields.
     *     @NoArgsConstructor: generates a no-argument constructor.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        // order ID
        private String id;
        // user ID
        private String userId;
        // order amount
        private int money;
        // timestamp
        private Long timestamp;
    }

    /**
     * Main method
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // todo 1) create the Flink streaming environment
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("Parallelism of the environment: " + env.getParallelism());
        // todo 2) attach the custom source
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        System.out.println("Parallelism of streamSource: " + streamSource.getParallelism());
        // todo 3) print the stream
        streamSource.printToErr();
        env.execute();
    }

    /**
     * Custom source that generates one order per second
     */
    private static class MySource implements SourceFunction<Order> {
        // flag controlling the generation loop
        private boolean isRunning = true;

        /**
         * Core method: generate the data
         */
        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // order ID
                String orderID = UUID.randomUUID().toString();
                // user ID
                String userID = String.valueOf(random.nextInt(3));
                // order amount
                int money = random.nextInt(1000);
                // timestamp
                long time = System.currentTimeMillis();
                // emit the record
                sourceContext.collect(new Order(orderID, userID, money, time));
                // pace the loop: one order per second
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

Result: the default environment parallelism is 8, and the custom streamSource's parallelism is 1.

Summary:

  • 1- env.addSource(new MySource()) attaches the custom source [a private static inner class]: new an implementation of the SourceFunction interface and override its core methods.
  • 2- Met the Java bean pattern and the roles of @Data, @NoArgsConstructor and @AllArgsConstructor.
  • 3- The UUID utility generates random IDs; Random must be instantiated with new before use, and random.nextInt(n) is left-closed, right-open, i.e. [0, n).
  • 4- String.valueOf() produces a String from a value; the while loop needs a boolean flag.
  • 5- collect() emits a record (here an Order object); see the usage sketch below.
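
To make the summary concrete, here is a small usage sketch of my own (not from the original post) that feeds the Order stream into a per-user running total. It assumes the Order bean and MySource defined above, plus imports of org.apache.flink.api.java.tuple.Tuple2 and org.apache.flink.api.common.typeinfo.Types.

DataStreamSource<Order> orders = env.addSource(new MySource());
orders.map(order -> Tuple2.of(order.getUserId(), order.getMoney()))
      .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambdas need an explicit type hint
      .keyBy(t -> t.f0)                              // key by user ID
      .sum(1)                                        // running total of order amounts per user
      .print();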
  • (2) Implementing ParallelSourceFunction to create a parallel source
DataStreamSource<Order> mySource = env.addSource(new MySource()).setParallelism(6);

// the MySource above only implements the plain SourceFunction interface, so it
// does not support a parallelism greater than 1: the setParallelism(6) call fails;
// a parallel variant is sketched below
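
For contrast, a minimal sketch of a source that does allow a parallelism greater than 1. The class name and the emitted counter are my own illustration; ParallelSourceFunction is the actual Flink interface.

package cn.itcast.day02.source.custom;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.concurrent.TimeUnit;

public class ParallelSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // legal now: each of the 6 subtasks runs its own copy of run()
        DataStreamSource<Long> source = env.addSource(new MyParallelSource()).setParallelism(6);
        System.out.println("Parallelism of the source: " + source.getParallelism()); // 6
        source.print();
        env.execute();
    }

    private static class MyParallelSource implements ParallelSourceFunction<Long> {
        private volatile boolean isRunning = true;
        private long counter = 0;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                // every subtask emits its own independent counter
                ctx.collect(counter++);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}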
  • (3) Implementing RichParallelSourceFunction: creating a parallel source with the Rich lifecycle
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-13 16:58:49
 * @description TODO: custom source with multiple parallelism
 */
public class RichParallelismDemo {
    /**
     * Custom Java bean:
     *
     * @Data: generates getters, setters, toString, equals and hashCode.
     * @AllArgsConstructor: generates a constructor taking all fields.
     * @NoArgsConstructor: generates a no-argument constructor.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        // order ID
        private String id;
        // user ID
        private String userId;
        // order amount
        private int money;
        // timestamp
        private Long timestamp;
    }

    /**
     * Main method
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // todo 1) create the Flink streaming environment
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("Parallelism of the environment: " + env.getParallelism());
        // todo 2) attach the custom source; since MySource extends
        // RichParallelSourceFunction, it defaults to the environment parallelism
        // and may also be given an explicit parallelism greater than 1
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        System.out.println("Parallelism of streamSource: " + streamSource.getParallelism());
        // todo 3) print the stream
        streamSource.printToErr();
        env.execute();
    }

    /**
     * Custom source that generates one order per second
     */
    private static class MySource extends RichParallelSourceFunction<Order> {
        // flag controlling the generation loop
        private boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // order ID
                String orderID = UUID.randomUUID().toString();
                // user ID
                String userID = String.valueOf(random.nextInt(3));
                // order amount
                int money = random.nextInt(1000);
                // timestamp
                long time = System.currentTimeMillis();
                // emit one record per second
                sourceContext.collect(new Order(orderID, userID, money, time));
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }
}

Result: a custom RichParallelSourceFunction supports a parallelism greater than 1.

Summary: extending RichParallelSourceFunction additionally gives you the open and close lifecycle methods to override (run and cancel remain the core methods)!

1.2.2 MySQL-Based Source Operations
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` int(11) NOT NULL,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');

SET FOREIGN_KEY_CHECKS = 1;
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-13 17:14:06
 * @description TODO: custom MySQL source
 */
public class MysqlSource {
    public static void main(String[] args) throws Exception {
        // TODO 1: create the Flink streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TODO 2: attach the custom source
        DataStreamSource<UserInfo> streamSource = env.addSource(new MysqlSourceFunction());
        System.out.println("Parallelism of MysqlSourceFunction: " + streamSource.getParallelism());
        // todo 3) print the stream
        streamSource.print();
        // todo 4) run the job
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserInfo {
        private int id;
        private String username;
        private String password;
        private String name;
    }

    /**
     * Custom source: read data from MySQL
     */
    private static class MysqlSourceFunction extends RichParallelSourceFunction<UserInfo> {
        // MySQL connection object
        private Connection connection = null;
        // MySQL statement object
        private PreparedStatement statement = null;

        /**
         * Runs once per instance; with multiple parallel subtasks it runs
         * multiple times, because there are multiple instances.
         * Typically used for resource initialization.
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // register the driver
            Class.forName("com.mysql.jdbc.Driver");
            // create the MySQL connection object
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root");
            // create the statement object
            statement = connection.prepareStatement("select * from test.user");
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void run(SourceContext<UserInfo> sourceContext) throws Exception {
            while (true) {
                ResultSet resultSet = statement.executeQuery();
                while (resultSet.next()) {
                    int id = resultSet.getInt("id");
                    String username = resultSet.getString("username");
                    String password = resultSet.getString("password");
                    String name = resultSet.getString("name");
                    sourceContext.collect(new UserInfo(id, username, password, name));
                }
                resultSet.close();
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
        }
    }
}

Result: the custom MySQL source supports a parallelism greater than 1.

Summary:

  • 1- A Java bean class models the MySQL column names.
  • 2- Initialize the MySQL connection object and statement object to null.
  • 3- Override open (the setup method): register the MySQL driver with Class.forName("com.mysql.jdbc.Driver"); create the connection with DriverManager.getConnection(); create the statement with connection.prepareStatement(), which carries the SQL query.
  • 4- Override run (the core method): two nested loops; the outer loop re-runs statement.executeQuery(), closes the ResultSet, and pauses between rounds, while the inner loop walks the rows, reads each column by name and type, and then collect(new UserInfo(...)) emits the record.
  • 5- Sleep with TimeUnit.SECONDS.sleep().
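
One thing the example omits: open() acquires JDBC resources, but close() never releases them. A minimal sketch of the missing cleanup (my addition, assuming the connection and statement fields above):

@Override
public void close() throws Exception {
    // release the JDBC resources when the subtask shuts down
    if (statement != null) {
        statement.close();
    }
    if (connection != null) {
        connection.close();
    }
    super.close();
}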

Reposted from: https://blog.csdn.net/m0_60732994/article/details/136110526. Copyright belongs to the original author 那就学有所成吧(˵¯͒¯͒˵).
