0


flink与kafka基础知识

一、实时计算:

1.大数据离线实时框架 流程:(采集,存储,处理,应用)

  • 做项目的流程:第一步:数据采集,第二步:数据存储,第三步:数据处理,最后就是数据应用(对最后结果进行使用)

2.数据和业务?

  1. 以前的系统都是对事务进行处理的,事务可以理解为一些事情。增删改查
  2. 从数据挖掘价值推动决策
  3. 数据时效性:数据的价值随着时间延迟迅速降低。
  4. 越快越好,越快越有竞争优势,大数据实时化——实时计算
  5. spark Streaming底层也是批处理,job由时间决定的。

3.离线计算和实时计算的区别

  1. 离线计算是用户启动的,数据已经准备好了,已完成采集,等待数据,加载数据,处理结果。
  2. 实时计算要等待数据过来,才会立马处理。是事件驱动

4.主流实时计算框架对比

** 模型API保证次数容错机制延时吞吐量批流统一业务模式**易用性StormNative组合式At-least-onceRecord ACKs★★★★不支持需要其他框架★Spark StreamingMirco-batching声明式Exectly-onceRDD Checkpoint★(批处理)慢★★★支持需要其他框架★★Apache FlinkNative组合式Exectly-onceCheckpoint★★★★★★支持需要其他框架★★

  1. Apache Flink 组合式:source,transfrom(转换),sink
  2. 保证次数:2.1、At-least-once:至少一次2.2、Exectly-once:完全一次
  3. Checkpoint(容错),失败了可以找到最新的状态恢复。
  4. 状态:状态跟时间有关系,某一时刻的结果。状态是做容错考虑的。

5.Spark Streaming微批处理和Flink流式处理

  1. 微批处理可能会导致雪崩
  2. 流式处理一条一条处理数据,会导致某一时间段延迟会很大,但不会导致雪崩。

二、flink概念

1.什么是flink?

  1. Flink是一个分布式实时计算框架。用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
  2. 有状态:有状态计算是将当前批次结果加上上一批次计算的结果。
  3. 无界流 有定义流的开始,但没有定义流的结束(没有边界)。它们会无休止地产生数据。流处理。
  4. 有界流 有定义流的开始,也有定义流的结束。有界流(假如1000条数据),即可批处理(一次处理一批),也可以流处理(一条一条处理)。
  5. 任意规模进行计算:指的是flink主从节点。jobmanager:driver端;task manager:executor端。

2.flink特性

3.Flink底层原理

spark与flink底层原理对比

  1. spark底层基于mapreduce,要先执行map端再执行reduce端,延迟高。
  2. flink底层结构重新设计,持续流。

4.flink在流处理和批处理上的source

5.flink并行度

  1. 一个任务对应一个并行度,每个并行度对应一个槽位
  2. 并行度根据吞吐量决定的,Task Slot数量 是由任务中最大的并行度决定,Task的数量由并行度以及有无Shuffle一起决定
  3. flink并行度设置的集中方式1、通过env设置,不推荐,如果需要台调整并行度得修改代码重新打包提交任务2、每个算子可以单独设置并行度,视实际情况决定,一般不常用3、还可以在提交任务的时候指定并行度,最常用 比较推荐的方式web UI:填写parallelism输入框即可设置,优先级:算子本身的设置 > env做的全局设置 > 提交任务时指定的 > 配置文件flink-conf.yaml
  4. Flink中的任务被分为多个并行任务来执行,其中每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。我们在实际生产环境中可以从四个不同层面设置并行度,并行度直接决定了Flink程序需要申请多少资源

6.事件时间

事件时间:数据自带的时间(是由数据自带的时间去触发窗口计算) 处理时间:数据时间以系统时间为准。

为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用

  1. TimestampAssigner

API 从元素中的某个字段去访问/提取时间戳。

时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定

  1. WatermarkGenerator

来配置 watermark 的生成方式。

7.窗口

窗口三大类:时间,计算,会话

  1. stream
  2. .keyBy(...) <- keyed 窗口需要
  3. .window(...) <- 必填项:"assigner"
  4. [.trigger(...)] <- 可选项:"trigger" (省略则使用默认 trigger)
  5. [.evictor(...)] <- 可选项:"evictor" (省略则不使用 evictor)
  6. [.allowedLateness(...)] <- 可选项:"lateness" (省略则为 0)
  7. [.sideOutputLateData(...)] <- 可选项:"output tag" (省略则不对迟到数据使用 side output)
  8. .reduce/aggregate/apply() <- 必填项:"function"
  9. [.getSideOutput(...)] <- 可选项:"output tag"
7.1滚动窗口

滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建

7.2滑动窗口

与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据

8、Flink 中的Watermark是什么概念,起到什么作用

  • Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。 一般来讲Watermark经常和Window一起被用来处理乱序事件。
  • 水位线默认等于最新数据的时间戳,水位线只能增长不能降低。
  • 由于数据在传输的过程中可能会乱序,为了解决乱序问题,可以将水位线前移,延迟窗口的计算,避免数据丢失

三、flink代码

1.flink core

  1. public class Demo01StreamWordCount {
  2. /**
  3. * flink整体代码大致分为四块
  4. * 第一部分:构建flink环境
  5. * 第二部分:构建第一个DStream
  6. * 第三部分:DStream之间的转换
  7. * 第四部分:打印得到结果进行保存
  8. */
  9. public static void main(String[] args) throws Exception {
  10. //1.构建flink环境
  11. //ExecutionEnvironment.getExecutionEnvironment();执行环境。获取执行环境
  12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13. env.setParallelism(2);
  14. env.setBufferTimeout(200);
  15. //2.通过Socket(套接字)模拟无界流环境,方便flink处理
  16. //虚拟机启动:nc-lk 10086 //cpu核数有关
  17. //从source构建第一个DataStream
  18. DataStream<String> lineDS = env.socketTextStream("master", 10086);
  19. System.out.println("lineDS并行度:"+lineDS.getParallelism());
  20. //统计每个单词的数量
  21. //第一步:将每行数据的每个单词进行扁平化处理
  22. //<String, Integer>输入数据类型和输出数据类型
  23. DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
  24. /**
  25. *
  26. * @param line DS中的一条数据
  27. * @param out 通过collect方法将数据发送到下游
  28. * @throws Exception
  29. */
  30. @Override
  31. public void flatMap(String line, Collector<String> out) throws Exception {
  32. for (String word : line.split(",")) {
  33. out.collect(word);
  34. }
  35. }
  36. });
  37. System.out.println("wordsDS并行度:"+wordsDS.getParallelism());
  38. // 第二步:将每个单词变成 KV格式,V置为1
  39. DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
  40. @Override
  41. public Tuple2<String, Integer> map(String word) throws Exception {
  42. return Tuple2.of(word, 1);
  43. }
  44. });
  45. System.out.println("wordKVDS并行度:"+wordKVDS.getParallelism());
  46. // 第三步:按每一个单词进行分组
  47. // keyBy之后数据流会进行分组,相同的key会进入同一个线程中被处理
  48. // 传递数据的规则:hash取余(线程总数,默认CPU的总线程数)原理
  49. KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
  50. @Override
  51. public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
  52. return tuple2.f0;
  53. }
  54. });
  55. System.out.println("keyedDS并行度:" + keyedDS.getParallelism());
  56. // 第四步:对1进行聚合sum
  57. DataStream<Tuple2<String, Integer>> wordCutDS = keyedDS.sum(1);
  58. System.out.println("wordCntDS并行度:" + wordCutDS.getParallelism());
  59. //打印结果:将DS中的内容sink到控制台
  60. wordsDS.print();
  61. env.execute();
  62. }
  63. }
  1. public class Demo02BatchWordCount {
  2. public static void main(String[] args) throws Exception {
  3. //1.构建环境
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. //设置flink的处理方式:默认是流处理
  6. /**
  7. * BATCH:批处理,只能处理有界流,底层是MR模型,可以进行预聚合
  8. * STREAMING:流处理,可以处理有界流,也可以处理无界流,底层是持续流
  9. * AUTOMATIC:自动判断,当所有的Source都是有界流则使用BATCH,当Source有一个是无界流则会使用STREAMING模式
  10. */
  11. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  12. //2.获得第一个DS
  13. // 通过readTextFile可以基于文件构建有界流
  14. DataStream<String> wordsFileDS = env.readTextFile("flink/data/words.txt");
  15. //3.DS之间的转换
  16. // Flink处理逻辑传入的方式
  17. // new XXXFunction 使用匿名内部类
  18. DataStream<String> wordsDS = wordsFileDS.flatMap(new FlatMapFunction<String, String>() {
  19. @Override
  20. public void flatMap(String line, Collector<String> out) throws Exception {
  21. for (String word : line.split(",")) {
  22. // 将每个单词发送到下游
  23. out.collect(word);
  24. }
  25. }
  26. });
  27. // 使用lambada表达式
  28. /**
  29. * ()->{}
  30. * 通过 -> 分隔,左边是函数的参数,右边是函数实现的具体逻辑
  31. */
  32. DataStream<String> wordDS = wordsFileDS.flatMap((line, out) -> {
  33. for (String word : line.split(",")) {
  34. out.collect(word);
  35. }
  36. }, Types.STRING);
  37. // 第二步:将每个单词变成 KV格式,V置为1
  38. DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
  39. @Override
  40. public Tuple2<String, Integer> map(String word) throws Exception {
  41. return Tuple2.of(word, 1);
  42. }
  43. });
  44. // 第三步:按每一个单词进行分组
  45. // keyBy之后数据流会进行分组,相同的key会进入同一个线程中被处理
  46. // 传递数据的规则:hash取余(线程总数,默认CPU的总线程数)原理
  47. KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
  48. @Override
  49. public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
  50. return tuple2.f0;
  51. }
  52. });
  53. KeyedStream<Tuple2<String, Integer>, String> keyDS = wordKVDS.keyBy(kv -> kv.f0, Types.STRING);
  54. // 第四步:对1进行聚合sum
  55. DataStream<Tuple2<String, Integer>> wordCutDS = keyDS.sum(1);//索引,第二个位置
  56. wordCutDS.print();
  57. //4.最终结果的处理
  58. env.execute();
  59. }
  60. }
  1. public class Demo03Parallelism {
  2. public static void main(String[] args) throws Exception {
  3. /*
  4. * 如何设置并行度?
  5. * 1、考虑吞吐量
  6. * 有聚合操作的任务:1w条/s 一个并行度
  7. * 无聚合操作的任务:10w条/s 一个并行度
  8. * 2、考虑集群本身的资源
  9. * Task的数量由并行度以及有无Shuffle一起决定
  10. *
  11. * Task Slot数量 是由任务中最大的并行度决定
  12. * TaskManager的数量由配置文件中每个TaskManager设置的Slot数量及任务所需的Slot数量一起决定
  13. */
  14. // FLink 并行度设置的集中方式
  15. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  16. // 1、通过env设置,不推荐,如果需要调整并行度得修改代码重新打包提交任务
  17. // env.setParallelism(2);
  18. DataStreamSource<String> ds = env.socketTextStream("master", 8888);
  19. // 2、每个算子可以单独设置并行度,视实际情况决定,一般不常用
  20. SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
  21. .setParallelism(4);
  22. SingleOutputStreamOperator<Tuple2<String, Integer>> wordCntDS2P = kvDS.keyBy(kv -> kv.f0)
  23. .sum(1)
  24. .setParallelism(2);
  25. // 如果算子不设置并行度则以全局为准
  26. wordCntDS2P.print();
  27. // 3、还可以在提交任务的时候指定并行度,最常用 比较推荐的方式
  28. // 命令行:flink run 可以通过 -p 参数设置全局并行度
  29. // web UI:填写parallelism输入框即可设置,优先级:算子本身的设置 > env做的全局设置 > 提交任务时指定的 > 配置文件flink-conf.yaml
  30. env.execute();
  31. }
  32. }
  1. public class Demo04EventTime {
  2. public static void main(String[] args) throws Exception {
  3. // 事件时间:数据本身自带的时间
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. // 并行度
  6. env.setParallelism(1);
  7. /*
  8. 数据格式:单词,时间戳(很大的整数,Long类型)
  9. a,1722233813000
  10. a,1722233814000
  11. a,1722233815000
  12. */
  13. DataStreamSource<String> wordTsDS = env.socketTextStream("master", 8888);
  14. SingleOutputStreamOperator<Tuple2<String, Long>> mapDS = wordTsDS
  15. .map(line -> Tuple2.of(line.split(",")[0], Long.parseLong(line.split(",")[1])), Types.TUPLE(Types.STRING, Types.LONG));
  16. // 指定数据的时间戳,告诉Flink,将其作为事件时间进行处理
  17. //assignTimestampsAndWatermarks 分配时间戳和水印
  18. //WatermarkStrategy 水印策略
  19. SingleOutputStreamOperator<Tuple2<String, Long>> assDS = mapDS
  20. .assignTimestampsAndWatermarks(//分配时间戳和生成水印
  21. WatermarkStrategy
  22. // // 单调递增时间戳策略,不考虑数据乱序问题
  23. // .<Tuple2<String, Long>>forMonotonousTimestamps()
  24. /**
  25. * 容忍5s的数据乱序到达,本质上将水位线前移5s,缺点:导致任务延时变大
  26. * 水位线:某个线程中所接收到的数据中最大的时间戳
  27. * forBoundedOutOfOrderness 有界乱序数据
  28. * forMonotonousTimestamps 单调时间戳
  29. * Duration 持续时间
  30. */
  31. .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  32. // 可以提取数据的某一部分作为事件时间
  33. .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
  34. @Override
  35. public long extractTimestamp(Tuple2<String, Long> t2, long recordTimestamp) {
  36. return t2.f1;
  37. }
  38. })
  39. );
  40. // 不管是事件时间还是处理时间都需要搭配窗口操作一起使用
  41. assDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT))
  42. .keyBy(t2 -> t2.f0)
  43. // 窗口触发的条件:水位线超过了窗口的结束时间
  44. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  45. .sum(1)
  46. .print();
  47. env.execute();
  48. }
  49. }

2.flink source

2.1、基于集合的source
  1. public class Demo01ListSource {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. // 本地集合Source
  5. ArrayList<String> arrList = new ArrayList<>();
  6. arrList.add("flink");
  7. arrList.add("flink");
  8. arrList.add("flink");
  9. arrList.add("flink");
  10. arrList.add("flink");
  11. arrList.add("flink");
  12. // 有界流
  13. DataStream<String> lineDS = env.fromCollection(arrList);
  14. lineDS.print();
  15. env.execute();
  16. }
  17. }
2.2、基于文件的source
  1. public class Demo02FileSource {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. // 历史版本读文件的方式,有界流
  5. DataStream<String> oldFileDS = env.readTextFile("flink/data/words.txt");
  6. // oldFileDS.print();
  7. // 新版本加载文件的方式:FileSource,默认是有界流
  8. FileSource<String> fileSource = FileSource
  9. .forRecordStreamFormat(
  10. new TextLineInputFormat()
  11. , new Path("flink/data/words.txt")
  12. )
  13. .build();
  14. // 从Source加载数据构建DS
  15. DataStream<String> fileSourceDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
  16. // fileSourceDS.print();
  17. // 将读取文件变成无界流
  18. FileSource<String> fileSource2 = FileSource
  19. .forRecordStreamFormat(
  20. new TextLineInputFormat()
  21. , new Path("flink/data/words")
  22. )
  23. // 类似Flume中的spool dir,可以监控一个目录下文件的变化
  24. .monitorContinuously(Duration.ofSeconds(4))
  25. .build();
  26. DataStream<String> fileSourceDS2 = env.fromSource(fileSource2,WatermarkStrategy.noWatermarks(),"fileSource2");
  27. fileSourceDS2.print();
  28. env.execute();
  29. }
  30. }
2.3、基于自定义的source
  1. public class Demo03MySource {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. DataStream<String> mySourceDS = env.addSource(new MySource());
  5. mySourceDS.print();
  6. env.execute();
  7. }
  8. }
  9. class MySource implements SourceFunction<String>{
  10. // Source启动时会执行
  11. // run方法如果会结束,则Source会得到一个有界流
  12. // run方法如果不会结束,则Source会得到一个无界流
  13. @Override
  14. public void run(SourceContext<String> ctx) throws Exception {
  15. System.out.println("run方法启动了");
  16. // ctx 可以通过collect方法向下游发送数据
  17. long cnt=0l;
  18. while (true){
  19. // ctx.collect(cnt+"");
  20. cnt++;
  21. // 休眠一会
  22. Thread.sleep(1000);
  23. }
  24. }
  25. // Source结束时会执行
  26. @Override
  27. public void cancel() {
  28. System.out.println("Source结束了");
  29. }
  30. }

3.flink transformation

算子数据流转换,数据转换的各种操作,将数据转换计算成想要的数据。

3.1、Map:输入一个元素同时输出一个元素
  1. public class Demo01Map {
  2. public static void main(String[] args) throws Exception {
  3. // 传入一条数据返回一条数据
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. DataStream<String> ds = env.socketTextStream("master", 8888);
  6. // 1、使用匿名内部类
  7. DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<String, Tuple2<String, Integer>>() {
  8. @Override
  9. public Tuple2<String, Integer> map(String word) throws Exception {
  10. return Tuple2.of(word, 1);
  11. }
  12. });
  13. // mapDS.print();
  14. // 2、使用lambda表达式
  15. DataStream<Tuple2<String, Integer>> mapDS2 = ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
  16. mapDS2.print();
  17. env.execute();
  18. }
  19. }
3.2、flatMap:输入一个元素同时产生零个、一个或多个元素
  1. public class Demo02FlatMap {
  2. public static void main(String[] args) throws Exception {
  3. // 传入一条数据返回多条数据,类似UDTF函数
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. DataStreamSource<String> ds = env.socketTextStream("master", 8888);
  6. // 1、使用匿名内部类
  7. SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  8. @Override
  9. public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
  10. for (String word : line.split(",")) {
  11. out.collect(Tuple2.of(word, 1));
  12. }
  13. }
  14. });
  15. // flatMapDS.print();
  16. // 2、使用lambda表达式
  17. SingleOutputStreamOperator<Tuple> flatMapDS2 = ds.flatMap((line, out) -> {
  18. for (String word : line.split(",")) {
  19. out.collect(Tuple2.of(word, 1));
  20. }
  21. }, Types.TUPLE(Types.STRING, Types.INT));
  22. flatMapDS2.print();
  23. env.execute();
  24. }
  25. }
3.3、filter:过滤,为每个元素执行一个布尔 function
  1. public class Demo03Filter {
  2. public static void main(String[] args) throws Exception {
  3. // 过滤数据,注意返回值必须是布尔类型,返回true则保留数据,返回false则过滤数据
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. DataStreamSource<String> ds = env.socketTextStream("master", 8888);
  6. // 只输出大于10的数字
  7. SingleOutputStreamOperator<String> ftDS = ds.filter(new FilterFunction<String>() {
  8. @Override
  9. public boolean filter(String value) throws Exception {
  10. return Integer.parseInt(value) > 10;
  11. }
  12. });
  13. // ftDS.print();
  14. ds.filter(value->Integer.parseInt(value)>10).print();
  15. env.execute();
  16. }
  17. }
3.4、keyBy:分组,具有相同 key 的记录都分配到同一个分区
  1. public class Demo04KeyBy {
  2. public static void main(String[] args) throws Exception {
  3. // 用于就数据流分组,让相同的Key进入到同一个任务中进行处理,后续可以跟聚合操作
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. DataStreamSource<String> ds = env.socketTextStream("master", 8888);
  6. KeyedStream<String, String> keyByDS = ds.keyBy(new KeySelector<String, String>() {
  7. @Override
  8. public String getKey(String value) throws Exception {
  9. return value;
  10. }
  11. });
  12. // keyByDS.print();
  13. //lambda表达式
  14. ds.keyBy(value->value,Types.STRING).print();
  15. ds.keyBy(value->value.toLowerCase(),Types.STRING).print();
  16. // ds.keyBy(String::toLowerCase,Types.STRING).print();
  17. env.execute();
  18. }
  19. }
3.5、 reduce:在相同 key 的数据流上“滚动”执行 reduce
  1. public class Demo05Reduce {
  2. public static void main(String[] args) throws Exception {
  3. // 用于对KeyBy之后的数据流进行聚合计算
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. DataStream<String> ds = env.socketTextStream("master", 8888);
  6. // 统计班级的平均年龄
  7. SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> kvDS = ds.map(line -> {
  8. String[] split = line.split(",");
  9. String clazz = split[0];
  10. int age = Integer.parseInt(split[1]);
  11. return Tuple3.of(clazz, age, 1);
  12. }, Types.TUPLE(Types.STRING, Types.INT, Types.INT));
  13. KeyedStream<Tuple3<String, Integer, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0, Types.STRING);
  14. keyByDS.reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {
  15. @Override
  16. public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> v1, Tuple3<String, Integer, Integer> v2) throws Exception {
  17. return Tuple3.of(v1.f0,v1.f1+v2.f1,v1.f2+v2.f2);
  18. }
  19. }).map(t3->Tuple2.of(t3.f0,(double)t3.f1/t3.f2),Types.TUPLE(Types.STRING,Types.DOUBLE))
  20. .print();
  21. keyByDS.reduce((v1,v2)->Tuple3.of(v1.f0,v1.f1 + v2.f1, v1.f2 + v2.f2)).print();
  22. env.execute();
  23. }
  24. }
3.6、window:可以在已经分区的 KeyedStreams 上定义 Window
  1. public class Demo06Window {
  2. public static void main(String[] args) throws Exception {
  3. // Flink窗口操作:时间、计数、会话
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. DataStream<String> ds = env.socketTextStream("master", 8888);
  6. SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));
  7. // 每隔5s钟统计每个单词的数量 ---> 滚动窗口实现
  8. SingleOutputStreamOperator<Tuple2<String, Integer>> winDS = kvDS.keyBy(kv -> kv.f0, Types.STRING)
  9. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  10. .sum(1);
  11. // winDS.print();
  12. // 每隔5s钟统计最近10s内的每个单词的数量 ---> 滑动窗口实现
  13. kvDS.keyBy(kv->kv.f0,Types.STRING)
  14. .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
  15. .sum(1).print();
  16. env.execute();
  17. }
  18. }

3.7、union:将两个或多个数据流联合来创建一个包含所有流中数据的新流

  1. public class Demo07Union {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. DataStream<String> ds01 = env.socketTextStream("master", 8888);
  5. DataStream<String> ds02 = env.socketTextStream("master", 6666);
  6. DataStream<String> unionDS = ds01.union(ds02);
  7. // union 就是将两个相同结构的DS合并成一个DS
  8. unionDS.print();
  9. env.execute();
  10. }
  11. }

3.8、process:处理函数

  1. public class Demo08Process {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. DataStream<String> ds01 = env.socketTextStream("master", 5555);
  5. SingleOutputStreamOperator<Object> pDS = ds01.process(new ProcessFunction<String, Object>() {
  6. /*
  7. * 每进来一条数据就会执行一次
  8. * value :一条数据
  9. * ctx:可以获取任务执行时的信息
  10. * out:用于输出数据
  11. */
  12. @Override
  13. public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) throws Exception {
  14. // 通过processElement实现Map算子操作
  15. out.collect(Tuple2.of(value, 1));
  16. // 通过processElement实现flatMap算子操作
  17. for (String word : value.split(",")) {
  18. out.collect(word);
  19. }
  20. // 通过processElement实现filter算子操作
  21. if ("flink".equals(value)) {
  22. out.collect("flink yes");
  23. }
  24. }
  25. });
  26. pDS.print();
  27. }
  28. }

4.flink sink

4.1、写入文件
  1. public class Demo01FileSink {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);
  5. // 构建FileSink
  6. FileSink<String> flinkSink = FileSink.forRowFormat(
  7. new Path("flink/data/fileFlink"),
  8. new SimpleStringEncoder<String>("UTF-8"))
  9. .withRollingPolicy(
  10. DefaultRollingPolicy.builder()
  11. .withRolloverInterval(Duration.ofSeconds(10))
  12. .withInactivityInterval(Duration.ofSeconds(10))
  13. .withMaxPartSize(MemorySize.ofMebiBytes(1))
  14. .build())
  15. .build();
  16. linesDS.sinkTo(flinkSink);
  17. env.execute();
  18. }
  19. }
4.2、自定义sink
  1. public class Demo02MySink {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. ArrayList<String> arrList = new ArrayList<>();
  5. arrList.add("flink");
  6. arrList.add("flink");
  7. arrList.add("flink");
  8. arrList.add("flink");
  9. DataStreamSource<String> ds = env.fromCollection(arrList);
  10. ds.addSink(new MySinkFunction());
  11. env.execute();
  12. }
  13. }
  14. class MySinkFunction implements SinkFunction<String>{
  15. @Override
  16. public void invoke(String value,Context context) throws Exception {
  17. System.out.println("进入了invoke方法");
  18. // invoke 每一条数据会执行一次
  19. // 最终数据需要sink到哪里,就对value进行处理即可
  20. System.out.println(value);
  21. }
  22. }

5. flink window

5.1、时间窗口
  1. public class Demo01TimeWindow {
  2. /*
  3. * 时间窗口:滚动、滑动
  4. * 时间类型:处理时间、事件时间
  5. */
  6. public static void main(String[] args) throws Exception {
  7. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8. env.setParallelism(1);
  9. DataStream<MyEvent> myDS = env.socketTextStream("master", 8888)
  10. .map(new MapFunction<String, MyEvent>() {
  11. @Override
  12. public MyEvent map(String value) throws Exception {
  13. String[] split = value.split(",");
  14. return new MyEvent(split[0], Long.parseLong(split[1]));
  15. }
  16. });
  17. // 基于处理时间的滚动、滑动窗口
  18. myDS.map(e-> Tuple2.of(e.getWord(),1), Types.TUPLE(Types.STRING,Types.INT))
  19. .keyBy(t2->t2.f0)
  20. // 滚动窗口 每隔5s统计一次
  21. // .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  22. // 滑动窗口 每隔5s统计最近10s内的数据
  23. .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  24. .sum(1);
  25. // 基于事件时间的滚动、滑动窗口
  26. SingleOutputStreamOperator<MyEvent> assDS = myDS.assignTimestampsAndWatermarks(
  27. WatermarkStrategy
  28. .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  29. .withTimestampAssigner((event, ts) -> event.getTs())
  30. );
  31. SingleOutputStreamOperator<Tuple2<String, Integer>> eventDS = assDS.map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
  32. .keyBy(t2 -> t2.f0)
  33. // 滚动窗口,由于水位线前移了5s,整体有5s的延时
  34. // .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  35. // 滑动窗口
  36. .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  37. .sum(1);
  38. // processDS.print();
  39. eventDS.print();
  40. env.execute();
  41. }
  42. }
5.2、会话窗口
  1. public class Demo02SessionWindow {
  2. public static void main(String[] args) throws Exception {
  3. // 会话窗口:当一段时间没有数据,那么就认定此次会话结束并触发窗口的执行
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. env.setParallelism(1);
  6. DataStream<MyEvent> myDS = env.socketTextStream("master", 8888)
  7. .map(new MapFunction<String, MyEvent>() {
  8. @Override
  9. public MyEvent map(String value) throws Exception {
  10. String[] split = value.split(",");
  11. return new MyEvent(split[0], Long.parseLong(split[1]));
  12. }
  13. });
  14. // 基于处理时间的会话窗口
  15. SingleOutputStreamOperator<Tuple2<String, Integer>> processSessionDS = myDS.map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
  16. .keyBy(t2 -> t2.f0)
  17. .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  18. .sum(1);
  19. // 基于事件时间的会话窗口
  20. // 指定水位线策略并提供数据中的时间戳解析规则
  21. SingleOutputStreamOperator<MyEvent> assDS = myDS.assignTimestampsAndWatermarks(WatermarkStrategy
  22. .<MyEvent>forMonotonousTimestamps()
  23. .withTimestampAssigner((e, ts) -> e.getTs())
  24. );
  25. SingleOutputStreamOperator<Tuple2<String, Integer>> eventSessionDS = assDS.map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
  26. .keyBy(t2 -> t2.f0)
  27. .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
  28. .sum(1);
  29. // eventSessionDS.print();
  30. // processSessionDS.print();
  31. env.execute();
  32. }
  33. }
5.3、计算窗口
  1. public class Demo03CountWindow {
  2. public static void main(String[] args) throws Exception {
  3. // 计数窗口:滚动、滑动
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. DataStreamSource<String> ds = env.socketTextStream("master", 8888);
  6. ds.map(word-> Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT))
  7. .keyBy(t2->t2.f0)
  8. .countWindow(5) // 每同一个key的5条数据会统计一次
  9. // .countWindow(10,5)// 每隔同一个key的5条数据统计最近10条数据
  10. .sum(1)
  11. .print();
  12. env.execute();
  13. }
  14. }

四、kafka 搭建及使用

1、Kafka简介

kafka是一个高吞吐的分布式消息系统。生产者负责生产数据 消费者负责消费数据

Kafka架构

  • 生产者、Broker、消费者、ZK;
  • 注意:Zookeeper中保存Broker id和消费者offsets等信息,但是没有生产者信息

2、kafka特点

高性能:单节点支持上千个客户端,百MB/s吞吐; 持久性:消息直接持久化在普通磁盘上且性能好; 分布式:数据副本冗余、流量负载均衡、可扩展; 很灵活:消息长时间持久化+Client维护消费状态。

3、Kafka性能好的原因

kafka写磁盘是顺序的,所以不断的往前产生,不断的往后写 kafka还用了sendFile的0拷贝技术,提高速度 而且还用到了批量读写,一批批往里写,64K为单位。

4、副本数设定、日志保存时间

一般我们设置成2个或3个,很多企业设置为2个;副本的优势:提高可靠性;副本劣势:增加了网络IO传输。默认保存7天;生产环境建议3天

5、Kafka搭建

5.1、上传解压修改环境变量

解压

tar -zxvf kafka_2.11-1.0.0.tgz -C /usr/local/soft
mv kafka_2.11-1.0.0 kafka-1.0.0

配置环境变量

vim /etc/profile

export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

5.2、修改配置文件

vim config/server.properties

  1. broker.id=0 每一个节点broker.id 要不一样
  2. zookeeper.connect=master:2181,node1:2181,node2:2181/kafka
  3. log.dirs=/usr/local/soft/kafka-1.0.0/data 数据存放的位置
5.3、将kafka文件同步到node1,node2

同步kafka文件

scp -r kafka-1.0.0/ node1:pwd
scp -r kafka-1.0.0/ node2:pwd

修改node1、node2中的/etc/profile,增加Kafka环境变量

export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin

在ndoe1和node2中执行source

source /etc/profile

5.4、修改node1和node2中的broker.id

vim config/server.properties

  1. # node1
  2. broker.id=1
  3. # node2
  4. broker.id=2
5.5、启动kafka

1、需要启动zookeeper, kafka使用zk保存元数据

需要在每个节点中执行启动的命令

zkServer.sh start

查看启动的状体

zkServer.sh status

2、启动kafka,每个节点中都要启动(去中心化的架构)

-daemon后台启动

kafka-server-start.sh -daemon /usr/local/soft/kafka-1.0.0/config/server.properties

测试是否成功

#生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic topic01

消费者

--from-beginning 从头消费, 如果不在执行消费的新的数据
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic topic01

6、 kafka使用

6.1、创建topic

在生产和消费数据时,如果topic不存在会自动创建一个分区为1,副本为1的topic

--replication-factor ---每一个分区的副本数量, 同一个分区的副本不能放在同一个节点,副本的数量不能大于kafak集群节点的数量
--partition --分区数, 根据数据量设置
--zookeeper zk的地址,将topic的元数据保存在zookeeper中

kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic topic01

6.2、查看topic描述信息

kafka-topics.sh --describe --zookeeper master:2181,node1:2181,node2:2181/kafka --topic topic01

6.3、获取所有topic

kafka-topics.sh --list --zookeeper master:2181,node1:2181,node2:2181/kafka

6.4、创建控制台生产者

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic topic01

6.5、创建控制台消费者

kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node3:9092 --from-beginning --topic topic01

标签: flink 大数据 kafka

本文转载自: https://blog.csdn.net/2403_83630621/article/details/140830680
版权归原作者 Act-F 所有, 如有侵权,请联系我们删除。

“flink与kafka基础知识”的评论:

还没有评论