Flink学习笔记
前言:今天是学习 flink 的第七天啦!学习了 flink 中 connector(数据连接器) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,主要学习了数据存储到以下三处:
1、关系型数据库 mysql ;
2、消息队列:kafka;
3、非关系型数据库:redis
我觉得还是比较有意思的,这些是以后工作要用到的技能,我一定要好好掌握!
Tips:“莫道春光难揽取,浮云过后艳阳天!”明天周一,又是新的一天,要深入学习 flink 的四大基石属性!
文章目录
二、Flink 流批一体 API 开发
5. Connectors
5.1 JDBC Connector
该连接器可以向 JDBC 数据库写入数据。
5.1.1 依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.13.1</version>
</dependency>
5.1.2 案例演示
- 需求:从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL
- Mysql创建表:
CREATE TABLE `t_wordcount` (
`word` varchar(255) NOT NULL,
`counts` int(11) DEFAULT '0',
PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 代码:
packagecn.itcast.day07.connectors;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.connector.jdbc.JdbcConnectionOptions;importorg.apache.flink.connector.jdbc.JdbcSink;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;/**
* @author lql
* @time 2024-02-18 20:25:31
* @description TODO:从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL
*/publicclassJDBCSinkDemo{publicstaticvoidmain(String[] args)throwsException{// 1) 初始化flink流处理的环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 启动检查点机制,每隔 5 秒生成一个 slot// 栅栏会发送给job作业第一个算子(source),(source)收到栅栏后,会阻塞当前算子计算任务,将当前计算结果 state 数据持久化存储// 通过checkpoint将中间结果存储下来,只需要在处理完成后进行一次性写入,可以减少与数据库的交互次数// 而通过checkpoint将数据分批写入数据库,可以避免数据库锁,资源竞争等问题。// 一致性语义: exactly-once:就是操作失败时会进行回滚
env.enableCheckpointing(5000);// 2)定义数据源DataStreamSource<String> lines = env.socketTextStream("node1",9999);// 词频统计SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = lines.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicvoidflatMap(String line,Collector<Tuple2<String,Integer>> collector)throwsException{String[] words = line.split(" ");for(String word : words){
collector.collect(Tuple2.of(word,1));}}});// 分组KeyedStream<Tuple2<String,Integer>,String> grouped = wordAndOne.keyBy(f -> f.f0);// 聚合SingleOutputStreamOperator<Tuple2<String,Integer>> sumed = grouped.sum(1);
sumed.print();// 将聚合后的数据写入到数据库中
sumed.addSink(JdbcSink.sink("INSERT INTO t_wordcount(word, counts) VALUES(?,?) ON DUPLICATE KEY UPDATE counts = ?",(ps, t)->{
ps.setString(1, t.f0);
ps.setInt(2, t.f1);
ps.setInt(3, t.f1);},newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://node1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false").withDriverName("com.mysql.jdbc.Driver").withUsername("root").withPassword("123456").build()));// 启动
env.execute();}}
结果:
终端输出:打印更新后的词频统计
8> (hadoop,1)
7> (flink,1)
1> (spark,1)
1> (kafak,1)
7> (flink,2)
7> (flume,1)
mysql 的表中实时更新词频统计数据
总结:
- 1- checkpoint 机制能够生成 state,然后批量写入数据库
- 2- 写入数据库操作中,此事不需要和之前数据源在 mysql 中要定义 java bean 类
- 3- (ps, t) -> 这里是运用 lambda 表达式,preparedstatement 对象和 tuple 元组获取 sql 语句中的输入参数
- 4- 写入到 mysql 中要 JdbcSink.sink 方法,四个参数,一个是 sql 语句,一个是 sql 传入参数,一个是 mysql 配置连接
- 5- mysql 连接对象需要 new 一个 JdbcConnectionOptions.JdbcConnectionOptionsBuilder
5.2 Kafka Connector
Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。
Flink 的 Kafka Producer 称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。
5.2.1 依赖
# 要使用此反序列化 schema 必须添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>{{site.version }}</version>
</dependency>
5.2.2 代码实现-Kafka Producer
将 socket 数据源写入到 kafka 的 test Topic 中
packagecn.itcast.day07.connectors;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importjava.util.Properties;/**
* @author lql
* @time 2024-02-18 22:31:46
* @description TODO:使用自定义sink官方提供flink-connector-kafka-2.11中的FlinkKafkaProducer实现数据的流的方式写入kafka数据
*/publicclassKafkaProducer{publicstaticvoidmain(String[] args)throwsException{//todo 1)初始化flink流处理环境Configuration configuration =newConfiguration();
configuration.setInteger("rest.port",8081);//设置webui的端口号StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);// 开启 checkpoint 保证一致性语义
env.enableCheckpointing(5000);//todo 2)接入数据源DataStreamSource<String> lines = env.socketTextStream("node1",9999);//todo 3)创建kafka的生产者实例//指定topic的名称String topicName ="test";// 实例化 FlinkKafkaProducer 对象Properties props =newProperties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");FlinkKafkaProducer<String> myProducer =newFlinkKafkaProducer<>(topicName,newSimpleStringSchema(), props);// todo 4) 将数据写入 kafka 中
lines.addSink(myProducer);
env.execute();}}
结果:socket 数据能够源源不断写入到 kafka 中
总结:
- 1- 开启 checkpoint 能够保证一致性语义
- 2- FlinkKafkaProducer 能够担任 kafka 生产者角色
5.2.3 代码实现-Kafka Comsumer
消费 kafka 指定主题中的数据
packagecn.itcast.day07.connectors;/**
* @author lql
* @time 2024-02-18 22:37:50
* @description TODO:使用flink-connector-kafka-2.11中的FlinkKafkaConsumer消费kafka中的数据进行wordcount计算
*/importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importorg.apache.flink.util.Collector;importorg.apache.kafka.clients.consumer.ConsumerConfig;importjava.util.Properties;/**
* 需求:使用flink-connector-kafka-2.11中的FlinkKafkaConsumer消费kafka中的数据进行wordcount计算
* 需要设置的参数:
* 1:主题名称
* 2:反序列化的规则
* 3:消费者属性-集群地址
* 4:消费者属性-消费者组id(如果不设置,会有默认的消费者组id,但是默认的不方便管理)
* 5:消费者属性-offset重置规则
* 6:动态分区检测(当kafka的分区数量发生变化,flink能够感知到)
* 7:如果没有开启checkpoint,那么可以设置自动递交offset
* 如果开启了checkpoint,checkpoint会将kafka的offset随着checkpoint成功的时候递交到默认的主题中
*/publicclassKafkaConsumer{publicstaticvoidmain(String[] args)throwsException{//todo 1)初始化flink流处理环境Configuration configuration =newConfiguration();
configuration.setInteger("rest.port",8082);//设置webui的端口号StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);//todo 2)接入数据源//指定topic的名称String topicName ="test";//实例化kafkaConsumer对象Properties props =newProperties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"test001");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");// 消费最新的数据
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 自动提交偏移量offset
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"2000");// 提交偏移量的时间间隔
props.setProperty("flink.partition-discovery.interval-millis","5000");//开启一个后台线程每隔5s检测一次kafka的分区情况FlinkKafkaConsumer<String> kafkaSource =newFlinkKafkaConsumer<>(topicName,newSimpleStringSchema(), props);//在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是true
kafkaSource.setCommitOffsetsOnCheckpoints(true);DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//todo 3)单词计数操作SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = kafkaDS.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,Integer>> out)throwsException{String[] words = value.split(" ");for(String word : words){
out.collect(Tuple2.of(word,1));}}});//todo 4)单词分组操作SingleOutputStreamOperator<Tuple2<String,Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);//todo 5)打印计算结果
result.print();//todo 6)启动作业
env.execute();}}
结果:一个模拟生产者,一个模拟消费者,做到实时生产数据和消费数据!
总结:
- 1- 理解各类参数作用
- 2- FlinkKafkaConsumer 是 kafka 消费者
- 3- setCommitOffsetsOnCheckpoints 这里是 offset 要实现一致性语义
拓展:
- 1- 首先要启动 zookeeper,三台机器一键启动脚本
[root@node1~]# vim startZk.sh
#!/bin/bash
hosts=(node1 node2 node3)for host in ${hosts[*]}do
ssh $host "source /etc/profile;/export/server/zookeeper-3/bin/zkServer.sh start"
done
# 赋予脚本用户执行权限
[root@node1~]# chmod u+x startZk.sh
- 2- zookeeper 一键关闭脚本
[root@node1~]# vim stopZk.sh
#!/bin/bash
hosts=(node1 node2 node3)for host in ${hosts[*]}do
ssh $host "/export/server/zookeeper-3.4.6/bin/zkServer.sh stop"Done
# 赋予脚本用户执行权限
[root@node1~]# chmod u+x stopZk.sh
- 3- kafka 可视化工具 kafka Tool 很方便!
- 4- kafka 基本命令:
前台:
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-start.sh config/server.properties
后台:
cd /export/servers/kafka_2.11-0.10.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1&
停止:
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-stop.sh
消费数据:
bin/kafka-console-consumer.sh --zookeeper node01:2181--from-beginning --topic vehicledata-dev
生产数据:
bin/kafka-console-producer.sh --broker-list node01:9092--topic vehicledata-dev
创建topic:
./bin/kafka-topics.sh --create --zookeeper localhost:2181--replication-factor 1--partitions 1--topic test1
删除topic:
./bin/kafka-topics.sh --delete --zookeeper node01:2181--topic vehicledata-dev
bin/zkCli.sh -server node01:2181
rmr /brokers/topics/vehicledata-dev
罗列topic:
bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
5.3 Redis Connector
数据输出到Redis数据库中,Redis是一个基于内存、性能极高的NoSQL数据库,数据还可以持久化到磁盘,读写速度快,适合存储key-value类型的数据。
/**
* 从指定的socket读取数据,对单词进行计算,将结果写入到Redis中
*/publicclassRedisSinkDemo{publicstaticvoidmain(String[] args)throwsException{//创建Flink流计算执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//创建DataStream//SourceDataStreamSource<String> lines = env.socketTextStream("node01",9999);//调用Transformation开始SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = lines.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicvoidflatMap(String line,Collector<Tuple2<String,Integer>> collector)throwsException{String[] words = line.split(" ");for(String word : words){//new Tuple2<String, Integer>(word, 1)
collector.collect(Tuple2.of(word,1));}}});//分组KeyedStream<Tuple2<String,Integer>,String> keyed = wordAndOne.keyBy(newKeySelector<Tuple2<String,Integer>,String>(){@OverridepublicStringgetKey(Tuple2<String,Integer> tp)throwsException{return tp.f0;}});//聚合SingleOutputStreamOperator<Tuple2<String,Integer>> summed = keyed.sum(1);//Transformation结束//调用Sink//summed.addSink()FlinkJedisPoolConfig conf =newFlinkJedisPoolConfig.Builder().setHost("node03").setPassword("123456").setDatabase(8).build();
summed.addSink(newRedisSink<Tuple2<String,Integer>>(conf,newRedisWordCountMapper()));//启动执行
env.execute("StreamingWordCount");}// 声明一个公共的静态内部类 RedisWordCountMapper,它实现了 RedisMapper 接口。 // RedisMapper 接口可能是用于将数据映射到Redis命令和键值对的自定义接口。 publicstaticclassRedisWordCountMapperimplementsRedisMapper<Tuple2<String,Integer>>{// 重写 getCommandDescription 方法,该方法返回一个 RedisCommandDescription 对象, // 该对象描述了要执行的Redis命令和相关的参数。 // 在这里,它配置为执行 HSET 命令(用于设置哈希表中的字段的值)在名为 "WORD_COUNT" 的Redis哈希上。 @OverridepublicRedisCommandDescriptiongetCommandDescription(){returnnewRedisCommandDescription(RedisCommand.HSET,"WORD_COUNT");}// 重写 getKeyFromData 方法,该方法从给定的 Tuple2 数据中提取键(在这里是字符串)。 // Tuple2 是一个包含两个元素(在这里是一个字符串和一个整数)的元组,f0 是其第一个元素。 @OverridepublicStringgetKeyFromData(Tuple2<String,Integer> data){return data.f0;// 返回元组的第一个元素,即字符串,作为Redis的键。 }// 重写 getValueFromData 方法,该方法从给定的 Tuple2 数据中提取值(在这里是整数),并将其转换为字符串。 // Tuple2 的 f1 是其第二个元素,即整数。这个方法将这个整数转换为字符串,因为Redis通常存储字符串值。 @OverridepublicStringgetValueFromData(Tuple2<String,Integer> data){return data.f1.toString();// 返回元组的第二个元素(整数)的字符串表示形式,作为Redis的值。 }}}
版权归原作者 卡林神不是猫 所有, 如有侵权,请联系我们删除。