0


Flink自定义Sink将数据存到MySQL

如有更佳的保存MySQL方法 欢迎私信或留言分享 相互学习~

importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction

// 自定义Sink
dataStream.addSink(new JDBCSink())// 继承RichSinkFunctionclass JDBCSink extends RichSinkFunction[输入的数据类型]{// 定义sql连接、插入预编译器、更新预编译器var conn: Connection = _
  var insertStatement: PreparedStatement = _
  var updateStatement: PreparedStatement = _

  // 重写open函数 在此函数初始化,创建连接和预编译语句overridedef open(parameters: Configuration):Unit={// 初始化连接
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/数据库","账号","密码")// 初始化插入预编译器
    insertStatement = conn.prepareStatement("INSERT INTO 表名 VALUES (?, ?, ?(占位符));")// 初始化更新与编译器
    updateStatement = conn.prepareStatement("UPDATE 表名 SET 字段 = ? WHERE 字段 = ?;")}// 重写close函数 关闭与编译器和sql连接overridedef close():Unit={
    insertStatement.close()
    updateStatement.close()
    conn.close()}// 重写invoke函数 overridedef invoke(value: 输入的数据类型, context: SinkFunction.Context[_]):Unit={// 先执行更新操作 给跟更新预编译器的占位符赋值
    updateStatement.setInt(1, value.count.toInt)
    updateStatement.setString(2, value.url)
    updateStatement.setDouble(3, value.windowEnd)// 执行更新
    updateStatement.execute()// 判断如果更新的行数为0 则执行插入if(updateStatement.getUpdateCount ==0){// 给插入预编译器的占位符赋值
      insertStatement.setDouble(1, value.windowEnd)
      insertStatement.setString(2, value.url)
      insertStatement.setInt(3, value.count.toInt)// 执行插入
      insertStatement.execute()}}}
标签: mysql flink 数据库

本文转载自: https://blog.csdn.net/weixin_44864260/article/details/122202755
版权归原作者 我不是秃头sheep 所有, 如有侵权,请联系我们删除。

“Flink自定义Sink将数据存到MySQL”的评论:

还没有评论