0


flink用法详解

Flink

  • 无界流 (Unbounded Streams): 数据流理论上没有终点,持续不断地流入系统。Flink 会连续地处理这些事件,即使在处理过程中新的数据还在不断到来。
  • 有界流 (Bounded Streams): 数据流有一个明确的起点和终点,处理完所有数据后任务即结束。Flink 可以像处理流一样处理批数据,采用相同的 API 并提供高效执行。

编写一个Flink步骤

  1. 创建一个flink执行环境
  2. 创建一个数据来源 source
  3. 编写流操作 transformations
  4. 数据流出口 sink
  5. 执行任务 execute
publicclassSimpleFlinkJob{publicstaticvoidmain(String[] args)throwsException{// 创建执行环境finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置 Kafka 消费者参数Properties kafkaProps =newProperties();
        kafkaProps.setProperty("bootstrap.servers","localhost:9092");
        kafkaProps.setProperty("group.id","testGroup");// 创建 Kafka 数据源FlinkKafkaConsumer<String> kafkaSource =newFlinkKafkaConsumer<>("input-topic",newSimpleStringSchema(), kafkaProps);// 从 Kafka 中读取数据流DataStream<String> stream = env.addSource(kafkaSource);// 定义数据转换操作,这里是对字符串计数DataStream<Tuple2<String,Integer>> counts = stream
            .map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value){returnnewTuple2<>(value,1);}}).keyBy(0).sum(1);// 打印输出结果
        counts.print().setParallelism(1);// 执行任务
        env.execute("Simple Flink Job");}

Source:flink的数据来源

  • 集合*** 一般用于学习测试时使用*1.env.fromElements(可变参数);*2.env.fromColletion(各种集合);*3.env.generateSequence(开始,结束);*4.env.fromSequence(开始,结束);**@param args 基于集合*@throwsException*/// sourceDataStream<String> ds1 = env.fromElements("i am alanchan","i like flink");DataStream<String> ds2 = env.fromCollection(Arrays.asList("i am alanchan","i like flink"));DataStream<Long> ds3 = env.generateSequence(1,10);//已过期,使用fromSequence方法DataStream<Long> ds4 = env.fromSequence(1,10);
  • 文件/** * env.readTextFile(本地/HDFS文件/文件夹),压缩文件也可以 */DataStream<String> ds1 = env.readTextFile("D:src/main/resources/words.txt");DataStream<String> ds2 = env.readTextFile("D:/workspace/distribute_cache_student");DataStream<String> ds3 = env.readTextFile("D:/words.tar.gz");DataStream<String> ds4 = env.readTextFile("hdfs://server2:8020///flinktest/wc-1688627439219");
  • socket/** * @author alanchan * 在192.168.10.42上使用nc -lk 9999 向指定端口发送数据 * nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据 * 如果没有该命令可以下安装 yum install -y nc * */DataStream<String> lines = env.socketTextStream("192.168.10.42",9999);
  • kafka
/**
     * 参数解释:
     *  -bs broker 地址
     *  -kcg kafka consumer group
     *  -it kafka 输入数据 topic
     *  -ct 是否自动创建 topic
     *  -pt topic 分区数
     *  -rf topic 副本数
     */finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();finalMultipleParameterTool cmd =MultipleParameterTool.fromArgs(args);finalString bootstrapServer = cmd.get("bs","localhost:9092");finalString kafkaConsumerGroup = cmd.get("kcg","flink-consumer");finalString inputTopic = cmd.get("it","quickstart-events");finalboolean createTopic = cmd.getBoolean("ct",false);

        log.info("broker is {} and topic is {}", bootstrapServer, inputTopic);// 如果 topic 不存在,并且开启了由 flink 任务创建 TOPIC。默认不开启,一般情况下,部署人员应当根据实际情况设置不同topic的并行度,副本数if(createTopic){finalint partitions = cmd.getInt("pt",1);finalshort replicationFactor = cmd.getShort("rf",(short)1);createTopic(bootstrapServer, inputTopic, partitions, replicationFactor);}finalKafkaSource<String> kafkaSource =KafkaSource.<String>builder().setGroupId(kafkaConsumerGroup).setStartingOffsets(OffsetsInitializer.latest()).setBootstrapServers(bootstrapServer).setTopics(inputTopic).setValueOnlyDeserializer(newSimpleStringSchema()).build();finalDataStreamSource<String> kafkaStream = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"Kafka Source");

总结:

env.addSource(sourceFunction) 适用于需要自定义数据源逻辑、处理非标准数据源或对数据源控制有特殊需求的情况。使用时需要自行实现 SourceFunction,处理数据读取、错误恢复、并行化等细节。
env.fromSource(source) 适用于对接已知、常见的数据源,如 Kafka、文件、数据库等,利用 Flink 提供的预封装数据源类。这种方式简化了开发过程,提供了更好的容错性和易用性,但可能不支持所有定制化需求。
  • 自定义SourceFlink提供了数据源接口,实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:SourceFunction,非并行数据源,并行度只能=1 RichSourceFunction,多功能非并行数据源,并行度只能=1 ParallelSourceFunction,并行数据源,并行度能够>=1 RichParallelSourceFunction,多功能并行数据源,并行度能够>=1
publicclassCustomMySQLSourceextendsRichParallelSourceFunction<User>{privateboolean flag =true;privateConnection conn =null;privatePreparedStatement ps =null;privateResultSet rs =null;@Overridepublicvoidopen(Configuration parameters)throwsException{
        conn =DriverManager.getConnection("jdbc:mysql://server4:3306/test?useUnicode=true&characterEncoding=UTF-8","root","123456");String sql ="select id,name,pwd,email,age,balance from user";
        ps = conn.prepareStatement(sql);}@Overridepublicvoidrun(SourceContext<User> ctx)throwsException{while(flag){
            rs = ps.executeQuery();while(rs.next()){User user =newUser(rs.getInt("id"), rs.getString("name"), rs.getString("pwd"), rs.getString("email"), rs.getInt("age"), rs.getDouble("balance"));
                ctx.collect(user);}//每5秒查询一次数据库Thread.sleep(5000);}}@Overridepublicvoidcancel(){
        flag =false;}}
DataStream<User> userDS = env.addSource(newCustomMySQLSource()).setParallelism(1);

transformations

1.FlatMap 算子

作用:操作又称为扁平映射,主要用于将数据流中的整体(通常是集合类型)拆分成单个个体。与 Map() 算子不同,FlatMap() 可以产生 0 到多个元素,意味着对于输入数据流中的每个元素,FlatMap() 可以根据定义的处理逻辑生成一个或多个输出元素。这种操作在大数据处理中非常有用,特别是在需要对集合类型数据进行拆分和转换的场景中。

使用:实现FlatMapFunction类,param1=数据流操作前对象 ,param2=数据流操作后对象流出

.flatMap(newFlatMapFunction<Event,Event>(){@OverridepublicvoidflatMap(Event event,Collector<Event> collector)throwsException{if(event.user.equals("Marry"))
                    collector.collect(event);}}).flatMap(newMyflatMapFilter())publicstaticclassMyflatMapFilter imlements  FlatMapFunction<Event,Event>{@OverridepublicvoidflatMap(Event event,Collector<Event> collector)throwsException{if(event.user.equals("Marry"))
                    collector.collect(event);}}

2.Map 算子

作用:对数据流中的每个元素应用一个函数,产生一个新的数据流。

使用:实现MapFunction类,param1=数据流操作前对象 ,param2=数据流操作后对象流出

.map(newMapFunction<Event,Event>(){@OverridepublicEventmap(Event event,Event event)throwsException{return event;})publicstaticclassMyMapFilter imlements  MapFunction<Event,Event>{@OverridepublicEventmap(Event event,Event event)throwsException{return event;})}

3.Filter 算子

作用:根据提供的条件过滤出数据流中的元素。

使用:实现FilterFunction类

.filter(newFilterFunction<Event>(){@Overridepublicbooleanfilter(Event event)throwsException{return event.user.equals("Marry");}})publicstaticclassMyFilter imlements  FilterFunction<Event>{@Overridepublicbooleanfilter(Event event)throwsException{return event.user.equals("Marry");}}

DataStreamKeyedStream

逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。从逻辑上分区去对这些数据进行处理,物理上的位置无关紧要。不过最终同一个Key中的数据一定在一个任务槽中,这样会出现数据倾斜的问题。

4.KeyBy 算子
作用:对数据流进行分区,确保具有相同键的元素发送到同一个并行任务中。

方法:keyBy 通过指定 key 来将 DataStream 转换成 KeyedStream。基于不同的 key,流中的事件将被分配到不同的分区中去。所有具有相同 key 的事件将会在接下来的操作符的同一个子任务槽中进行处理。拥有不同 key 的事件可以在同一个任务中处理。但是算子只能访问当前事件的 key 所对应的状态。keyBy() 方法接收一个参数,这个参数指定了 key 或者 keys,有很多不同的方法来指定 key

.keyBy(newKeySelector<WordWithCount,String>(){publicStringgetKey(WordWithCount value)throwsException{return value.word;}});

5.滚动聚合算子

只要存在分组,就一定存在聚合,所以提出了滚动聚合的概念

概念:滚动聚合算子由 KeyedStream 调用,并生成一个聚合以后的 DataStream,例如:sum,minimum,maximum。一个滚动聚合算子会为每一个观察到的 key 保存一个聚合的值。针对每一个输入事件,算子将会更新保存的聚合结果,并发送一个带有更新后的值的事件到下游算子。滚动聚合不需要用户自定义函数,但需要接受一个参数,这个参数指定了在哪一个字段上面做聚合操作。

使用:滚动聚合算子无法组合起来使用,每次计算只能使用一个单独的滚动聚合算子。

sum():在输入流上对指定的字段做滚动相加操作。 min():在输入流上对指定的字段求最小值。 max():在输入流上对指定的字段求最大值。 minBy():在输入流上针对指定字段求最小值,并返回包含当前观察到的最小值的事件。 maxBy():在输入流上针对指定字段求最大值,并返回包含当前观察到的最大值的事件。

6.reduce

概念:它将一个 ReduceFunction 应用到了一个 KeyedStream 上面去。reduce 算子将会把每一个输入事件和当前已经 reduce 出来的值做聚合计算。reduce 操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。

实现:reduce 函数可以通过实现接口 ReduceFunction 来创建一个类。ReduceFunction 接口定义了 reduce() 方法,此方法接收两个输入事件,输出一个相同类型的事件。

使用:ReduceFunction(流的对象类型 )param1 = 数据流操作对象 ,param2 = 当前已经 reduce 出来的值 return :一个相同类型的事件

keyedStream
        .reduce(newReduceFunction<Tuple2<Integer,Integer>>(){@OverridepublicTuple2<Integer,Integer>reduce(Tuple2<Integer,Integer> value1,Tuple2<Integer,Integer> value2)throwsException{returnTuple2.of(value1.f0,value1.f1 + value2.f1);}})

7.Window 算子
作用:将数据流划分为有限大小的窗口,并对窗口内的数据进行聚合或其他操作。

// Java API 示例,对数据流按 event time 进行滑动窗口处理DataStream<Tuple2<String,Integer>> windowedCounts = words
    .keyBy(0).timeWindow(Time.seconds(10))// 10 秒滑动窗口.sum(1);// 对第二个字段求和

8.Join 算子
作用:连接两个数据流,基于指定的键进行内连接、外连接等操作。

// Java API 示例DataStream stream1 =...;DataStream stream2 =...;DataStream<Tuple2<String,Tuple2<String,String>>> joinedStreams = stream1
    .join(stream2).where(value -> value.length())// 指定第一个流的 join key.equalTo(value -> value.length())// 指定第二个流的 join key.apply(newJoinFunction<String,String,Tuple2<String,String>>(){@OverridepublicTuple2<String,String>join(String first,String second){returnnewTuple2<>(first, second);}});

9.Union 算子
作用:将两个数据流合并为一个

DataStream streamA =...;DataStream streamB =...;DataStream combinedStream = streamA.union(streamB);

10.富函数类(Rich Function Classes)
“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function有生命周期的概念。典型的生命周期方法有:

  • open()方法:是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
  • close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
  • 需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map,在每条数据来到后都会触发一次调用。

使用:在流过程中操作其他,通过mysql获取黑名单,通过redis获取白名单 等操作

publicstaticclassMyFilterextendsRichMapFunction<Integer,Integer>{//因为使用的是redis集群所以使用  JedisCluster 
    privde JedisCluster jedis;//mysql 连接
    privde PerardStatement statement;
    privde Connection conn;@Overridepublicvoidopen(Configuration parameters)throwsException{super.open(parameters);//1.可以操作链接redisSet<HostAndPort> jedisClusterNode =newHashSet<>;String host ="127.0.0.1"Integer port =6380;
        jedisClusterNode.add(newHostAndPort(host, port));
        jedis =newJedisCluster(jedisClusterNode);//2.操作连接MySqlProperties properties =newProperties();//Properties可以保存到流中或从流中加载,属性列表中的键及其对应的值都是字符串。//数据都在properties 对象中
        properties.load(newFileInputStream("src/db.properties"));

        url = properties.getProperty("url");
        user = properties.getProperty("user");
        password = properties.getProperty("password");String diver = properties.getProperty("driver");Class.forName(diver);
        connection =DriverManager.getConnection(url, user, password);////创建一个PreparedStatement对象,用于将参数化的SQL语句发送到数据库。//例如添加数据String sql ="elect * from work where id = ? and name = ?";
        statement = connection.prepareStatement(sql);//3.发起HTTP请求}@OverridepublicIntegermap(Integer integer)throwsException{//重写需要过滤的逻辑//获取预处理的搬运工对象的元数据对象的个数(参数型sql的?个数)int columnCount1 = preparedStatement.getParameterMetaData().getParameterCount();//将数据赋值给sql的参数//此时数据在preparedStatement
        preparedStatement.setInt(1,1);
        preparedStatement.setString(2,"张三");//声明一个数组,后面用数组对象存储查询到的值,然后输出List<T> list =newArrayList<>();//查询生成一个ResultSet对象,其中包含查询产生的数据
        resultSet = preparedStatement.executeQuery();//获取结果集元数据ResultSetMetaData metaData = resultSet.getMetaData();//获取结果集元数据的列的个数(下面就是结果集元数据)/**
         *id    name  age        info
         * 1    张三     18        喜欢玩游戏
         * 2    李四     22        喜欢打篮球
         * 3    王五     24        喜欢唱跳Rap
         * 4    老六     26        喜欢偷袭
         * 5    赵七     28        喜欢散步
         */int columnCount2 = metaData.getColumnCount();//用过双层循环获取数据//resultSet.next()有点像迭代器,也有光标,会随着循环下移while(resultSet.next()){//通过Class文件反射声明一个对象,用于存放数据T t = cls.getConstructor(null).newInstance(null);for(int i =1; i <= columnCount2; i++){//在元数据中获取列的名字String columnName = metaData.getColumnName(i);//通过列的名字,用结果集获取该列的值Object value = resultSet.getObject(columnName);//通过BeanUtils的jar包下的setProperty(数组, 属性名, 值)方法插入数据//使用BeanUtils需要插入beanutils和logging两个jar包BeanUtils.setProperty(t, columnName, value);}//添加数据
            list.add(t);}return list.size()!=0? list :null;}@Overridepublicvoidclose()throwsException{super.close();//关闭资源//关闭redis连接//关闭mysql连接}})
标签: flink 大数据 java

本文转载自: https://blog.csdn.net/qq_43673709/article/details/139629757
版权归原作者 慕寒এꦿ᭄ 所有, 如有侵权,请联系我们删除。

“flink用法详解”的评论:

还没有评论