

Flink Customized Feature Development, with Demo Code

Foreword:

  1. This is a basic tutorial on custom Flink development. Using the Flink DataStream API, with Kafka as the data source, we build a simple test environment: a Kafka producer thread utility, a custom FilterFunction operator, and a custom MapFunction operator, all wired together by the code of a single Flink job that reads Kafka in real time and applies the multi-stage processing, so that readers get a feel for how to build custom functions with Flink.

1. Analysis of Flink's Development Modules

Flink provides four basic development layers. The core SDK APIs are DataStream for real-time (streaming) computation and DataSet for offline (batch) computation; on top of these two SDKs sits the Table API development module, and above the Table API is Flink SQL, a highly abstract layer that lets you develop jobs in SQL. Below the core development APIs there are also lower-level interfaces for working with the most fine-grained primitives such as time, state, and operators, for example the basic functions used to wrap custom operators (ProcessWindowFunction, ProcessFunction, and so on) and the built-in state TTL configuration StateTtlConfig.

The Flink development APIs therefore form a layered hierarchy: Flink SQL at the top, the Table API below it, then the DataStream/DataSet APIs, and finally the low-level process functions and state primitives at the bottom.
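To give a concrete feel for that lowest layer, here is a minimal sketch of a KeyedProcessFunction that keeps a per-key counter and expires it with StateTtlConfig. The class name, key type, and one-hour TTL are assumptions made for illustration; they are not part of the demo that follows.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts how often each key has been seen; the counter expires one hour after the last write.
public class CountWithTtlFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long current = countState.value();
        long next = (current == null ? 0L : current) + 1;
        countState.update(next);
        out.collect(ctx.getCurrentKey() + " seen " + next + " times");
    }
}

Such a function would be applied to a keyed stream with keyBy(...).process(new CountWithTtlFunction()); the custom FilterFunction and MapFunction in the demo below are wrapped in exactly the same spirit, just one level higher in the API.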

2. Customized Development Demo

2.1 Scenario

The common technical architecture of a Flink real-time job is message-queue middleware plus a Flink job:

Data is collected into a topic of queue middleware such as Kafka or Pulsar, and Flink's built-in Kafka source then consumes the topic and processes the data in real time.

  1. A Kafka producer thread is provided here; you can customize the data it builds and when it is sent, so that you control the data source written into Kafka.
  2. Two basic DataStream operators, FilterFunction and MapFunction, are reimplemented so that readers can see how Flink functions are wrapped; the same principle applies to the lower-level functions later on. Plain String records are used here to avoid pulling in extra SDKs for object conversion; in practice you would normally build business POJOs, so swap the String for your own business objects as needed.
  3. Flink then wires the whole flow together: read the data from Kafka, pass it through the two processing stages, and output the required result.

2.2 Local Demo

2.2.1 pom file

Flink 1.14.6 with Scala 2.12 is used as the example here:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.example</groupId>
        <artifactId>flinkCDC</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>flinkStream</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink-version>1.14.6</flink-version>
        <scala-version>2.12</scala-version>
        <hadop-common-version>2.9.1</hadop-common-version>
        <elasticsearch.version>7.6.2</elasticsearch.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala-version}</artifactId>
            <version>${flink-version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala-version}</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>myflinkml.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
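A note on building: mvn clean package runs the shade plugin in the package phase and produces a fat jar containing the dependencies above. The <mainClass> configured there is myflinkml.DataStreamJob; if you intend to submit the demo jar by its manifest entry point, you would presumably change it to the job class shown in section 2.2.4 (org.example.service.FlinkTestDemo).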
2.2.2 Kafka producer thread

package org.example.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.*;

/**
 * Produces test data into Kafka.
 *
 * @author i7杨
 * @date 2024/01/12 13:02:29
 */
public class KafkaProducerUtil extends Thread {
    private String topic;

    public KafkaProducerUtil(String topic) {
        super();
        this.topic = topic;
    }

    private static Producer<String, String> createProducer() {
        // Configure the producer through a Properties object
        Properties properties = new Properties();
        // Test-environment Kafka configuration (ip1/ip2/ip3 are placeholders for real brokers)
        properties.put("bootstrap.servers", "ip1:9092,ip2:9092,ip3:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        Random random = new Random();
        Random random2 = new Random();
        while (true) {
            int nums = random.nextInt(10);
            int nums2 = random.nextInt(50);
            // double nums2 = random2.nextDouble();
            String time = new Date().getTime() / 1000 + 5 + "";
            // 'type' is only used by the commented-out JSON payload below
            String type = "pv";
            try {
                if (nums2 % 2 == 0) {
                    type = "pv";
                } else {
                    type = "uv";
                }
                // String info = "{\"user\":" + nums + ",\"item\":" + nums * 10 + ",\"category\":" + nums2 + ",\"pv\":" + nums2 * 5 + ",\"ts\":\"" + time + "\"}";
                String info = nums + "=" + nums2;
                System.out.println("message : " + info);
                producer.send(new ProducerRecord<String, String>(this.topic, info));
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("=========data written==========");
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducerUtil("test01").run();
    }

    public static void sendMessage(String topic, String message) {
        Producer<String, String> producer = createProducer();
        producer.send(new ProducerRecord<String, String>(topic, message));
        // Flush and close so the record is actually delivered before the producer is discarded
        producer.flush();
        producer.close();
    }
}
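A brief usage note: in the main() above the producer is driven with run(), which executes the endless produce loop on the calling thread. To keep generating data in the background while you do something else, a sketch like the following starts it as a real thread (the topic name test01 matches the one consumed later; the broker addresses remain placeholders):

// Hypothetical launcher for the producer above
KafkaProducerUtil producerThread = new KafkaProducerUtil("test01");
producerThread.setDaemon(true); // let the JVM exit even though the produce loop never ends
producerThread.start();         // start() spawns a new thread; run() would block the caller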
2.2.3 Custom basic functions

Two operator functions, a filter and a map, are customized here; the test logic follows the shape of the data records:

Custom FilterFunction operator: records whose value does not exceed the threshold of 40 are dropped.

package org.example.funtion;

import org.apache.flink.api.common.functions.FilterFunction;

/**
 * Custom reimplementation of FilterFunction.
 *
 * @author i7杨
 * @date 2024/01/12 13:02:29
 */
public class InfoFilterFunction implements FilterFunction<String> {
    private double threshold;

    public InfoFilterFunction(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(String value) throws Exception {
        // Expect records of the form "key=value"; anything else is dropped
        if (value.split("=").length == 2)
            // Threshold filter: keep only records whose numeric value exceeds the threshold
            return Double.valueOf(value.split("=")[1]) > threshold;
        else return false;
    }
}

Custom MapFunction: records ending in 2 get special information appended.

package org.example.funtion;

import org.apache.flink.api.common.functions.MapFunction;

public class ActionMapFunction implements MapFunction<String, String> {
    @Override
    public String map(String value) throws Exception {
        System.out.println("value:" + value);
        // Records whose last character is "2" are tagged with extra information
        if (value.endsWith("2"))
            return value.concat(":Special processing information");
        else return value;
    }
}
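Before wiring Kafka in, the two operators can be checked locally on a small bounded stream. This is only a sketch; the class name FunctionSmokeTest and the sample records are made up for illustration:

package org.example.funtion;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FunctionSmokeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("3=25", "7=42", "bad-record", "5=50")
                .filter(new InfoFilterFunction(40))   // drops "3=25" (25 <= 40) and the malformed record
                .map(new ActionMapFunction())         // "7=42" ends with "2", so it gets the special suffix
                .print();
        env.execute("local smoke test");
    }
}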
2.2.4 Flink job code

Job logic: use the Kafka utility to produce data, then consume the Kafka topic, chain the functions above together, and print the result.

package org.example.service;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;

import java.util.*;

public class FlinkTestDemo {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092,ip2:9092,ip3:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");
        kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the Kafka consumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "test01",                  // Kafka topic name
                new SimpleStringSchema(),
                kafkaProps);

        // Read the data stream from Kafka
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
        env.disableOperatorChaining();

        kafkaStream
                .filter(new InfoFilterFunction(40))
                .map(new ActionMapFunction())
                .print("message with value above 40=");

        // Execute the job
        env.execute("This is a testing task");
    }
}
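One hedged note: FlinkKafkaConsumer still works in Flink 1.14 but is marked deprecated there in favor of the newer KafkaSource, which ships in the same flink-connector-kafka dependency. Under the same assumptions as above (topic test01, placeholder brokers, the same group id), the source could instead be built roughly like this:

// Additional imports this sketch would need:
// import org.apache.flink.api.common.eventtime.WatermarkStrategy;
// import org.apache.flink.connector.kafka.source.KafkaSource;
// import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
        .setTopics("test01")
        .setGroupId("flink-consumer-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");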

Running result:


This article is reposted from: https://blog.csdn.net/weixin_42049123/article/details/135561586
Copyright belongs to the original author, i7杨. If there is any infringement, please contact us and it will be removed.
