Flink: Reading Data from Kafka and Splitting It into Redis and MySQL


Case study: processing e-commerce order information in real time

Use Flink to consume data from Kafka and compute the corresponding statistics.
The data format is:

"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"

The field descriptions are as follows:

The order_status field takes these values: 1001 = order created, 1002 = order paid, 1003 = order cancelled, 1004 = order completed, 1005 = return requested, 1006 = return completed.
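For reference, these codes can be kept in a small lookup table. A minimal sketch (the constant name and English labels are mine, not from the original code):

// Hypothetical status-code lookup, used only for illustration
val orderStatusDesc: Map[String, String] = Map(
  "1001" -> "order created",
  "1002" -> "order paid",
  "1003" -> "order cancelled",
  "1004" -> "order completed",
  "1005" -> "return requested",
  "1006" -> "return completed"
)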

Tip: the pom dependencies are listed at the end of this article; take them if you need them.

Requirement 1: compute the real-time received amount of mall orders

Note: the order status must be taken into account. Cancelled orders (1003), return requests (1005), and completed returns (1006) do not count toward the received amount; every other status is accumulated.
Code implementation:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

import java.text.SimpleDateFormat
import java.util.Properties

object test1 {
  // Case class used to wrap the parsed data
  case class order(id: String, consignee: String, consignee_tel: String, feight_fee: Double,
                   amount: Double, status: String, create_time: Long, operate_time: Long)

  def main(args: Array[String]): Unit = {
    // Create the streaming environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time, judged from the timestamps carried in the data
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Kafka consumer configuration (Properties comes from java.util)
    val properties = new Properties()
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092") // broker address
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "order_consumer")       // consumer group
    // Create the consumer and use it as the source
    val consumer = new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties)
    val dataStream = env.addSource(consumer)
    // With the raw data in hand, start cleaning it
    val data = dataStream
      // Split each record on "," and strip the surrounding double quotes
      .map(data => {
        val arr = data.split(",")
        // Walk the array, removing the quotes around each field
        for (i <- 0 until arr.length) {
          arr(i) = arr(i).replace("\"", "")
          // Some fields are empty; mark those as "null"
          if (arr(i) == "") {
            arr(i) = "null"
          }
        }
        // Wrap the fields we need: id, consignee, consignee_tel, final_total_amount,
        // order_status, create_time, operate_time.
        // SimpleDateFormat turns the time strings into timestamps
        val fmt = new SimpleDateFormat("yyyy/M/dd HH:mm:ss")
        order(arr(0), arr(1), arr(2), arr(arr.length - 1).toDouble, arr(3).toDouble, arr(4),
          fmt.parse(arr(10)).getTime, fmt.parse(arr(11)).getTime)
      })
      // Filter out the statuses that must not be counted
      .filter(_.status != "1003")
      .filter(_.status != "1005")
      .filter(_.status != "1006")
      // Use the later of the two timestamps as the event time
      .assignAscendingTimestamps(t => {
        if (t.create_time > t.operate_time) {
          t.create_time
        } else {
          t.operate_time
        }
      })
    data.print("cleaned data") // print the cleaned stream

    data.map(data => (1, data.amount))
      .keyBy(0)
      .sum(1)
      .print("order received amount")

    env.execute()
  }
}
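One pitfall worth calling out: the timestamps in the data look like 2020/4/25 18:47:14, so the SimpleDateFormat pattern must be yyyy/M/dd HH:mm:ss. The original pattern, YYYY/m/dd, uses the week-based year (YYYY) and minute-of-hour (m) in the date positions and silently parses to the wrong instant; the code above already uses the corrected pattern. A quick sanity check:

import java.text.SimpleDateFormat

// "yyyy" = calendar year, "M" = month-of-year; "YYYY" is week-year and "m" is minutes
val fmt = new SimpleDateFormat("yyyy/M/dd HH:mm:ss")
println(fmt.parse("2020/4/25 18:47:14").getTime) // epoch millis for 2020-04-25 18:47:14 local time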

Testing:
I won't start a Kafka producer here; instead I use the Flume agent from the previous article (which reads from a port and fans the data out to Kafka and HDFS) to monitor a port and forward the data into Kafka.

If Flume is not set up, you can start a producer directly with Kafka:

bin/kafka-console-producer.sh --topic order --broker-list master:9092

(Run this from the Kafka installation directory.)
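If the order topic does not exist yet, it can be created first. A sketch, assuming Kafka 2.2+ (where kafka-topics.sh accepts --bootstrap-server) and a single-broker setup:

bin/kafka-topics.sh --create --bootstrap-server master:9092 --topic order --partitions 1 --replication-factor 1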

Once Flume is monitoring the port, you can connect to it with

telnet master 10050

and send data through the connection.
Data to send:

"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"
"3447","计娴瑾","13208002474","6665.00","1001","5903","第4大街第25号楼6单元338门","描述396659","689816418657611","荣耀10青春版 幻彩渐变 2400万AI自拍 全网通版4GB+64GB 渐变蓝 移动联通电信4G全面屏手机 双卡双待等3件商品","2020/4/25 18:47:14","2020/4/25 18:47:14","2020/4/25 19:02:14","","","http://img.gmall.com/793265.jpg","29","0.00","6660.00","5.00"
"3448","时友裕","13908519819","217.00","1004","5525","第19大街第27号楼9单元874门","描述286614","675628874311147","迪奥(Dior)烈艳蓝金唇膏 口红 3.5g 999号 哑光-经典正红等1件商品","2020/4/25 18:47:14","2020/4/26 18:55:02","2020/4/25 19:02:14","","","http://img.gmall.com/553516.jpg","32","55.00","252.00","20.00"
"3449","东郭妍","13289011809","164.00","1006","9321","第11大街第33号楼6单元645门","描述368435","489957278482334","北纯精制黄小米(小黄米 月子米 小米粥 粗粮杂粮 大米伴侣)2.18kg等1件商品","2020/4/25 18:47:14","2020/4/26 23:10:20","2020/4/25 19:02:14","","","http://img.gmall.com/235333.jpg","22","0.00","145.00","19.00"
"3450","汪毅","13419873912","1064.00","1001","1088","第7大街第9号楼2单元723门","描述774486","661124816482447","Dior迪奥口红唇膏送女友老婆礼物生日礼物 烈艳蓝金999+888两支装礼盒等3件商品","2020/4/25 18:47:14","2020/4/26 18:48:16","2020/4/25 19:02:14","","","http://img.gmall.com/552723.jpg","28","432.00","1488.00","8.00"

Run result:

Success!

Requirement 2: store the result computed above in Redis (key: totalprice)

Key points:

  • RedisSink;
  • a custom RedisMapper class.

Redis Sink

Flink ships a dedicated sink object for writing to Redis (RedisSink):

public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper)

When constructing one, you also specify the type parameter of the input data:

new RedisSink[IN]()

The constructor takes the basic Redis configuration (host, port, and so on) and a redisSinkMapper.
First, build a FlinkJedisPoolConfig:

val conf = new FlinkJedisPoolConfig.Builder()
  .setHost("master") // Redis host
  .setPort(6379)     // Redis port
  .build()           // build the config

Create a RedisSink:

val sinkToRedis = new RedisSink[(Int, Double)](conf, new MyRedis("totalprice"))

(For MyRedis, see the custom RedisMapper class below.)

Custom RedisMapper class

Interface source:

public interface RedisMapper<T> extends Function, Serializable {

    /**
     * Returns descriptor which defines data type.
     *
     * @return data type descriptor
     */
    RedisCommandDescription getCommandDescription();

    /**
     * Extracts key from data.
     *
     * @param data source data
     * @return key
     */
    String getKeyFromData(T data);

    /**
     * Extracts value from data.
     *
     * @param data source data
     * @return value
     */
    String getValueFromData(T data);
}

From the source we can see that T is the type parameter of the incoming data, and three methods must be overridden: getCommandDescription (builds the Redis command descriptor, i.e. which command is used to write to Redis: HSET, SET, ...), getKeyFromData (extracts the key of the Redis key-value pair), and getValueFromData (extracts the value of the key-value pair).

Code example:

// Custom Redis sink mapper; the value passed in becomes the Redis key
class MyRedis(Key: String) extends RedisMapper[(Int, Double)] {
  // Command descriptor: write with SET
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }
  // Use the constructor argument as the key
  override def getKeyFromData(data: (Int, Double)): String = Key
  // Convert the amount to String before storing it in Redis
  override def getValueFromData(data: (Int, Double)): String = data._2.toString
}

The complete code:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.kafka.clients.consumer.ConsumerConfig

import java.text.SimpleDateFormat
import java.util.Properties

object test1 {
  // Case class used to wrap the parsed data
  case class order(id: String, consignee: String, consignee_tel: String, feight_fee: Double,
                   amount: Double, status: String, create_time: Long, operate_time: Long)

  def main(args: Array[String]): Unit = {
    // Create the streaming environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time, judged from the timestamps carried in the data
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Kafka consumer configuration (Properties comes from java.util)
    val properties = new Properties()
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092") // broker address
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "order_consumer")       // consumer group
    // Create the consumer and use it as the source
    val consumer = new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties)
    val dataStream = env.addSource(consumer)
    // Clean the raw data
    val data = dataStream
      // Split each record on "," and strip the surrounding double quotes
      .map(data => {
        val arr = data.split(",")
        for (i <- 0 until arr.length) {
          arr(i) = arr(i).replace("\"", "")
          // Some fields are empty; mark those as "null"
          if (arr(i) == "") {
            arr(i) = "null"
          }
        }
        // Wrap the fields we need: id, consignee, consignee_tel, final_total_amount,
        // order_status, create_time, operate_time
        val fmt = new SimpleDateFormat("yyyy/M/dd HH:mm:ss")
        order(arr(0), arr(1), arr(2), arr(arr.length - 1).toDouble, arr(3).toDouble, arr(4),
          fmt.parse(arr(10)).getTime, fmt.parse(arr(11)).getTime)
      })
      // Filter out the statuses that must not be counted
      .filter(_.status != "1003")
      .filter(_.status != "1005")
      .filter(_.status != "1006")
      // Use the later of the two timestamps as the event time
      .assignAscendingTimestamps(t => {
        if (t.create_time > t.operate_time) t.create_time else t.operate_time
      })
    data.print("cleaned data")

    val sinkData = data
      .map(data => (1, data.amount))
      .keyBy(0)
      .sum(1)

    sinkData.print("order received amount")

    // ------------------------- divider -------------------------
    // Code added for requirement 2
    val conf = new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).build()
    val SinkToRedis = new RedisSink[(Int, Double)](conf, new MyRedis("totalprice"))

    sinkData.addSink(SinkToRedis)

    env.execute()
  }
}

// Custom Redis sink mapper; the value passed in becomes the Redis key
class MyRedis(Key: String) extends RedisMapper[(Int, Double)] {
  // Command descriptor: write with SET
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }
  // Use the constructor argument as the key
  override def getKeyFromData(data: (Int, Double)): String = Key
  // Convert the amount to String before storing it in Redis
  override def getValueFromData(data: (Int, Double)): String = data._2.toString
}

Start Redis first (in the background):

redis-server &

Enter the Redis CLI:

redis-cli

Data to send:

"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"

Test result:
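You can verify the stored value inside redis-cli; the key name is the one passed to the MyRedis constructor:

GET totalprice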

Requirement 3: using side output streams, detect records whose order_status is "return completed" and store the total returned amount in Redis, and store records whose order_status is "order cancelled" in MySQL. (I was lazy with the MySQL sink and did not polish it; it only appears in the final code below.)

Side output streams

First create an output tag for the side stream:

new OutputTag[T](id: String)

Parameters:

  • T: the type of the data placed in the side stream;
  • id: String: the id of the side stream, which must be a string.

Then we can use the all-purpose ProcessFunction to split the stream:

val sideToRedis = new OutputTag[order]("Status1006")
val sideToMySQL = new OutputTag[order]("Status1003")
data
  .process(new ProcessFunction[order, order] {
    // i: the input element; context: gives access to timestamps and side outputs;
    // collector: emits elements to the main stream (typed as the case class order)
    override def processElement(i: order, context: ProcessFunction[order, order]#Context,
                                collector: Collector[order]): Unit = {
      if (i.status == "1006") {
        // status 1006 goes to the sideToRedis side output
        context.output(sideToRedis, i)
      } else if (i.status == "1003") {
        context.output(sideToMySQL, i)
      } else {
        // everything else stays in the main stream
        collector.collect(i)
      }
    }
  })

Note that since the stream is now split with side outputs, the filter calls for 1003 and 1006 used earlier can be dropped: routing those records into side outputs effectively filters them out of the main stream. (Status 1005 is not side-routed, so it still has to be excluded with a filter; see the full code.)
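The records routed to a tag are read back from the stream returned by process via getSideOutput, as the full code below does. A minimal sketch (endData names the processed stream, matching the full code):

// Pull the "return completed" (1006) records back out of the processed stream
val returns: DataStream[order] = endData.getSideOutput(sideToRedis)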

Full code:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerConfig

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Properties

// Case class used to wrap the parsed data
case class order(id: String, consignee: String, consignee_tel: String, feight_fee: Double,
                 amount: Double, status: String, create_time: Long, operate_time: Long)

object test1 {
  def main(args: Array[String]): Unit = {
    // Create the streaming environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time, judged from the timestamps carried in the data
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Kafka consumer configuration (Properties comes from java.util)
    val properties = new Properties()
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092") // broker address
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "order_consumer")       // consumer group
    // Create the consumer and use it as the source
    val consumer = new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties)
    val dataStream = env.addSource(consumer)
    // Clean the raw data
    val data = dataStream
      // Split each record on "," and strip the surrounding double quotes
      .map(data => {
        val arr = data.split(",")
        for (i <- 0 until arr.length) {
          arr(i) = arr(i).replace("\"", "")
          // Some fields are empty; mark those as "null"
          if (arr(i) == "") {
            arr(i) = "null"
          }
        }
        // Wrap the fields we need: id, consignee, consignee_tel, final_total_amount,
        // order_status, create_time, operate_time
        val fmt = new SimpleDateFormat("yyyy/M/dd HH:mm:ss")
        order(arr(0), arr(1), arr(2), arr(arr.length - 1).toDouble, arr(3).toDouble, arr(4),
          fmt.parse(arr(10)).getTime, fmt.parse(arr(11)).getTime)
      })
      // Use the later of the two timestamps as the event time
      .assignAscendingTimestamps(t => {
        if (t.create_time > t.operate_time) t.create_time else t.operate_time
      })
    // data.print("cleaned data")

    val conf = new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).build()
    val SinkToRedis = new RedisSink[(Int, Double)](conf, new MyRedis("totalprice"))

    val sideToRedis = new OutputTag[order]("Status1006")
    val sideToMySQL = new OutputTag[order]("Status1003")
    val endData = data
      .process(new ProcessFunction[order, order] {
        override def processElement(i: order, context: ProcessFunction[order, order]#Context,
                                    collector: Collector[order]): Unit = {
          if (i.status == "1006") {
            context.output(sideToRedis, i)
          } else if (i.status == "1003") {
            context.output(sideToMySQL, i)
          } else {
            collector.collect(i)
          }
        }
      })
    // 1003 and 1006 are already diverted to side outputs; still exclude 1005 (return requested)
    val end = endData
      .filter(_.status != "1005")
      .map(data => (1, data.amount))
      .keyBy(0)
      .sum(1)
    // endData.print("main stream:")
    end.addSink(SinkToRedis)

    // Total up the 1006 (return completed) records and write them to Redis
    val status_1006 = endData.getSideOutput(sideToRedis)
      .map(data => (1, data.amount))
      .keyBy(0)
      .sum(1)
    status_1006.print("status_1006")
    val SinkToRedis2 = new RedisSink[(Int, Double)](conf, new MyRedis("totalreduceprice"))
    status_1006.addSink(SinkToRedis2)

    // Route the 1003 (cancelled) records to MySQL
    val status_1003 = endData.getSideOutput(sideToMySQL)
    status_1003.print("status_1003")
    status_1003.addSink(new ToMySQL)

    env.execute()
  }
}

// Custom Redis sink mapper; the value passed in becomes the Redis key
class MyRedis(Key: String) extends RedisMapper[(Int, Double)] {
  // Command descriptor: write with SET
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }
  // Use the constructor argument as the key
  override def getKeyFromData(data: (Int, Double)): String = Key
  // Convert the amount to String before storing it in Redis
  override def getValueFromData(data: (Int, Double)): String = data._2.toString
}

class ToMySQL extends RichSinkFunction[order] {
  var conn: Connection = _
  var insertStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    // Open the MySQL connection
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/shtd_result?useSSL=false", "root", "root")
    // Prepare the insert statement
    insertStmt = conn.prepareStatement("insert into order_info values(?,?,?,?,?)")
  }

  override def invoke(value: order): Unit = {
    // Fill the placeholders; each index refers to the n-th "?"
    insertStmt.setString(1, value.id)
    insertStmt.setString(2, value.consignee)
    insertStmt.setString(3, value.consignee_tel)
    insertStmt.setDouble(4, value.amount)
    insertStmt.setDouble(5, value.feight_fee)
    insertStmt.execute()
  }

  override def close(): Unit = {
    // Close the statement before the connection
    insertStmt.close()
    conn.close()
  }
}
Dependencies needed in the pom file:

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.10.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <kafka.version>2.2.0</kafka.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.binary.version}</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>

Reposted from: https://blog.csdn.net/m0_58027884/article/details/126367815 (original author: 爱睡觉的小于).