0


Flink介绍

什么是Flink

Apache Flink 是一个开源的流处理框架,用于处理实时数据流和批处理数据。它具有高吞吐量、低延迟和容错性强的特点,适用于各种实时数据处理场景,如实时分析、事件驱动应用和数据管道等。Flink 提供了丰富的 API,支持 Java、Scala 和 Python 等编程语言,并且可以与 Hadoop、Kafka 等大数据生态系统无缝集成

能做什么

Apache Flink 是一个分布式流处理框架,广泛应用于实时数据处理和大数据分析领域。以下是一些实际应用的例子:

  1. 实时数据分析:Flink 可以用于实时监控和分析数据。例如,在金融行业,Flink 可以用于实时监控交易数据,检测异常交易行为,防止欺诈。
  2. 日志处理:在互联网公司,Flink 常用于处理服务器日志数据。通过实时分析日志,可以监控系统运行状态,及时发现和解决问题。
  3. 推荐系统:电商平台可以使用 Flink 实时处理用户行为数据,生成个性化推荐。例如,用户浏览商品时,Flink 可以实时分析用户的浏览和购买历史,推荐相关商品。
  4. 物联网(IoT):在物联网应用中,Flink 可以处理来自各种传感器的数据。例如,在智能城市中,Flink 可以实时处理交通传感器数据,优化交通信号灯的控制,减少交通拥堵。
  5. 社交媒体分析:社交媒体平台可以使用 Flink 实时分析用户发布的内容和互动行为。例如,实时分析推文内容,识别热门话题和趋势。
  6. 广告投放:广告平台可以使用 Flink 实时处理用户点击和浏览数据,优化广告投放策略,提高广告效果。
  7. 金融风控:银行和金融机构可以使用 Flink 实时处理交易数据,进行风险评估和管理。例如,实时监控信用卡交易,识别潜在的欺诈行为。

Flink的source和sink可以有哪些

Apache Flink 是一个分布式流处理框架,它支持多种数据源(source)和数据接收器(sink)。以下是一些常见的 Flink source 和 sink:

Source(数据源)

  1. 文件系统:可以从本地文件系统、HDFS、S3 等读取文件。
  2. Kafka:从 Apache Kafka 主题中读取数据。
  3. 数据库:通过 JDBC 连接从关系型数据库(如 MySQL、PostgreSQL)中读取数据。
  4. 消息队列:如 RabbitMQ、ActiveMQ 等。
  5. Socket:从网络套接字中读取数据。
  6. 自定义源:用户可以实现自定义的 SourceFunction 来读取特定的数据源。
  7. 集合:从 Java 集合(如 List、Array)中读取数据,通常用于测试。
  8. Elasticsearch:从 Elasticsearch 中读取数据。
  9. Kinesis:从 Amazon Kinesis 流中读取数据。
  10. Pulsar:从 Apache Pulsar 主题中读取数据。

Sink(数据接收器)

  1. 文件系统:将数据写入本地文件系统、HDFS、S3 等。
  2. Kafka:将数据写入 Apache Kafka 主题。
  3. 数据库:通过 JDBC 连接将数据写入关系型数据库(如 MySQL、PostgreSQL)。
  4. 消息队列:如 RabbitMQ、ActiveMQ 等。
  5. Socket:将数据写入网络套接字。
  6. 自定义接收器:用户可以实现自定义的 SinkFunction 来写入特定的目标。
  7. Elasticsearch:将数据写入 Elasticsearch。
  8. Kinesis:将数据写入 Amazon Kinesis 流。
  9. Pulsar:将数据写入 Apache Pulsar 主题。
  10. 打印:将数据输出到标准输出(通常用于调试)。

这些 source 和 sink 使得 Flink 可以灵活地与各种数据存储和消息系统集成,满足不同的应用需求。

Flink同步有哪几种方式

Flink-sql举例

配置mysql-cdc,Debezium是一个开源的CDC(Change Data Capture)工具,可以捕获MySQL数据库的变化。你需要配置Debezium来监听A库A表的变化。

  1. {
  2. "name": "mysql-connector",
  3. "config": {
  4. "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  5. "database.hostname": "localhost",
  6. "database.port": "3306",
  7. "database.user": "your-username",
  8. "database.password": "your-password",
  9. "database.server.id": "184054",
  10. "database.server.name": "dbserver1",
  11. "database.whitelist": "A库",
  12. "table.whitelist": "A库.A表",
  13. "database.history.kafka.bootstrap.servers": "kafka:9092",
  14. "database.history.kafka.topic": "schema-changes.A库"
  15. }
  16. }

1.创建来源表

2.创建目标表

3.同步语句

4.将 Flink SQL脚本提交到Flink集群中运行,Flink会自动将A库A表的数据变化同步到B库b表

  1. -- 创建Debezium MySQL Source
  2. CREATE TABLE source_table (
  3. id INT,
  4. name STRING,
  5. age INT,
  6. PRIMARY KEY (id) NOT ENFORCED
  7. ) WITH (
  8. 'connector' = 'mysql-cdc',
  9. 'hostname' = 'localhost',
  10. 'port' = '3306',
  11. 'username' = 'your-username',
  12. 'password' = 'your-password',
  13. 'database-name' = 'A库',
  14. 'table-name' = 'A表'
  15. );
  16. -- 创建JDBC Sink
  17. CREATE TABLE sink_table (
  18. id INT,
  19. name STRING,
  20. age INT,
  21. PRIMARY KEY (id) NOT ENFORCED
  22. ) WITH (
  23. 'connector' = 'jdbc',
  24. 'url' = 'jdbc:mysql://localhost:3306/B库',
  25. 'table-name' = 'b表',
  26. 'username' = 'your-username',
  27. 'password' = 'your-password'
  28. );
  29. -- 将数据从source_table同步到sink_table
  30. INSERT INTO sink_table
  31. SELECT * FROM source_table;

Flink-jar举例

除了用flink-sql,还有一种更灵活的jar代码形式的

  1. //依赖
  2. <dependency>
  3. <groupId>com.ververica</groupId>
  4. <artifactId>flink-connector-mysql-cdc</artifactId>
  5. <version>2.0.0</version>
  6. </dependency>
  7. //数据源
  8. MySQLSource<String> mySQLSource = MySQLSource.<String>builder()
  9. .hostname("localhost")
  10. .port(3306)
  11. .databaseList("your_database") // 设置数据库名称
  12. .tableList("your_database.table1", "your_database.table2") // 设置表名称
  13. .username("your_username")
  14. .password("your_password")
  15. .deserializer(new JsonDebeziumDeserializationSchema()) // 设置反序列化器
  16. .build();
  17. //执行环境
  18. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19. DataStream<String> stream = env.addSource(mySQLSource);
  20. //处理数据
  21. DataStream<YourDataType> processedStream = stream
  22. .map(jsonString -> {
  23. // 解析 JSON 字符串并转换为自定义数据类型
  24. return parseJsonToYourDataType(jsonString);
  25. })
  26. .keyBy(YourDataType::getKey)
  27. .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  28. .apply(new YourJoinFunction());
  29. //启动作业
  30. env.execute("Flink MySQL CDC Example");

实际业务举例-使用 Apache Flink 实时处理用户行为数据并生成个性化推荐系统

  1. 数据收集:- 从各种数据源(如日志文件、数据库、消息队列等)收集用户行为数据。常用的消息队列系统包括 Apache Kafka 和 RabbitMQ。
  2. 数据预处理:- 使用 Flink 对原始数据进行清洗、过滤和转换。例如,去除无效数据、解析日志格式、提取有用字段等。
  3. 特征提取:- 从用户行为数据中提取特征,例如用户点击、浏览、购买等行为。可以使用 Flink 的窗口操作(如滑动窗口、滚动窗口)来聚合数据。
  4. 实时计算:- 使用 Flink 的流处理能力对用户行为数据进行实时计算。例如,计算用户的实时兴趣、热门商品、用户相似度等。
  5. 推荐算法:- 实现推荐算法,如协同过滤、基于内容的推荐、矩阵分解等。可以将这些算法集成到 Flink 的数据流中,实时生成推荐结果。
  6. 结果输出:- 将推荐结果输出到存储系统或消息队列,以便后续使用。例如,将推荐结果存储到 Redis 以便快速查询,或者通过 Kafka 发送到前端系统。
  7. 系统监控和优化:- 监控 Flink 作业的运行状态,确保系统的稳定性和性能。可以使用 Flink 自带的监控工具或集成第三方监控系统(如 Prometheus 和 Grafana)。
  1. import org.apache.flink.api.common.functions.MapFunction;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  5. import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
  6. import java.util.Properties;
  7. public class UserBehaviorRecommendation {
  8. public static void main(String[] args) throws Exception {
  9. // 设置 Flink 执行环境
  10. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. // 配置 Kafka 消费者
  12. Properties properties = new Properties();
  13. properties.setProperty("bootstrap.servers", "localhost:9092");
  14. properties.setProperty("group.id", "user-behavior-group");
  15. // 从 Kafka 读取用户行为数据
  16. DataStream<String> userBehaviorStream = env.addSource(
  17. new FlinkKafkaConsumer<>("user-behavior-topic", new SimpleStringSchema(), properties)
  18. );
  19. // 数据预处理和特征提取
  20. DataStream<UserBehavior> processedStream = userBehaviorStream
  21. .map(new MapFunction<String, UserBehavior>() {
  22. @Override
  23. public UserBehavior map(String value) throws Exception {
  24. // 解析用户行为数据
  25. return parseUserBehavior(value);
  26. }
  27. });
  28. // 实时计算和推荐算法
  29. DataStream<Recommendation> recommendationStream = processedStream
  30. .keyBy("userId")
  31. .map(new RecommendationFunction());
  32. // 输出推荐结果
  33. recommendationStream.addSink(new RecommendationSink());
  34. // 启动 Flink 作业
  35. env.execute("User Behavior Recommendation");
  36. }
  37. private static UserBehavior parseUserBehavior(String value) {
  38. // 解析逻辑
  39. return new UserBehavior();
  40. }
  41. // 用户行为数据类
  42. public static class UserBehavior {
  43. public String userId;
  44. public String itemId;
  45. public String behavior;
  46. public long timestamp;
  47. }
  48. // 推荐结果类
  49. public static class Recommendation {
  50. public String userId;
  51. public String recommendedItemId;
  52. }
  53. // 推荐算法函数
  54. public static class RecommendationFunction extends RichMapFunction<UserBehavior, Recommendation> {
  55. @Override
  56. public Recommendation map(UserBehavior value) throws Exception {
  57. // 推荐算法逻辑
  58. return new Recommendation();
  59. }
  60. }
  61. // 推荐结果输出
  62. public static class RecommendationSink extends RichSinkFunction<Recommendation> {
  63. @Override
  64. public void invoke(Recommendation value, Context context) throws Exception {
  65. // 输出逻辑
  66. }
  67. }
  68. }
标签: flink 大数据

本文转载自: https://blog.csdn.net/dzhou1991/article/details/143688166
版权归原作者 syty2020 所有, 如有侵权,请联系我们删除。

“Flink介绍”的评论:

还没有评论