0


Flink从入门到实践(二):Flink DataStream API

文章目录

系列文章索引

Flink从入门到实践(一):Flink入门、Flink部署
Flink从入门到实践(二):Flink DataStream API
Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC

三、DataStream API

1、官网

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/

2、获取执行环境(Environment)

/**
 * 1、获取Flink执行的环境
 * getExecutionEnvironment() 这是我们用的最多的一种
 * createLocalEnvironment()  这种仅限于本地开发使用
 * createRemoteEnvironment(String host, int port, String... jarFiles);  知道就行,开发不用
 *
 *
 * getExecutionEnvironment 传入一个 new Configuration(),本质上是一个HashMap
 */// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);// 3秒检查一次,提高应用程序的容错性和数据一致性。DataStream<String> text = env.readTextFile("file:///path/to/file");

通常来说使用getExecutionEnvironment()就可以了,会自动选择你当前的运行环境。

3、数据接入(Source)

(1)总览

我们可以使用

env.addSource(sourceFunction)

来添加数据来源,实际有许多内置的Source,也可以定义自己的Source。

如果想要自定义数据来源,比如说(该方式在1.18已过时,推荐使用Source接口):
实现

SourceFunction

接口来实现单并行度的数据来源;
实现

ParallelSourceFunction

接口来实现多并行度的数据来源;
实现

RichParallelSourceFunction

接口来实现更高级的多并行度的数据来源。

内置的数据来源(本质上也是使用

env.addSource(sourceFunction)

来已经预实现了):

env.readTextFile(path)

:逐行读取文本文件,即符合TextInputFormat规范的文件,并将其作为字符串返回。

readFile(fileInputFormat, path)

:按照指定的文件输入格式读取(一次)文件。

readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

:更加复杂的文件处理。

socketTextStream()

:从Socket读取。元素可以用分隔符分隔。

fromCollection(Collection)

fromCollection(Iterator, Class)

fromElements(T ...)

fromParallelCollection(SplittableIterator, Class)

generateSequence(from, to)

:从集合读取。

addSource(new FlinkKafkaConsumer<>(...))

:从kafka读取。

在这里插入图片描述

(2)代码实例(1.18版本已过时的)

// 实体类publicclassAccess{privatelong time;privateString domain;privatedouble traffic;}publicclassStudent{privateint id;privateString name;privateint age;}
// 工具类 需要引入mysql-connector-java包importjava.sql.Connection;importjava.sql.DriverManager;publicclassMySQLUtils{publicstaticConnectiongetConnection()throwsException{Class.forName("com.mysql.jdbc.Driver");returnDriverManager.getConnection("jdbc:mysql://localhost:3306/flink","root","123");}publicstaticvoidclose(AutoCloseable closeable){if(null!= closeable){try{
                closeable.close();// null.close}catch(Exception e){
                e.printStackTrace();}finally{
                closeable =null;}}}}
// 自定义source/**
 * 自定义数据源
 * 并行度为1
 */publicclassAccessSourceimplementsSourceFunction<Access>{volatileboolean isRunning =true;/**
     * 造数据是自定义数据源的使用方式之一
     * @param ctx
     * @throws Exception
     */@Overridepublicvoidrun(SourceContext<Access> ctx)throwsException{Random random =newRandom();String[] domains ={"test1.com","test2.com","test3.com"};while(isRunning){long time =System.currentTimeMillis();
            ctx.collect(newAccess(time, domains[random.nextInt(domains.length)], random.nextInt(1000)+1000));Thread.sleep(2000);}}@Overridepublicvoidcancel(){
        isRunning =false;}}/**
 * 自定义数据源
 * 多并行度
 */publicclassAccessSourceV2implementsParallelSourceFunction<Access>{volatileboolean isRunning =true;/**
     * 造数据是自定义数据源的使用方式之一
     * @param ctx
     * @throws Exception
     */@Overridepublicvoidrun(SourceContext<Access> ctx)throwsException{Random random =newRandom();String[] domains ={"test1.com","test2.com","test3.com"};while(isRunning){long time =System.currentTimeMillis();
            ctx.collect(newAccess(time, domains[random.nextInt(domains.length)], random.nextInt(1000)+1000));Thread.sleep(5000);}}@Overridepublicvoidcancel(){
        isRunning =false;}}/**
 * RichSourceFunction: Rich  +   SourceFunction
 * Rich: 包含了生命周期方法  open  close
 * SourceFunction:单
 *
 * 自定义二次开发:按照框架(Flink/Spark/....)所提供的接口,去实现自己的业务逻辑即可
 * 自定义Source
 * 自定义Sink
 *
 *
 * 扩展:对于Spark SQL的外部数据源熟悉吗? 按照Spark所提供的接口,自己实现业务逻辑
 *
 */publicclassMySQLSourceextendsRichSourceFunction<Student>{Connection connection;PreparedStatement pstmt;/**
     * 初始化操作,建立connection
     */@Overridepublicvoidopen(Configuration parameters)throwsException{
        connection =MySQLUtils.getConnection();
        pstmt = connection.prepareStatement("select * from student");}/**
     * 释放资源,关闭connection
     */@Overridepublicvoidclose()throwsException{MySQLUtils.close(pstmt);MySQLUtils.close(connection);}/**
     * 业务逻辑:就是把表中的数据读取出来 ==> Student
     */@Overridepublicvoidrun(SourceContext<Student> ctx)throwsException{ResultSet rs = pstmt.executeQuery();while(rs.next()){int id = rs.getInt("id");String name = rs.getString("name");int age = rs.getInt("age");Student student =newStudent(id, name, age);
            ctx.collect(student);}}@Overridepublicvoidcancel(){}}
/**
 * Flink中datasource的使用
 */publicclassFlinkDataSourceApp{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();/**
         * 使用内置的dataSource
         *///        DataStreamSource<String> source = env.readFile(new TextInputFormat(null), "data/wc.data");//        // 这个readTextFile方法底层其实调用的就是readFile//        DataStreamSource<String> source = env.readTextFile("data/wc.txt");//        System.out.println(source.getParallelism());  // 8////        SingleOutputStreamOperator<String> mapStream = source.map(String::toUpperCase);//        System.out.println(mapStream.getParallelism());//        mapStream.print();////        DataStreamSource<Long> source = env.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.TYPE);//        System.out.println(source.getParallelism());// 8//        SingleOutputStreamOperator<Long> map = source.map(x -> x + 1);//        map.print();////        DataStreamSource<Access> source = env.addSource(new AccessSourceV2()).setParallelism(3); // 对于ParallelSourceFunction是可以根据具体情况来设定并行度的//        System.out.println(source.getParallelism());//        source.print();/**
         * 使用自定义数据源
         *///        env.addSource(new AccessSource()).print();//        env.addSource(new AccessSourceV2()).setParallelism(3).print(); // 多并行度的可以自行设置并行度/**
         * 使用Flink自定义MySQL的数据源,进而读取MySQL里面的数据
         * 该方式已过时 …… flink更新太快了
         */
        env.addSource(newMySQLSource()).print();/**
         * 单并行度:fromElements  fromCollection  socketTextStream
         * 多并行度:readTextFile fromParallelCollection generateSequence  readFile
         * 自定义:
         */
        env.execute("作业名字");}}

(3)使用Source接口

暂无

4、数据处理(Transformation)

(1)总览

官方文档:

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/overview/

运算符将一个或多个数据流转换为新的数据流。程序可以将多种转换组合成复杂的数据流拓扑。

(2)Map(后续用该结果测试)

把DataStream转换成新的DataStream。

// 将读取的文件,按照,分割,然后每一行数据组成一个Access对象DataStream<Integer> dataStream = env.readTextFile("data/access.log");SingleOutputStreamOperator<Access> mapStream = dataStream.map(newMapFunction<String,Access>(){@OverridepublicAccessmap(String value)throwsException{String[] splits = value.split(",");Access access =newAccess();
        access.setTime(Long.parseLong(splits[0].trim()));
        access.setDomain(splits[1].trim());
        access.setTraffic(Double.parseDouble(splits[2].trim()));return access;}});
mapStream.print();

(3)Filter

把DataStream转换成新的DataStream。
计算每个元素的布尔函数,并保留函数返回true的元素。
也即:过滤出满足条件的元素。

// 过滤出不为0的元素
dataStream.filter(newFilterFunction<Integer>(){@Overridepublicbooleanfilter(Integer value)throwsException{return value !=0;}});

(4)FlatMap

把DataStream转换成新的DataStream。
可以是一对一、一对多、一对0 一个元素进来,可以出去0、1、多个元素。

dataStream.flatMap(newFlatMapFunction<String,String>(){@OverridepublicvoidflatMap(String value,Collector<String> out)throwsException{for(String word: value.split(" ")){// 把每一个元素按空格分割
            out.collect(word);// 收集每一个 分割后的 元素}}});

(5)KeyBy

把DataStream转换为KeyedStream 。

在逻辑上将流划分为不相交的分区。具有相同关键字的所有记录都被分配到同一个

分区


在内部,keyBy()是通过散列分区实现的。
(类似Map - Reduce思想)
注意!如果是根据一个对象分组,要重写 hashCode()方法,否则会使用默认的Object.hashCode()。

// 根据value的某个属性分组,相当于mysql的group by// 通常分组之后,就要求和、求一些统计数据了
dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

dataStream
.keyBy(value -> value.getSomeKey()).sum("field")// 根据字段求和还可以求最大值最小值等.print();

(6)Reduce

把KeyedStream 转换为 DataStream。
将当前元素与最后一个减少的值合并,并发出新值。

keyedStream.reduce(newReduceFunction<Integer>(){@OverridepublicIntegerreduce(Integer value1,Integer value2)throwsException{return value1 + value2;}});

(7)Union

把多个DataStream合并为一个DataStream。
两个或多个数据流的联合,创建一个包含所有流中所有元素的新流。注意:如果您将数据流与其自身联合,您将在结果流中两次获得每个元素。

/**
 * union:合并多个流
 * 数据类型问题:union的多个流中数据类型是需要相同的
 * 数据类型相同的多流操作
 */DataStreamSource<Integer> stream1 = env.fromElements(1,2,3);DataStreamSource<Integer> stream2 = env.fromElements(11,12,13);DataStreamSource<String> stream3 = env.fromElements("A","B","C");

stream1.union(stream2).map(x ->"PK_"+ x).print();
stream1.union(stream1).print();
stream1.union(stream1, stream2).print();

(8)Connect

把两个DataStream 合并为 ConnectedStream。

DataStream<Integer> someStream =//...DataStream<String> otherStream =//...ConnectedStreams<Integer,String> connectedStreams = someStream.connect(otherStream);
/**
 * connect: 数据类型可以不同
 * 两个流的操作
 * 只是形式的连接
 */ConnectedStreams<Integer,String> connectedStreams = stream1.connect(stream3);
connectedStreams.map(newCoMapFunction<Integer,String,String>(){// 共享状态String prefix ="common_";// 对第一个流的操作@OverridepublicStringmap1(Integer value)throwsException{return prefix + value*10;}// 对第二个流的操作@OverridepublicStringmap2(String value)throwsException{return prefix + value.toLowerCase();}}).print();

(9)CoMap, CoFlatMap

将ConnectedStream 转换为 DataStream。
类似于连接数据流上的map和flatMap。

connectedStreams.map(newCoMapFunction<Integer,String,Boolean>(){@OverridepublicBooleanmap1(Integer value){returntrue;}@OverridepublicBooleanmap2(String value){returnfalse;}});
connectedStreams.flatMap(newCoFlatMapFunction<Integer,String,String>(){@OverridepublicvoidflatMap1(Integer value,Collector<String> out){
       out.collect(value.toString());}@OverridepublicvoidflatMap2(String value,Collector<String> out){for(String word: value.split(" ")){
         out.collect(word);}}});

(10)Physical Partitioning 分区

importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.Partitioner;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;publicclassPartitionTest2{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(3);// 一般情况下,并行度跟分区相同,相同分区在同一个线程中执行DataStreamSource<String> sourcePartition = env.readTextFile("data/access.log");
        sourcePartition
                // 读取数据转成Access对象.map(newRichMapFunction<String,Access>(){@OverridepublicAccessmap(String value)throwsException{String[] splits = value.split(",");Access access =newAccess();
                        access.setTime(Long.parseLong(splits[0].trim()));
                        access.setDomain(splits[1].trim());
                        access.setTraffic(Double.parseDouble(splits[2].trim()));return access;}})// 按照指定字段进行分区.partitionCustom(newPartitioner<String>(){@Overridepublicintpartition(String key,int numPartitions){System.out.println(numPartitions);if("test1.com".equals(key)){return0;}elseif("test2.com".equals(key)){return1;}else{return2;}}}, x -> x.getDomain())// 下面的这段map方法目的是验证:相同的域名是否真的在同一个分区内,看threadid是否相同即可.map(newMapFunction<Access,Access>(){@OverridepublicAccessmap(Access value)throwsException{System.out.println("current thread id is "+Thread.currentThread().getId()+", value is:"+ value);return value;}}).print();
        env.execute("作业名字");}}

(11)Side Outputs 分流操作

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/side_output/
/**
 * 分流操作:把一个分拆分成多个流
 *
 * split 在老的flink版本中是有的,但是新的版本中已经没有这个api
 *
 * 那就说明新版本肯定提供了更好用的使用方式:side output
 */DataStreamSource<String> source = env.readTextFile("data/access.log");SingleOutputStreamOperator<Access> stream = source.map(newAccessConvertFunction());// 很low的写法//        SingleOutputStreamOperator<Access> pk1Stream = stream.filter(x -> "test1.com".equals(x.getDomain()));//        SingleOutputStreamOperator<Access> pk2Stream = stream.filter(x -> "test1.com".equals(x.getDomain()));//        pk1Stream.print("域名是pk1.com的流");//        pk2Stream.print("域名是pk2.com的流");// 定义两个TagOutputTag<Access> test1OutputTag =newOutputTag<Access>("test1"){};OutputTag<Access> test2OutputTag =newOutputTag<Access>("test2"){};SingleOutputStreamOperator<Access> processStream = stream.process(newProcessFunction<Access,Access>(){@OverridepublicvoidprocessElement(Access value,Context ctx,Collector<Access> out)throwsException{if("test1.com".equals(value.getDomain())){
            ctx.output(test1OutputTag, value);// pk1.com的走pk1的OutputTag}elseif("test2.com".equals(value.getDomain())){
            ctx.output(test2OutputTag, value);// pk2.com的走pk2的OutputTag}else{
            out.collect(value);// pk3.com的走主流}}});

processStream.print("主流:");
processStream.getSideOutput(test1OutputTag).print("test1的:");
processStream.getSideOutput(test2OutputTag).print("test2的:");

env.execute("作业名字");

5、数据输出(Data Sinks、Connectors)

(1)总览

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/#data-sinks
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/

数据接收器消费数据流,并将它们转发到文件、Socket、外部系统或打印它们。
Flink自带多种内置输出格式:

writeAsText() / TextOutputFormat

:将元素作为字符串逐行写入。字符串是通过调用每个元素的toString()方法获得的。

writeAsCsv(...) / CsvOutputFormat

:将元组写入逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。

print() / printToErr()

:打印标准输出/标准错误流中每个元素的toString()值。可选地,可以提供一个前缀(msg),将其附加到输出的前面。这有助于区分不同的打印调用。如果并行度大于1,输出还将加上产生输出的任务的标识符。

writeUsingOutputFormat() / FileOutputFormat

:自定义文件输出的方法和基类。支持自定义对象到字节的转换。

writeToSocket

:根据SerializationSchema将元素写入Socket。

addSink

:调用自定义接收器函数。Flink与其他系统(如Apache Kafka)的连接器捆绑在一起,这些连接器被实现为sink函数。

(2)print

stream.print();/*
>7> Access{time=202810110120, domain='test1.com', traffic=2000.0}
1> Access{time=202810110120, domain='test2.com', traffic=4000.0}
11> Access{time=202810110120, domain='test1.com', traffic=5000.0}
4> Access{time=202810110120, domain='test3.com', traffic=1000.0}
9> Access{time=202810110120, domain='test2.com', traffic=6000.0}
线程号 + 数据.toString()

如果这样:
stream.print().setParallelism(1);
并行度设置为1,那么前面就不会输出数字

这样打印红色:
stream.printToErr();
*/

源码:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

(3)自定义Sink打印到控制台

stream.addSink(newRichSinkFunction<Access>(){int subTaskId;// num>@Overridepublicvoidopen(Configuration parameters)throwsException{super.open(parameters);

        subTaskId =getRuntimeContext().getIndexOfThisSubtask();}@Overridepublicvoidinvoke(Access value,SinkFunction.Context context)throwsException{System.out.println(subTaskId +1+"> "+ value);// 最终执行的方法,输出到终端}});

(4)写入到文件

// 已过时// 写入到文件,每一个并行度,会生成一个文件。并行度为1会生成test一个文件
stream.writeAsText("out/test",FileSystem.WriteMode.OVERWRITE).setParallelism(1);// 也已经过时了,推荐使用 org.apache.flink.connector.file.sink.FileSink,需要额外引入包StreamingFileSink<String> fileSink =StreamingFileSink.forRowFormat(newPath("out"),newSimpleStringEncoder()).withRollingPolicy(DefaultRollingPolicy.builder()// 构建文本滚动生成的策略.withRolloverInterval(Duration.ofMinutes(15))// 按时间间隔滚动.withInactivityInterval(Duration.ofSeconds(5))// 按不活跃滚动.withMaxPartSize(MemorySize.ofMebiBytes(1))// 按大小滚动.build()).build();// 数据类型需要前后对应
stream.map(Access::toString).addSink(fileSink);

(5)输出到MySQL

JdbcSink.sink

提供

至少一次

保证。然而有效的是,通过创建

upsert

SQL语句或幂等SQL更新可以实现“恰好一次”。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version></dependency>
// 写入到mysql// 需要使用upsert语句SinkFunction<Access> jdbcSink =JdbcSink.sink(// sql"insert into access (id, name) values (?, ?) on duplicate key update name=VALUES(name)",// sql的参数(JdbcStatementBuilder<Access>)(preparedStatement, access)->{
            preparedStatement.setInt(1,(int)access.getTraffic());
            preparedStatement.setString(2, access.getDomain());},// 执行参数JdbcExecutionOptions.builder().withBatchSize(5).withBatchIntervalMs(200).withMaxRetries(5)// 重试.build(),// jdbc连接信息newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://192.168.56.10:3306/testdb").withDriverName("com.mysql.jdbc.Driver").withUsername("root").withPassword("root").build());

stream.addSink(jdbcSink);

自1.13起, Flink JDBC sink支持

恰好一次

模式。该实现依赖于XA标准的JDBC驱动程序支持。如果数据库也支持XA,则大多数驱动程序都支持XA(因此驱动程序通常是相同的)。

StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...).addSink(JdbcSink.exactlyOnceSink("insert into books (id, title, author, price, qty) values (?,?,?,?,?)",(ps, t)->{
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);},JdbcExecutionOptions.builder().withMaxRetries(0).build(),JdbcExactlyOnceOptions.defaults(),()->{// create a driver-specific XA DataSource// The following example is for derby EmbeddedXADataSource ds =newEmbeddedXADataSource();
                    ds.setDatabaseName("my_db");return ds;});
env.execute();

(6)输出到Redis

https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
publicstaticclassRedisExampleMapperimplementsRedisMapper<Tuple2<String,String>>{@OverridepublicRedisCommandDescriptiongetCommandDescription(){returnnewRedisCommandDescription(RedisCommand.HSET,"HASH_NAME");}@OverridepublicStringgetKeyFromData(Tuple2<String,String> data){return data.f0;}@OverridepublicStringgetValueFromData(Tuple2<String,String> data){return data.f1;}}FlinkJedisPoolConfig conf =newFlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();DataStream<String> stream =...;
stream.addSink(newRedisSink<Tuple2<String,String>>(conf,newRedisExampleMapper());

(7)输出到Socket

// 输出到Socket,注意类型匹配,输出为字符串
stream.map(Access::toString).writeToSocket("localhost",9528,newSimpleStringSchema());

(8)输出到Kafka

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/
标签: flink 大数据

本文转载自: https://blog.csdn.net/A_art_xiang/article/details/136074350
版权归原作者 秃了也弱了。 所有, 如有侵权,请联系我们删除。

“Flink从入门到实践(二):Flink DataStream API”的评论:

还没有评论