0


flink重温笔记(七):Flink 流批一体 API 开发—— Connector 连接器

Flink学习笔记

前言:今天是学习 flink 的第七天啦!学习了 flink 中 connector(数据连接器) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,主要学习了数据存储到以下三处:
1、关系型数据库 mysql ;
2、消息队列:kafka;
3、非关系型数据库:redis
我觉得还是比较有意思的,这些是以后工作要用到的技能,我一定要好好掌握!

Tips:“莫道春光难揽取,浮云过后艳阳天!”明天周一,又是新的一天,要深入学习 flink 的四大基石属性!

文章目录

二、Flink 流批一体 API 开发

5. Connectors

5.1 JDBC Connector

该连接器可以向 JDBC 数据库写入数据。

5.1.1 依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
5.1.2 案例演示
  • 需求:从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL
  • Mysql创建表:
CREATE TABLE `t_wordcount` (
   `word` varchar(255) NOT NULL,
   `counts` int(11) DEFAULT '0',
    PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 代码:
packagecn.itcast.day07.connectors;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.connector.jdbc.JdbcConnectionOptions;importorg.apache.flink.connector.jdbc.JdbcSink;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;/**
 * @author lql
 * @time 2024-02-18 20:25:31
 * @description TODO:从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL
 */publicclassJDBCSinkDemo{publicstaticvoidmain(String[] args)throwsException{// 1) 初始化flink流处理的环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 启动检查点机制,每隔 5 秒生成一个 slot// 栅栏会发送给job作业第一个算子(source),(source)收到栅栏后,会阻塞当前算子计算任务,将当前计算结果 state 数据持久化存储// 通过checkpoint将中间结果存储下来,只需要在处理完成后进行一次性写入,可以减少与数据库的交互次数// 而通过checkpoint将数据分批写入数据库,可以避免数据库锁,资源竞争等问题。// 一致性语义: exactly-once:就是操作失败时会进行回滚
        env.enableCheckpointing(5000);// 2)定义数据源DataStreamSource<String> lines = env.socketTextStream("node1",9999);// 词频统计SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = lines.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicvoidflatMap(String line,Collector<Tuple2<String,Integer>> collector)throwsException{String[] words = line.split(" ");for(String word : words){
                    collector.collect(Tuple2.of(word,1));}}});// 分组KeyedStream<Tuple2<String,Integer>,String> grouped = wordAndOne.keyBy(f -> f.f0);// 聚合SingleOutputStreamOperator<Tuple2<String,Integer>> sumed = grouped.sum(1);
        sumed.print();// 将聚合后的数据写入到数据库中
        sumed.addSink(JdbcSink.sink("INSERT INTO t_wordcount(word, counts) VALUES(?,?) ON DUPLICATE KEY UPDATE counts = ?",(ps, t)->{
                    ps.setString(1, t.f0);
                    ps.setInt(2, t.f1);
                    ps.setInt(3, t.f1);},newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://node1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false").withDriverName("com.mysql.jdbc.Driver").withUsername("root").withPassword("123456").build()));// 启动
        env.execute();}}

结果:

终端输出:打印更新后的词频统计
8> (hadoop,1)
7> (flink,1)
1> (spark,1)
1> (kafak,1)
7> (flink,2)
7> (flume,1)

mysql 的表中实时更新词频统计数据

总结:

  • 1- checkpoint 机制能够生成 state,然后批量写入数据库
  • 2- 写入数据库操作中,此事不需要和之前数据源在 mysql 中要定义 java bean 类
  • 3- (ps, t) -> 这里是运用 lambda 表达式,preparedstatement 对象和 tuple 元组获取 sql 语句中的输入参数
  • 4- 写入到 mysql 中要 JdbcSink.sink 方法,四个参数,一个是 sql 语句,一个是 sql 传入参数,一个是 mysql 配置连接
  • 5- mysql 连接对象需要 new 一个 JdbcConnectionOptions.JdbcConnectionOptionsBuilder
5.2 Kafka Connector

Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。

Flink 的 Kafka Producer 称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。

5.2.1 依赖
# 要使用此反序列化 schema 必须添加以下依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>{{site.version }}</version>
</dependency>
5.2.2 代码实现-Kafka Producer

将 socket 数据源写入到 kafka 的 test Topic 中

packagecn.itcast.day07.connectors;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importjava.util.Properties;/**
 * @author lql
 * @time 2024-02-18 22:31:46
 * @description TODO:使用自定义sink官方提供flink-connector-kafka-2.11中的FlinkKafkaProducer实现数据的流的方式写入kafka数据
 */publicclassKafkaProducer{publicstaticvoidmain(String[] args)throwsException{//todo 1)初始化flink流处理环境Configuration configuration =newConfiguration();
        configuration.setInteger("rest.port",8081);//设置webui的端口号StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);// 开启 checkpoint 保证一致性语义
        env.enableCheckpointing(5000);//todo 2)接入数据源DataStreamSource<String> lines = env.socketTextStream("node1",9999);//todo 3)创建kafka的生产者实例//指定topic的名称String topicName ="test";// 实例化 FlinkKafkaProducer 对象Properties props =newProperties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");FlinkKafkaProducer<String> myProducer =newFlinkKafkaProducer<>(topicName,newSimpleStringSchema(), props);// todo 4) 将数据写入 kafka 中
        lines.addSink(myProducer);
        env.execute();}}

结果:socket 数据能够源源不断写入到 kafka 中

总结:

  • 1- 开启 checkpoint 能够保证一致性语义
  • 2- FlinkKafkaProducer 能够担任 kafka 生产者角色
5.2.3 代码实现-Kafka Comsumer

消费 kafka 指定主题中的数据

packagecn.itcast.day07.connectors;/**
 * @author lql
 * @time 2024-02-18 22:37:50
 * @description TODO:使用flink-connector-kafka-2.11中的FlinkKafkaConsumer消费kafka中的数据进行wordcount计算
 */importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importorg.apache.flink.util.Collector;importorg.apache.kafka.clients.consumer.ConsumerConfig;importjava.util.Properties;/**
 * 需求:使用flink-connector-kafka-2.11中的FlinkKafkaConsumer消费kafka中的数据进行wordcount计算
 * 需要设置的参数:
 * 1:主题名称
 * 2:反序列化的规则
 * 3:消费者属性-集群地址
 * 4:消费者属性-消费者组id(如果不设置,会有默认的消费者组id,但是默认的不方便管理)
 * 5:消费者属性-offset重置规则
 * 6:动态分区检测(当kafka的分区数量发生变化,flink能够感知到)
 * 7:如果没有开启checkpoint,那么可以设置自动递交offset
 *    如果开启了checkpoint,checkpoint会将kafka的offset随着checkpoint成功的时候递交到默认的主题中
 */publicclassKafkaConsumer{publicstaticvoidmain(String[] args)throwsException{//todo 1)初始化flink流处理环境Configuration configuration =newConfiguration();
        configuration.setInteger("rest.port",8082);//设置webui的端口号StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);//todo 2)接入数据源//指定topic的名称String topicName ="test";//实例化kafkaConsumer对象Properties props =newProperties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test001");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");// 消费最新的数据
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 自动提交偏移量offset
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"2000");// 提交偏移量的时间间隔
        props.setProperty("flink.partition-discovery.interval-millis","5000");//开启一个后台线程每隔5s检测一次kafka的分区情况FlinkKafkaConsumer<String> kafkaSource =newFlinkKafkaConsumer<>(topicName,newSimpleStringSchema(), props);//在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是true
        kafkaSource.setCommitOffsetsOnCheckpoints(true);DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//todo 3)单词计数操作SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = kafkaDS.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,Integer>> out)throwsException{String[] words = value.split(" ");for(String word : words){
                    out.collect(Tuple2.of(word,1));}}});//todo 4)单词分组操作SingleOutputStreamOperator<Tuple2<String,Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);//todo 5)打印计算结果
        result.print();//todo 6)启动作业
        env.execute();}}

结果:一个模拟生产者,一个模拟消费者,做到实时生产数据和消费数据!

总结:

  • 1- 理解各类参数作用
  • 2- FlinkKafkaConsumer 是 kafka 消费者
  • 3- setCommitOffsetsOnCheckpoints 这里是 offset 要实现一致性语义

拓展:

  • 1- 首先要启动 zookeeper,三台机器一键启动脚本
[root@node1~]# vim startZk.sh
  
#!/bin/bash
hosts=(node1 node2 node3)for host in ${hosts[*]}do
 ssh $host "source /etc/profile;/export/server/zookeeper-3/bin/zkServer.sh start"
done

# 赋予脚本用户执行权限
[root@node1~]# chmod u+x startZk.sh
  • 2- zookeeper 一键关闭脚本
[root@node1~]# vim stopZk.sh
  
#!/bin/bash
hosts=(node1 node2 node3)for host in ${hosts[*]}do
 ssh $host "/export/server/zookeeper-3.4.6/bin/zkServer.sh stop"Done

# 赋予脚本用户执行权限
[root@node1~]# chmod u+x stopZk.sh
  • 3- kafka 可视化工具 kafka Tool 很方便!
  • 4- kafka 基本命令:
    前台:
    cd /export/servers/kafka_2.11-0.10.0.0 
    bin/kafka-server-start.sh config/server.properties
    后台:
    cd /export/servers/kafka_2.11-0.10.0.0
    nohup bin/kafka-server-start.sh config/server.properties 2>&1&
    停止:
    cd /export/servers/kafka_2.11-0.10.0.0
    bin/kafka-server-stop.sh

    消费数据:
    bin/kafka-console-consumer.sh --zookeeper node01:2181--from-beginning --topic vehicledata-dev
    
    生产数据:
    bin/kafka-console-producer.sh --broker-list node01:9092--topic vehicledata-dev

    创建topic:
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181--replication-factor 1--partitions 1--topic test1
    
    删除topic:
    ./bin/kafka-topics.sh  --delete --zookeeper node01:2181--topic vehicledata-dev
    bin/zkCli.sh -server node01:2181
    rmr /brokers/topics/vehicledata-dev
    
    罗列topic:
    bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
5.3 Redis Connector

数据输出到Redis数据库中,Redis是一个基于内存、性能极高的NoSQL数据库,数据还可以持久化到磁盘,读写速度快,适合存储key-value类型的数据。

/**
 * 从指定的socket读取数据,对单词进行计算,将结果写入到Redis中
 */publicclassRedisSinkDemo{publicstaticvoidmain(String[] args)throwsException{//创建Flink流计算执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//创建DataStream//SourceDataStreamSource<String> lines = env.socketTextStream("node01",9999);//调用Transformation开始SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = lines.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicvoidflatMap(String line,Collector<Tuple2<String,Integer>> collector)throwsException{String[] words = line.split(" ");for(String word : words){//new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word,1));}}});//分组KeyedStream<Tuple2<String,Integer>,String> keyed = wordAndOne.keyBy(newKeySelector<Tuple2<String,Integer>,String>(){@OverridepublicStringgetKey(Tuple2<String,Integer> tp)throwsException{return tp.f0;}});//聚合SingleOutputStreamOperator<Tuple2<String,Integer>> summed = keyed.sum(1);//Transformation结束//调用Sink//summed.addSink()FlinkJedisPoolConfig conf =newFlinkJedisPoolConfig.Builder().setHost("node03").setPassword("123456").setDatabase(8).build();

        summed.addSink(newRedisSink<Tuple2<String,Integer>>(conf,newRedisWordCountMapper()));//启动执行
        env.execute("StreamingWordCount");}// 声明一个公共的静态内部类 RedisWordCountMapper,它实现了 RedisMapper 接口。  // RedisMapper 接口可能是用于将数据映射到Redis命令和键值对的自定义接口。  publicstaticclassRedisWordCountMapperimplementsRedisMapper<Tuple2<String,Integer>>{// 重写 getCommandDescription 方法,该方法返回一个 RedisCommandDescription 对象,  // 该对象描述了要执行的Redis命令和相关的参数。  // 在这里,它配置为执行 HSET 命令(用于设置哈希表中的字段的值)在名为 "WORD_COUNT" 的Redis哈希上。  @OverridepublicRedisCommandDescriptiongetCommandDescription(){returnnewRedisCommandDescription(RedisCommand.HSET,"WORD_COUNT");}// 重写 getKeyFromData 方法,该方法从给定的 Tuple2 数据中提取键(在这里是字符串)。  // Tuple2 是一个包含两个元素(在这里是一个字符串和一个整数)的元组,f0 是其第一个元素。  @OverridepublicStringgetKeyFromData(Tuple2<String,Integer> data){return data.f0;// 返回元组的第一个元素,即字符串,作为Redis的键。  }// 重写 getValueFromData 方法,该方法从给定的 Tuple2 数据中提取值(在这里是整数),并将其转换为字符串。  // Tuple2 的 f1 是其第二个元素,即整数。这个方法将这个整数转换为字符串,因为Redis通常存储字符串值。  @OverridepublicStringgetValueFromData(Tuple2<String,Integer> data){return data.f1.toString();// 返回元组的第二个元素(整数)的字符串表示形式,作为Redis的值。  }}}
标签: flink 笔记 大数据

本文转载自: https://blog.csdn.net/m0_60732994/article/details/136160670
版权归原作者 卡林神不是猫 所有, 如有侵权,请联系我们删除。

“flink重温笔记(七):Flink 流批一体 API 开发—— Connector 连接器”的评论:

还没有评论