Flink
- 无界流 (Unbounded Streams): 数据流理论上没有终点,持续不断地流入系统。Flink 会连续地处理这些事件,即使在处理过程中新的数据还在不断到来。
- 有界流 (Bounded Streams): 数据流有一个明确的起点和终点,处理完所有数据后任务即结束。Flink 可以像处理流一样处理批数据,采用相同的 API 并提供高效执行。
编写一个Flink步骤
- 创建一个flink执行环境
- 创建一个数据来源 source
- 编写流操作 transformations
- 数据流出口 sink
- 执行任务 execute
publicclassSimpleFlinkJob{publicstaticvoidmain(String[] args)throwsException{// 创建执行环境finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置 Kafka 消费者参数Properties kafkaProps =newProperties();
kafkaProps.setProperty("bootstrap.servers","localhost:9092");
kafkaProps.setProperty("group.id","testGroup");// 创建 Kafka 数据源FlinkKafkaConsumer<String> kafkaSource =newFlinkKafkaConsumer<>("input-topic",newSimpleStringSchema(), kafkaProps);// 从 Kafka 中读取数据流DataStream<String> stream = env.addSource(kafkaSource);// 定义数据转换操作,这里是对字符串计数DataStream<Tuple2<String,Integer>> counts = stream
.map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value){returnnewTuple2<>(value,1);}}).keyBy(0).sum(1);// 打印输出结果
counts.print().setParallelism(1);// 执行任务
env.execute("Simple Flink Job");}
Source:flink的数据来源
- 集合
*** 一般用于学习测试时使用*1.env.fromElements(可变参数);*2.env.fromColletion(各种集合);*3.env.generateSequence(开始,结束);*4.env.fromSequence(开始,结束);**@param args 基于集合*@throwsException*/// sourceDataStream<String> ds1 = env.fromElements("i am alanchan","i like flink");DataStream<String> ds2 = env.fromCollection(Arrays.asList("i am alanchan","i like flink"));DataStream<Long> ds3 = env.generateSequence(1,10);//已过期,使用fromSequence方法DataStream<Long> ds4 = env.fromSequence(1,10);
- 文件
/** * env.readTextFile(本地/HDFS文件/文件夹),压缩文件也可以 */DataStream<String> ds1 = env.readTextFile("D:src/main/resources/words.txt");DataStream<String> ds2 = env.readTextFile("D:/workspace/distribute_cache_student");DataStream<String> ds3 = env.readTextFile("D:/words.tar.gz");DataStream<String> ds4 = env.readTextFile("hdfs://server2:8020///flinktest/wc-1688627439219");
- socket
/** * @author alanchan * 在192.168.10.42上使用nc -lk 9999 向指定端口发送数据 * nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据 * 如果没有该命令可以下安装 yum install -y nc * */DataStream<String> lines = env.socketTextStream("192.168.10.42",9999);
- kafka
/**
* 参数解释:
* -bs broker 地址
* -kcg kafka consumer group
* -it kafka 输入数据 topic
* -ct 是否自动创建 topic
* -pt topic 分区数
* -rf topic 副本数
*/finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();finalMultipleParameterTool cmd =MultipleParameterTool.fromArgs(args);finalString bootstrapServer = cmd.get("bs","localhost:9092");finalString kafkaConsumerGroup = cmd.get("kcg","flink-consumer");finalString inputTopic = cmd.get("it","quickstart-events");finalboolean createTopic = cmd.getBoolean("ct",false);
log.info("broker is {} and topic is {}", bootstrapServer, inputTopic);// 如果 topic 不存在,并且开启了由 flink 任务创建 TOPIC。默认不开启,一般情况下,部署人员应当根据实际情况设置不同topic的并行度,副本数if(createTopic){finalint partitions = cmd.getInt("pt",1);finalshort replicationFactor = cmd.getShort("rf",(short)1);createTopic(bootstrapServer, inputTopic, partitions, replicationFactor);}finalKafkaSource<String> kafkaSource =KafkaSource.<String>builder().setGroupId(kafkaConsumerGroup).setStartingOffsets(OffsetsInitializer.latest()).setBootstrapServers(bootstrapServer).setTopics(inputTopic).setValueOnlyDeserializer(newSimpleStringSchema()).build();finalDataStreamSource<String> kafkaStream = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"Kafka Source");
总结:
env.addSource(sourceFunction) 适用于需要自定义数据源逻辑、处理非标准数据源或对数据源控制有特殊需求的情况。使用时需要自行实现 SourceFunction,处理数据读取、错误恢复、并行化等细节。
env.fromSource(source) 适用于对接已知、常见的数据源,如 Kafka、文件、数据库等,利用 Flink 提供的预封装数据源类。这种方式简化了开发过程,提供了更好的容错性和易用性,但可能不支持所有定制化需求。
- 自定义SourceFlink提供了数据源接口,实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:
SourceFunction,非并行数据源,并行度只能=1 RichSourceFunction,多功能非并行数据源,并行度只能=1 ParallelSourceFunction,并行数据源,并行度能够>=1 RichParallelSourceFunction,多功能并行数据源,并行度能够>=1
publicclassCustomMySQLSourceextendsRichParallelSourceFunction<User>{privateboolean flag =true;privateConnection conn =null;privatePreparedStatement ps =null;privateResultSet rs =null;@Overridepublicvoidopen(Configuration parameters)throwsException{
conn =DriverManager.getConnection("jdbc:mysql://server4:3306/test?useUnicode=true&characterEncoding=UTF-8","root","123456");String sql ="select id,name,pwd,email,age,balance from user";
ps = conn.prepareStatement(sql);}@Overridepublicvoidrun(SourceContext<User> ctx)throwsException{while(flag){
rs = ps.executeQuery();while(rs.next()){User user =newUser(rs.getInt("id"), rs.getString("name"), rs.getString("pwd"), rs.getString("email"), rs.getInt("age"), rs.getDouble("balance"));
ctx.collect(user);}//每5秒查询一次数据库Thread.sleep(5000);}}@Overridepublicvoidcancel(){
flag =false;}}
DataStream<User> userDS = env.addSource(newCustomMySQLSource()).setParallelism(1);
transformations
1.FlatMap 算子
作用:操作又称为扁平映射,主要用于将数据流中的整体(通常是集合类型)拆分成单个个体。与 Map() 算子不同,FlatMap() 可以产生 0 到多个元素,意味着对于输入数据流中的每个元素,FlatMap() 可以根据定义的处理逻辑生成一个或多个输出元素。这种操作在大数据处理中非常有用,特别是在需要对集合类型数据进行拆分和转换的场景中。
使用:实现FlatMapFunction类,param1=数据流操作前对象 ,param2=数据流操作后对象流出
.flatMap(newFlatMapFunction<Event,Event>(){@OverridepublicvoidflatMap(Event event,Collector<Event> collector)throwsException{if(event.user.equals("Marry"))
collector.collect(event);}}).flatMap(newMyflatMapFilter())publicstaticclassMyflatMapFilter imlements FlatMapFunction<Event,Event>{@OverridepublicvoidflatMap(Event event,Collector<Event> collector)throwsException{if(event.user.equals("Marry"))
collector.collect(event);}}
2.Map 算子
作用:对数据流中的每个元素应用一个函数,产生一个新的数据流。
使用:实现MapFunction类,param1=数据流操作前对象 ,param2=数据流操作后对象流出
.map(newMapFunction<Event,Event>(){@OverridepublicEventmap(Event event,Event event)throwsException{return event;})publicstaticclassMyMapFilter imlements MapFunction<Event,Event>{@OverridepublicEventmap(Event event,Event event)throwsException{return event;})}
3.Filter 算子
作用:根据提供的条件过滤出数据流中的元素。
使用:实现FilterFunction类
.filter(newFilterFunction<Event>(){@Overridepublicbooleanfilter(Event event)throwsException{return event.user.equals("Marry");}})publicstaticclassMyFilter imlements FilterFunction<Event>{@Overridepublicbooleanfilter(Event event)throwsException{return event.user.equals("Marry");}}
DataStream→KeyedStream:
逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。从逻辑上分区去对这些数据进行处理,物理上的位置无关紧要。不过最终同一个Key中的数据一定在一个任务槽中,这样会出现数据倾斜的问题。
4.KeyBy 算子
作用:对数据流进行分区,确保具有相同键的元素发送到同一个并行任务中。
方法:keyBy 通过指定 key 来将 DataStream 转换成 KeyedStream。基于不同的 key,流中的事件将被分配到不同的分区中去。所有具有相同 key 的事件将会在接下来的操作符的同一个子任务槽中进行处理。拥有不同 key 的事件可以在同一个任务中处理。但是算子只能访问当前事件的 key 所对应的状态。keyBy() 方法接收一个参数,这个参数指定了 key 或者 keys,有很多不同的方法来指定 key
.keyBy(newKeySelector<WordWithCount,String>(){publicStringgetKey(WordWithCount value)throwsException{return value.word;}});
5.滚动聚合算子
只要存在分组,就一定存在聚合,所以提出了滚动聚合的概念
概念:滚动聚合算子由 KeyedStream 调用,并生成一个聚合以后的 DataStream,例如:sum,minimum,maximum。一个滚动聚合算子会为每一个观察到的 key 保存一个聚合的值。针对每一个输入事件,算子将会更新保存的聚合结果,并发送一个带有更新后的值的事件到下游算子。滚动聚合不需要用户自定义函数,但需要接受一个参数,这个参数指定了在哪一个字段上面做聚合操作。
使用:滚动聚合算子无法组合起来使用,每次计算只能使用一个单独的滚动聚合算子。
sum():在输入流上对指定的字段做滚动相加操作。 min():在输入流上对指定的字段求最小值。 max():在输入流上对指定的字段求最大值。 minBy():在输入流上针对指定字段求最小值,并返回包含当前观察到的最小值的事件。 maxBy():在输入流上针对指定字段求最大值,并返回包含当前观察到的最大值的事件。
6.reduce
概念:它将一个 ReduceFunction 应用到了一个 KeyedStream 上面去。reduce 算子将会把每一个输入事件和当前已经 reduce 出来的值做聚合计算。reduce 操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。
实现:reduce 函数可以通过实现接口 ReduceFunction 来创建一个类。ReduceFunction 接口定义了 reduce() 方法,此方法接收两个输入事件,输出一个相同类型的事件。
使用:ReduceFunction(流的对象类型 )param1 = 数据流操作对象 ,param2 = 当前已经 reduce 出来的值 return :一个相同类型的事件
keyedStream
.reduce(newReduceFunction<Tuple2<Integer,Integer>>(){@OverridepublicTuple2<Integer,Integer>reduce(Tuple2<Integer,Integer> value1,Tuple2<Integer,Integer> value2)throwsException{returnTuple2.of(value1.f0,value1.f1 + value2.f1);}})
7.Window 算子
作用:将数据流划分为有限大小的窗口,并对窗口内的数据进行聚合或其他操作。
// Java API 示例,对数据流按 event time 进行滑动窗口处理DataStream<Tuple2<String,Integer>> windowedCounts = words
.keyBy(0).timeWindow(Time.seconds(10))// 10 秒滑动窗口.sum(1);// 对第二个字段求和
8.Join 算子
作用:连接两个数据流,基于指定的键进行内连接、外连接等操作。
// Java API 示例DataStream stream1 =...;DataStream stream2 =...;DataStream<Tuple2<String,Tuple2<String,String>>> joinedStreams = stream1
.join(stream2).where(value -> value.length())// 指定第一个流的 join key.equalTo(value -> value.length())// 指定第二个流的 join key.apply(newJoinFunction<String,String,Tuple2<String,String>>(){@OverridepublicTuple2<String,String>join(String first,String second){returnnewTuple2<>(first, second);}});
9.Union 算子
作用:将两个数据流合并为一个
DataStream streamA =...;DataStream streamB =...;DataStream combinedStream = streamA.union(streamB);
10.富函数类(Rich Function Classes)
“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
- open()方法:是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
- close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
- 需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map,在每条数据来到后都会触发一次调用。
使用:在流过程中操作其他,通过mysql获取黑名单,通过redis获取白名单 等操作
publicstaticclassMyFilterextendsRichMapFunction<Integer,Integer>{//因为使用的是redis集群所以使用 JedisCluster
privde JedisCluster jedis;//mysql 连接
privde PerardStatement statement;
privde Connection conn;@Overridepublicvoidopen(Configuration parameters)throwsException{super.open(parameters);//1.可以操作链接redisSet<HostAndPort> jedisClusterNode =newHashSet<>;String host ="127.0.0.1"Integer port =6380;
jedisClusterNode.add(newHostAndPort(host, port));
jedis =newJedisCluster(jedisClusterNode);//2.操作连接MySqlProperties properties =newProperties();//Properties可以保存到流中或从流中加载,属性列表中的键及其对应的值都是字符串。//数据都在properties 对象中
properties.load(newFileInputStream("src/db.properties"));
url = properties.getProperty("url");
user = properties.getProperty("user");
password = properties.getProperty("password");String diver = properties.getProperty("driver");Class.forName(diver);
connection =DriverManager.getConnection(url, user, password);////创建一个PreparedStatement对象,用于将参数化的SQL语句发送到数据库。//例如添加数据String sql ="elect * from work where id = ? and name = ?";
statement = connection.prepareStatement(sql);//3.发起HTTP请求}@OverridepublicIntegermap(Integer integer)throwsException{//重写需要过滤的逻辑//获取预处理的搬运工对象的元数据对象的个数(参数型sql的?个数)int columnCount1 = preparedStatement.getParameterMetaData().getParameterCount();//将数据赋值给sql的参数//此时数据在preparedStatement
preparedStatement.setInt(1,1);
preparedStatement.setString(2,"张三");//声明一个数组,后面用数组对象存储查询到的值,然后输出List<T> list =newArrayList<>();//查询生成一个ResultSet对象,其中包含查询产生的数据
resultSet = preparedStatement.executeQuery();//获取结果集元数据ResultSetMetaData metaData = resultSet.getMetaData();//获取结果集元数据的列的个数(下面就是结果集元数据)/**
*id name age info
* 1 张三 18 喜欢玩游戏
* 2 李四 22 喜欢打篮球
* 3 王五 24 喜欢唱跳Rap
* 4 老六 26 喜欢偷袭
* 5 赵七 28 喜欢散步
*/int columnCount2 = metaData.getColumnCount();//用过双层循环获取数据//resultSet.next()有点像迭代器,也有光标,会随着循环下移while(resultSet.next()){//通过Class文件反射声明一个对象,用于存放数据T t = cls.getConstructor(null).newInstance(null);for(int i =1; i <= columnCount2; i++){//在元数据中获取列的名字String columnName = metaData.getColumnName(i);//通过列的名字,用结果集获取该列的值Object value = resultSet.getObject(columnName);//通过BeanUtils的jar包下的setProperty(数组, 属性名, 值)方法插入数据//使用BeanUtils需要插入beanutils和logging两个jar包BeanUtils.setProperty(t, columnName, value);}//添加数据
list.add(t);}return list.size()!=0? list :null;}@Overridepublicvoidclose()throwsException{super.close();//关闭资源//关闭redis连接//关闭mysql连接}})
版权归原作者 慕寒এꦿ᭄ 所有, 如有侵权,请联系我们删除。