0


Flink-Source的使用

Data Sources 是什么呢?就字面意思其实就可以知道:数据来源

Flink 做为一款流式计算框架,它可用来做批处理,也可以用来做流处理,这个 Data Sources 就是数据的来源地。

flink在/流****处理中常见的source主要有两大类。

预定义Source

基于本地集合的source(Collection-based-source)

基于文件的source(File-based-source)

基于网络套接字(socketTextStream)

自定义Source

预定义Source演示

Collection [测试]--本地集合Source

在flink最常见的创建DataStream方式有四种:

l 使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。

注意:类型要一致,不一致可以用Object接收,但是使用会报错,比如:env.fromElements("haha", 1);

源码注释中有写:

|使用env.fromCollection(),这种方式支持多种Collection的具体类型,如List,Set,Queue

l 使用env.generateSequence()方法创建基于Sequence的DataStream --已经废弃了

l 使用env.fromSequence()方法创建基于开始和结束的DataStream

一般用于学习测试时编造数据时使用

1.env.fromElements(可变参数);

2.env.fromColletion(各种集合);

3.env.fromSequence(开始,结束);

  1. package com.bigdata.source;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import java.util.ArrayList;
  6. import java.util.Arrays;
  7. import java.util.List;
  8. public class _01YuDingYiSource {
  9. public static void main(String[] args) throws Exception {
  10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. // 各种获取数据的Source
  12. DataStreamSource<String> dataStreamSource = env.fromElements("hello world txt", "hello nihao kongniqiwa");
  13. dataStreamSource.print();
  14. // 演示一个错误的
  15. //DataStreamSource<Object> dataStreamSource2 = env.fromElements("hello", 1,3.0f);
  16. //dataStreamSource2.print();
  17. DataStreamSource<Tuple2<String, Integer>> elements = env.fromElements(
  18. Tuple2.of("张三", 18),
  19. Tuple2.of("lisi", 18),
  20. Tuple2.of("wangwu", 18)
  21. );
  22. elements.print();
  23. // 有一个方法,可以直接将数组变为集合 复习一下数组和集合以及一些非常常见的API
  24. String[] arr = {"hello","world"};
  25. System.out.println(arr.length);
  26. System.out.println(Arrays.toString(arr));
  27. List<String> list = Arrays.asList(arr);
  28. System.out.println(list);
  29. env.fromElements(
  30. Arrays.asList(arr),
  31. Arrays.asList(arr),
  32. Arrays.asList(arr)
  33. ).print();
  34. // 第二种加载数据的方式
  35. // Collection 的子接口只有 Set 和 List
  36. ArrayList<String> list1 = new ArrayList<>();
  37. list1.add("python");
  38. list1.add("scala");
  39. list1.add("java");
  40. DataStreamSource<String> ds1 = env.fromCollection(list1);
  41. DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList(arr));
  42. // 第三种
  43. DataStreamSource<Long> ds3 = env.fromSequence(1, 100);
  44. ds3.print();
  45. // execute 下面的代码不运行,所以,这句话要放在最后。
  46. env.execute("获取预定义的Source");
  47. }
  48. }

本地文件的案例:

  1. package com.bigdata.source;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import java.io.File;
  6. import java.util.ArrayList;
  7. import java.util.Arrays;
  8. import java.util.List;
  9. public class _02YuDingYiSource {
  10. public static void main(String[] args) throws Exception {
  11. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12. // 获取并行度
  13. System.out.println(env.getParallelism());
  14. // 讲第二种Source File类型的
  15. // 给了一个相对路径,说路径不对,老闫非要写,我咋办?
  16. // 相对路径,转绝对路径
  17. File file = new File("datas/wc.txt");
  18. File file2 = new File("./");
  19. System.out.println(file.getAbsoluteFile());
  20. System.out.println(file2.getAbsoluteFile());
  21. DataStreamSource<String> ds1 = env.readTextFile("datas/wc.txt");
  22. ds1.print();
  23. // 还可以获取hdfs路径上的数据
  24. DataStreamSource<String> ds2 = env.readTextFile("hdfs://bigdata01:9820/home/a.txt");
  25. ds2.print();
  26. // execute 下面的代码不运行,所以,这句话要放在最后。
  27. env.execute("获取预定义的Source");
  28. }
  29. }

Socket [测试]

socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是”\n”,最大重新连接次数为0。

提示:

如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令。

使用nc 进行数据的发送

  1. yum install -y nc
  2. nc -lk 8888 --向8888端口发送消息,这个命令先运行,如果先运行java程序,会报错!

如果是windows平台:nc -lp 8888

代码演示:

  1. //socketTextStream创建的DataStream,不论怎样,并行度永远是1
  2. public class StreamSocketSource {
  3. public static void main(String[] args) throws Exception {
  4. //local模式默认的并行度是当前机器的逻辑核的数量
  5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6. int parallelism0 = env.getParallelism();
  7. System.out.println("执行环境默认的并行度:" + parallelism0);
  8. DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
  9. //获取DataStream的并行度
  10. int parallelism = lines.getParallelism();
  11. System.out.println("SocketSource的并行度:" + parallelism);
  12. SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  13. @Override
  14. public void flatMap(String line, Collector<String> collector) throws Exception {
  15. String[] words = line.split(" ");
  16. for (String word : words) {
  17. collector.collect(word);
  18. }
  19. }
  20. });
  21. int parallelism2 = words.getParallelism();
  22. System.out.println("调用完FlatMap后DataStream的并行度:" + parallelism2);
  23. words.print();
  24. env.execute();
  25. }
  26. }

以下用于演示:统计socket中的 单词数量,体会流式计算的魅力!

  1. import org.apache.flink.api.common.RuntimeExecutionMode;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.common.functions.MapFunction;
  4. import org.apache.flink.api.java.functions.KeySelector;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.datastream.KeyedStream;
  8. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.util.Collector;
  11. public class SourceDemo02_Socket {
  12. public static void main(String[] args) throws Exception {
  13. //TODO 1.env-准备环境
  14. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15. env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  16. //TODO 2.source-加载数据
  17. DataStream<String> socketDS = env.socketTextStream("bigdata01", 8889);
  18. //TODO 3.transformation-数据转换处理
  19. //3.1对每一行数据进行分割并压扁
  20. DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {
  21. @Override
  22. public void flatMap(String value, Collector<String> out) throws Exception {
  23. String[] words = value.split(" ");
  24. for (String word : words) {
  25. out.collect(word);
  26. }
  27. }
  28. });
  29. //3.2每个单词记为<单词,1>
  30. DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
  31. @Override
  32. public Tuple2<String, Integer> map(String value) throws Exception {
  33. return Tuple2.of(value, 1);
  34. }
  35. });
  36. //3.3分组
  37. KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
  38. @Override
  39. public String getKey(Tuple2<String, Integer> value) throws Exception {
  40. return value.f0;
  41. }
  42. });
  43. //3.4聚合
  44. SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);
  45. //TODO 4.sink-数据输出
  46. result.print();
  47. //TODO 5.execute-执行
  48. env.execute();
  49. }
  50. }

自定义数据源

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

  1. package com.bigdata.day02;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
  8. import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
  9. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
  10. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  11. import java.util.Random;
  12. import java.util.UUID;
  13. /**
  14. * 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
  15. * 要求:
  16. * - 随机生成订单ID(UUID)
  17. * - 随机生成用户ID(0-2)
  18. * - 随机生成订单金额(0-100)
  19. * - 时间戳为当前系统时间
  20. */
  21. @Data // set get toString
  22. @AllArgsConstructor
  23. @NoArgsConstructor
  24. class OrderInfo{
  25. private String orderId;
  26. private int uid;
  27. private int money;
  28. private long timeStamp;
  29. }
  30. // class MySource extends RichSourceFunction<OrderInfo> {
  31. //class MySource extends RichParallelSourceFunction<OrderInfo> {
  32. class MySource implements SourceFunction<OrderInfo> {
  33. boolean flag = true;
  34. @Override
  35. public void run(SourceContext ctx) throws Exception {
  36. // 源源不断的产生数据
  37. Random random = new Random();
  38. while(flag){
  39. OrderInfo orderInfo = new OrderInfo();
  40. orderInfo.setOrderId(UUID.randomUUID().toString());
  41. orderInfo.setUid(random.nextInt(3));
  42. orderInfo.setMoney(random.nextInt(101));
  43. orderInfo.setTimeStamp(System.currentTimeMillis());
  44. ctx.collect(orderInfo);
  45. Thread.sleep(1000);// 间隔1s
  46. }
  47. }
  48. // source 停止之前需要干点啥
  49. @Override
  50. public void cancel() {
  51. flag = false;
  52. }
  53. }
  54. public class CustomSource {
  55. public static void main(String[] args) throws Exception {
  56. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  57. env.setParallelism(2);
  58. // 将自定义的数据源放入到env中
  59. DataStreamSource dataStreamSource = env.addSource(new MySource())/*.setParallelism(1)*/;
  60. System.out.println(dataStreamSource.getParallelism());
  61. dataStreamSource.print();
  62. env.execute();
  63. }
  64. }

通过****ParallelSourceFunction创建可并行Source

  1. /**
  2. * 自定义多并行度Source
  3. */
  4. public class CustomerSourceWithParallelDemo {
  5. public static void main(String[] args) throws Exception {
  6. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  7. DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
  8. mySource.print();
  9. env.execute();
  10. }
  11. public static class MySource implements ParallelSourceFunction<String> {
  12. @Override
  13. public void run(SourceContext<String> ctx) throws Exception {
  14. ctx.collect(UUID.randomUUID().toString());
  15. /*
  16. 如果不设置无限循环可以看出,设置了多少并行度就打印出多少条数据
  17. */
  18. }
  19. @Override
  20. public void cancel() {}
  21. }
  22. }

如果代码换成ParallelSourceFunction,每次生成12个数据,假如是12核数的话。

总结:Rich富函数总结 ctrl + o

Rich 类型的Source可以比非Rich的多出有:
- open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
- close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
- getRuntimeContext 方法可以获得当前的Runtime对象(底层API)

Kafka Source --从kafka中读取数据

https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka_2.11</artifactId>
  4. <version>${flink.version}</version>
  5. </dependency>

创建一个topic1 这个主题:

  1. cd /opt/installs/kafka3/
  2. bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic1
  3. 通过控制台向topic1发送消息:
  4. bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic1
  1. package com.bigdata.day02;
  2. import org.apache.flink.api.common.functions.FilterFunction;
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  7. import java.util.Properties;
  8. public class KafkaSource {
  9. public static void main(String[] args) throws Exception {
  10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. Properties properties = new Properties();
  12. properties.setProperty("bootstrap.servers", "bigdata01:9092");
  13. properties.setProperty("group.id", "g1");
  14. FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);
  15. DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
  16. // 以下代码跟flink消费kakfa数据没关系,仅仅是将需求搞的复杂一点而已
  17. // 返回true 的数据就保留下来,返回false 直接丢弃
  18. dataStreamSource.filter(new FilterFunction<String>() {
  19. @Override
  20. public boolean filter(String word) throws Exception {
  21. // 查看单词中是否包含success 字样
  22. return word.contains("success");
  23. }
  24. }).print();
  25. env.execute();
  26. }
  27. }
标签: flink 大数据 Source

本文转载自: https://blog.csdn.net/weixin_63297999/article/details/143989738
版权归原作者 时差953 所有, 如有侵权,请联系我们删除。

“Flink-Source的使用”的评论:

还没有评论