0


2023_Spark_实验三十一:开发Kafka偏移量的公共方法

一、创建OffsetUtils

OffsetUtils 代码

  1. package exams
  2. import org.apache.kafka.common.TopicPartition
  3. import org.apache.spark.streaming.kafka010.OffsetRange
  4. import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
  5. import scala.collection.mutable
  6. /**
  7. * @projectName sparkGNU2023
  8. * @package exams
  9. * @className exams.OffsetUtils
  10. * @description 将消费者组的offset信息存入mysql
  11. * @author pblh123
  12. * @date 2023/12/20 15:25
  13. * @version 1.0
  14. * @param groupId 消费者组名称
  15. * @param offsets 偏移量信息
  16. * CREATE TABLE `t_offset` (
  17. * `topic` varchar(255) NOT NULL,
  18. * `partition` int(11) NOT NULL,
  19. * `groupid` varchar(255) NOT NULL,
  20. * `offset` bigint(20) DEFAULT NULL,
  21. * PRIMARY KEY (`topic`,`partition`,`groupid`)
  22. * ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  23. */
  24. object OffsetUtils {
  25. def getOffsetMap(groupID: String, topic: String): mutable.Map[TopicPartition, Long] = {
  26. //1.获取连接
  27. val conn: Connection = DriverManager.getConnection("jdbc:mysql://192.168.137.110:3306/bigdata19?characterEncoding=UTF-8&serverTimezone=UTC", "lh", "Lh123456!")
  28. //2.编写SQL
  29. val sql: String = "select `partition`,`offset` from t_offset where groupid = ? and topic = ? "
  30. //3.获取ps
  31. val ps: PreparedStatement = conn.prepareStatement(sql)
  32. //4.设置参数并执行
  33. ps.setString(1, groupID)
  34. ps.setString(2, topic)
  35. val rs: ResultSet = ps.executeQuery()
  36. //5.获取返回值并封装成map
  37. val offsetMap: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
  38. while (rs.next()) {
  39. val partition: Int = rs.getInt("partition")
  40. val offset: Int = rs.getInt("offset")
  41. offsetMap += new TopicPartition(topic, partition) -> offset
  42. }
  43. //6.关闭资源
  44. rs.close()
  45. ps.close()
  46. conn.close()
  47. //7.返回map
  48. offsetMap
  49. }
  50. def saveOffsets(groupId: String, offsets: Array[OffsetRange]) = {
  51. //1.加载驱动并获取连接
  52. val conn: Connection = DriverManager.getConnection("jdbc:mysql://192.168.137.110:3306/bigdata19?characterEncoding=UTF-8&serverTimezone=UTC", "lh", "Lh123456!")
  53. //2.编写SQL//jdbc
  54. val sql: String = "replace into t_offset (`topic`,`partition`,`groupid`,`offset`) values(?,?,?,?)"
  55. //3.创建预编译语句对象
  56. val ps: PreparedStatement = conn.prepareStatement(sql)
  57. //4.设置参数执行
  58. for (o <- offsets) {
  59. ps.setString(1, o.topic)
  60. ps.setInt(2, o.partition)
  61. ps.setString(3, groupId)
  62. ps.setLong(4, o.untilOffset)
  63. ps.executeUpdate()
  64. }
  65. //5.关闭资源
  66. ps.close()
  67. conn.close()
  68. }
  69. }
标签: spark kafka 大数据

本文转载自: https://blog.csdn.net/pblh123/article/details/135109147
版权归原作者 pblh123 所有, 如有侵权,请联系我们删除。

“2023_Spark_实验三十一:开发Kafka偏移量的公共方法”的评论:

还没有评论