文章目录
案例:实时处理电商订单信息
使用
Flink
消费
Kafka
中的数据,并进行相应的数据统计计算。
数据格式为:
"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"
字段描述为:
其中 order_status 订单状态的描述为:1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。
提示:pom 文件依赖,放在在文章最后有需要自取。
需求一:统计商城实时订单实收金额
注意:(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加)
代码实现:
importorg.apache.flink.api.common.serialization.SimpleStringSchema
importorg.apache.flink.streaming.api.TimeCharacteristic
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
importorg.apache.kafka.clients.consumer.ConsumerConfig
importjava.text.SimpleDateFormat
importjava.util.Properties
object test1 {// 封装数据用到的样例类caseclass order(id:String, consignee:String, consignee_tel:String, feight_fee:Double, amount:Double, status:String, create_time:Long, operate_time:Long)def main(args: Array[String]):Unit={// 创建流数据环境val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置时间语义 设置为时间时间,根据数据的时间戳来判断
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 创建 Kafka 消费者配置对象(这个需要导入的包是 java.util 里面的包)val properties =new Properties()// 配置 Kafka 服务端地址
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092")// 指定消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"order_consumer")// 创建消费者val consumer =new FlinkKafkaConsumer[String]("order",new SimpleStringSchema(), properties)// 连接 Kafka 数据源,获取数据val dataStream = env.addSource(consumer)// 得到数据后,我们就可以开始处理元数据了val data = dataStream
// 对每条数据使用逗号 "," 进行切分,去掉前后双引号.map(
data =>{val arr = data.split(",")// 遍历数组,去点每个字段的前后双引号for(i <-0 until arr.length){
arr(i)= arr(i).replace("\"","")// 因为有些字段是空的,所以我们判断如果是空的我们就把他赋值为 nullif(arr(i)==""){
arr(i)="null"}}// 数据格式处理完后,开始对可用数据进行封装// 需要封装的字段为:id,consignee,consignee_tel,final_total_amount,order_status,create_time,operate_time// 使用 SimpleDateFormat 将时间字符串转换为 时间戳
order(arr(0), arr(1), arr(2), arr(arr.length -1).toDouble, arr(3).toDouble, arr(4),new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(10)).getTime,new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(11)).getTime)})// 过滤不进入计算的数据.filter(_.status !="1003").filter(_.status !="1005").filter(_.status !="1006")// 将 时间戳设置为事件时间.assignAscendingTimestamps(t =>{if(t.create_time > t.operate_time){
t.create_time
}else{
t.operate_time
}})
data.print("清洗完的数据")// 输出数据
data.map(
data =>{(1, data.amount)}).keyBy(0).sum(1).print("订单实收金额")
env.execute()}}
测试:
这里我就不开启
Kafka 生产者
了,直接使用上一篇文章中写的 Flume 从端口获取数据,输出到多端(Kafka、HDFS)单文件 文件,去监控端口转存到
Kafka
中。
如果没有配置 Flume,可以直接使用
Kafka
开启一个生产者:
bin/kafka-console-producer.sh --topic order --broker-list master:9092
(注意需要进入到 Kafka 安装目录下)
开启 Flume 监控端口后,就可以直接用
telnet master 10050
登陆端口,进行发送数据。
发送数据:
"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"
"3447","计娴瑾","13208002474","6665.00","1001","5903","第4大街第25号楼6单元338门","描述396659","689816418657611","荣耀10青春版 幻彩渐变 2400万AI自拍 全网通版4GB+64GB 渐变蓝 移动联通电信4G全面屏手机 双卡双待等3件商品","2020/4/25 18:47:14","2020/4/25 18:47:14","2020/4/25 19:02:14","","","http://img.gmall.com/793265.jpg","29","0.00","6660.00","5.00"
"3448","时友裕","13908519819","217.00","1004","5525","第19大街第27号楼9单元874门","描述286614","675628874311147","迪奥(Dior)烈艳蓝金唇膏 口红 3.5g 999号 哑光-经典正红等1件商品","2020/4/25 18:47:14","2020/4/26 18:55:02","2020/4/25 19:02:14","","","http://img.gmall.com/553516.jpg","32","55.00","252.00","20.00"
"3449","东郭妍","13289011809","164.00","1006","9321","第11大街第33号楼6单元645门","描述368435","489957278482334","北纯精制黄小米(小黄米 月子米 小米粥 粗粮杂粮 大米伴侣)2.18kg等1件商品","2020/4/25 18:47:14","2020/4/26 23:10:20","2020/4/25 19:02:14","","","http://img.gmall.com/235333.jpg","22","0.00","145.00","19.00"
"3450","汪毅","13419873912","1064.00","1001","1088","第7大街第9号楼2单元723门","描述774486","661124816482447","Dior迪奥口红唇膏送女友老婆礼物生日礼物 烈艳蓝金999+888两支装礼盒等3件商品","2020/4/25 18:47:14","2020/4/26 18:48:16","2020/4/25 19:02:14","","","http://img.gmall.com/552723.jpg","28","432.00","1488.00","8.00"
运行结果:
成功!!
需求二:将上面的最后计算的结果,存储到 Redis 中(Key 为:totalprice)
关键知识点:
RedisSink
、
自定义 RedisMapper 类
Redis Sink
Flink 有专门的 Sink 到 Redis 的对象(RedisSink):
public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper)
创建时,还需要设置输入数据的泛型:
new RedisSink[IN]()
需要传入的数据为 Redis 的基本配置:主机、端口…,redisSinkMapper 方法。
我们先创建一个
FlinkJedisPoolConfig
构造器:
val conf =new FlinkJedisPoolConfig.Builder()// 设置主机.setHost("master")// 设置端口.setPort(6379)// 构建.build()
创建一个 RedisSink:
val sinkToRedis = new RedisSink[(Int,Double)](conf,new MyRedis("totalprice"))
(MyRedis 见下方 自定义 RedisMapper 类)
自定义 RedisMapper 类
类源码:
publicinterfaceRedisMapper<T>extendsFunction,Serializable{/**
* Returns descriptor which defines data type.
*
* @return data type descriptor
*/RedisCommandDescriptiongetCommandDescription();/**
* Extracts key from data.
*
* @param data source data
* @return key
*/StringgetKeyFromData(T data);/**
* Extracts value from data.
*
* @param data source data
* @return value
*/StringgetValueFromData(T data);}
通过源码我们可以知道,
T
表示传入数据的泛型,还需要重写三个方法:
getCommandDescription
(创建 Redis 描述器:使用什么方法写入 Redis,比如:HSET、SET…),
getKeyFromData
(传入 Redis 中键值对的 Key 值),
getValueFromData
(传入 Redis 中键值对的值)。
代码示例:
// 自定义 SinkRedis 类// 调用时需要传入一个值作为 Keyclass MyRedis(Key:String)extends RedisMapper[(Int,Double)]{overridedef getCommandDescription: RedisCommandDescription ={// 创建 Redis 描述器,使用 SET 方法new RedisCommandDescription(RedisCommand.SET)}// Key 的名字使用前方传入的值为 Key 值overridedef getKeyFromData(data:(Int,Double)):String= Key
// 将数据转换成 String 类型再存储到 Redis 中overridedef getValueFromData(data:(Int,Double)):String= data._2.toString
}
整体代码如下:
importorg.apache.flink.api.common.serialization.SimpleStringSchema
importorg.apache.flink.streaming.api.TimeCharacteristic
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
importorg.apache.flink.streaming.connectors.redis.RedisSink
importorg.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
importorg.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}importorg.apache.kafka.clients.consumer.ConsumerConfig
importjava.text.SimpleDateFormat
importjava.util.Properties
object test1 {// 封装数据用到的样例类caseclass order(id:String, consignee:String, consignee_tel:String, feight_fee:Double, amount:Double, status:String, create_time:Long, operate_time:Long)def main(args: Array[String]):Unit={// 创建流数据环境val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置时间语义 设置为时间时间,根据数据的时间戳来判断
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 创建 Kafka 消费者配置对象(这个需要导入的包是 java.util 里面的包)val properties =new Properties()// 配置 Kafka 服务端地址
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092")// 指定消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"order_consumer")// 创建消费者val consumer =new FlinkKafkaConsumer[String]("order",new SimpleStringSchema(), properties)// 连接 Kafka 数据源,获取数据val dataStream = env.addSource(consumer)// 得到数据后,我们就可以开始处理元数据了val data = dataStream
// 对每条数据使用逗号 "," 进行切分,去掉前后双引号.map(
data =>{val arr = data.split(",")// 遍历数组,去点每个字段的前后双引号for(i <-0 until arr.length){
arr(i)= arr(i).replace("\"","")// 因为有些字段是空的,所以我们判断如果是空的我们就把他赋值为 nullif(arr(i)==""){
arr(i)="null"}}// 数据格式处理完后,开始对可用数据进行封装// 需要封装的字段为:id,consignee,consignee_tel,final_total_amount,order_status,create_time,operate_time// 使用 SimpleDateFormat 将时间字符串转换为 时间戳
order(arr(0), arr(1), arr(2), arr(arr.length -1).toDouble, arr(3).toDouble, arr(4),new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(10)).getTime,new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(11)).getTime)})// 过滤不进入计算的数据.filter(_.status !="1003").filter(_.status !="1005").filter(_.status !="1006")// 将 时间戳设置为事件时间.assignAscendingTimestamps(t =>{if(t.create_time > t.operate_time){
t.create_time
}else{
t.operate_time
}})
data.print("清洗完的数据")// 输出数据val sinkData = data.map(
data =>{(1, data.amount)}).keyBy(0).sum(1)
sinkData.print("订单实收金额")// ----------------------------- 分割线------------------------------------------// 下面为 需求二 添加的代码val conf =new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).build()val SinkToRedis =new RedisSink[(Int,Double)](conf,new MyRedis("totalprice"))
sinkData.addSink(SinkToRedis)
env.execute()}}// 自定义 SinkRedis 类// 调用时需要传入一个值作为 Keyclass MyRedis(Key:String)extends RedisMapper[(Int,Double)]{overridedef getCommandDescription: RedisCommandDescription ={// 创建 Redis 描述器,使用 SET 方法new RedisCommandDescription(RedisCommand.SET)}// Key 的名字使用前方传入的值为 Key 值overridedef getKeyFromData(data:(Int,Double)):String= Key
// 将数据转换成 String 类型再存储到 Redis 中overridedef getValueFromData(data:(Int,Double)):String= data._2.toString
}
需要先启动 Redis:
redis-server &
(后台启动)
进入 Redis 交互界面:
redis-cli
输入数据:
"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"
测试结果:
需求三:使用侧边流,监控发现 order_status 字段为退回完成,将退回总额存入到 Redis 中,将 order_status 字段为取消订单的存入到 MySQL 中(Sink 到 MySQL 的偷懒没有仔细写了,直接放在最后的代码里面了)。
侧输出流
需要创建一个为侧输出流的变量:
new outputTag[T](id:String)
参数说明:
T
:输入数据类型;id:String
:设置创建侧边流的 id,必须为字符串类型。
然后我们就可以使用万能的
processFunction
进行分流操作:
val sideToRedis =new OutputTag[order]("Status1006")val sideToMySQL =new OutputTag[order]("Status1003")
data
.process(new ProcessFunction[order,order]{// i:输入数据;context:上下文,可以获取时间戳、输出数据到侧输出流;collector:抛出数据,设置类型为样例类 order;overridedef processElement(i: order, context: ProcessFunction[order, order]#Context, collector: Collector[order]):Unit={// 传入的数据状态为 1006 就放入侧输出流 sideToRedis 中。if(i.status =="1006"){
context.output(sideToRedis,i)}elseif(i.status =="1003"){
context.output(sideToMySQL,i)}else{// 其他的就抛出去正常输出
collector.collect(i)}}})
注意我们这里使用了侧输出流,上面的 filter 过滤就可以去掉了,因为我们等于使用侧输出流的方式将数据过滤掉了。
完整代码:
importorg.apache.flink.api.common.serialization.SimpleStringSchema
importorg.apache.flink.configuration.Configuration
importorg.apache.flink.streaming.api.TimeCharacteristic
importorg.apache.flink.streaming.api.functions.ProcessFunction
importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
importorg.apache.flink.streaming.connectors.redis.RedisSink
importorg.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
importorg.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}importorg.apache.flink.util.Collector
importorg.apache.kafka.clients.consumer.ConsumerConfig
importjava.sql.{Connection, DriverManager, PreparedStatement}importjava.text.SimpleDateFormat
importjava.util.Properties
caseclass order(id:String, consignee:String, consignee_tel:String, feight_fee:Double, amount:Double, status:String, create_time:Long, operate_time:Long)object test1 {// 封装数据用到的样例类def main(args: Array[String]):Unit={// 创建流数据环境val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置时间语义 设置为时间时间,根据数据的时间戳来判断
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 创建 Kafka 消费者配置对象(这个需要导入的包是 java.util 里面的包)val properties =new Properties()// 配置 Kafka 服务端地址
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092")// 指定消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"order_consumer")// 创建消费者val consumer =new FlinkKafkaConsumer[String]("order",new SimpleStringSchema(), properties)// 连接 Kafka 数据源,获取数据val dataStream = env.addSource(consumer)// 得到数据后,我们就可以开始处理元数据了val data = dataStream
// 对每条数据使用逗号 "," 进行切分,去掉前后双引号.map(
data =>{val arr = data.split(",")// 遍历数组,去点每个字段的前后双引号for(i <-0 until arr.length){
arr(i)= arr(i).replace("\"","")// 因为有些字段是空的,所以我们判断如果是空的我们就把他赋值为 nullif(arr(i)==""){
arr(i)="null"}}// 数据格式处理完后,开始对可用数据进行封装// 需要封装的字段为:id,consignee,consignee_tel,final_total_amount,order_status,create_time,operate_time// 使用 SimpleDateFormat 将时间字符串转换为 时间戳
order(arr(0), arr(1), arr(2), arr(arr.length -1).toDouble, arr(3).toDouble, arr(4),new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(10)).getTime,new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(11)).getTime)})// 将 时间戳设置为事件时间.assignAscendingTimestamps(t =>{if(t.create_time > t.operate_time){
t.create_time
}else{
t.operate_time
}})// data.print("清洗完的数据")val conf =new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).build()val SinkToRedis =new RedisSink[(Int,Double)](conf,new MyRedis("totalprice"))val sideToRedis =new OutputTag[order]("Status1006")val sideToMySQL =new OutputTag[order]("Status1003")val endData = data
.process(new ProcessFunction[order,order]{overridedef processElement(i: order, context: ProcessFunction[order, order]#Context, collector: Collector[order]):Unit={if(i.status =="1006"){
context.output(sideToRedis,i)}elseif(i.status =="1003"){
context.output(sideToMySQL,i)}else{
collector.collect(i)}}})val end = endData
.filter(_.status =="1005").map(
data =>{(1,data.amount)}).keyBy(0).sum(1)// endData.print("正常数据:")
end.addSink(SinkToRedis)val status_1006 = endData.getSideOutput(sideToRedis).map(
data =>{(1,data.amount)}).keyBy(0).sum(1)// 将 1006 输出到 Redis 中
status_1006.print("status_1006")val SinkToRedis2 =new RedisSink[(Int,Double)](conf,new MyRedis("totalreduceprice"))
status_1006.addSink(SinkToRedis2)// 将 1003 的取出,输出到MySQL 中val status_1003 = endData.getSideOutput(sideToMySQL)
status_1003.print("status_1003")
status_1003.addSink(new ToMySQL)
env.execute()}}// 自定义 SinkRedis 类// 调用时需要传入一个值作为 Keyclass MyRedis(Key:String)extends RedisMapper[(Int,Double)]{overridedef getCommandDescription: RedisCommandDescription ={// 创建 Redis 描述器,使用 SET 方法new RedisCommandDescription(RedisCommand.SET)}// Key 的名字使用前方传入的值为 Key 值overridedef getKeyFromData(data:(Int,Double)):String= Key
// 将数据转换成 String 类型再存储到 Redis 中overridedef getValueFromData(data:(Int,Double)):String= data._2.toString
}class ToMySQL extends RichSinkFunction[order]{var conn:Connection = _
var insetStat:PreparedStatement = _
overridedef invoke(value: order):Unit={// 补充后面的预留位置,每个数字代表的是第几个 ?。
insetStat.setString(1,value.id)
insetStat.setString(2,value.consignee)
insetStat.setString(3,value.consignee_tel)
insetStat.setDouble(4,value.amount)
insetStat.setDouble(5,value.feight_fee)
insetStat.execute()}overridedef open(parameters: Configuration):Unit={// 连接 MySQL 驱动
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/shtd_result?useSSL=false","root","root")// 配置 insert SQL 语句
insetStat = conn.prepareStatement("insert into order_info values(?,?,?,?,?)")}overridedef close():Unit={// 关闭各个节点
conn.close()
insetStat.close()}}
pom
文件中需要的依赖:
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.10.1</flink.version><scala.binary.version>2.11</scala.binary.version><kafka.version>2.2.0</kafka.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_${scala.binary.version}</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version><exclusions><exclusion><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId></exclusion></exclusions></dependency><!-- 指定mysql-connector的依赖 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency></dependencies>
版权归原作者 爱睡觉的小于 所有, 如有侵权,请联系我们删除。