

Data Lake (16): Writing to Iceberg in Real Time with Structured Streaming


Currently, Structured Streaming in Spark only supports writing data into Iceberg in real time; it does not support reading from Iceberg in real time. In the following example we use Structured Streaming to read data from Kafka in real time and write the results into Iceberg.

1. Create a Kafka Topic

Start the Kafka cluster and create the topic "kafka-iceberg-topic":

  [root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic kafka-iceberg-topic --partitions 3 --replication-factor 3

2. Write the Code That Produces Data to Kafka

  import java.text.SimpleDateFormat
  import java.util.{Date, Properties}
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
  import scala.util.Random

  /**
    * Write data to Kafka
    */
  object WriteDataToKafka {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      var counter = 0
      var keyFlag = 0
      while (true) {
        counter += 1
        keyFlag += 1
        val content: String = userlogs()
        producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", content))
        //producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", s"key-$keyFlag", content))
        // Pause for 5 seconds after every 100 messages
        if (0 == counter % 100) {
          counter = 0
          Thread.sleep(5000)
        }
      }
      producer.close()
    }

    def userlogs(): String = {
      val userLogBuffer = new StringBuffer("")
      val timestamp = new Date().getTime
      var userID = 0L
      var pageID = 0L
      // Randomly generated user ID
      userID = Random.nextInt(2000)
      // Randomly generated page ID
      pageID = Random.nextInt(2000)
      // Randomly generated channel
      val channelNames = Array[String]("Spark", "Scala", "Kafka", "Flink", "Hadoop", "Storm", "Hive", "Impala", "HBase", "ML")
      val channel = channelNames(Random.nextInt(10))
      val actionNames = Array[String]("View", "Register")
      // Randomly generated action
      val action = actionNames(Random.nextInt(2))
      val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
      userLogBuffer.append(dateToday)
        .append("\t")
        .append(timestamp)
        .append("\t")
        .append(userID)
        .append("\t")
        .append(pageID)
        .append("\t")
        .append(channel)
        .append("\t")
        .append(action)
      System.out.println(userLogBuffer.toString())
      userLogBuffer.toString()
    }
  }

3. Write a Structured Streaming Job That Reads from Kafka and Writes to Iceberg in Real Time

  import java.util.concurrent.TimeUnit
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.streaming.Trigger
  import org.apache.spark.sql.{DataFrame, SparkSession}

  object StructuredStreamingSinkIceberg {
    def main(args: Array[String]): Unit = {
      //1. Prepare the SparkSession
      val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
        // Configure a Hadoop catalog named hadoop_prod
        .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
        .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")
        .getOrCreate()
      // spark.sparkContext.setLogLevel("Error")

      //2. Create the Iceberg table
      spark.sql(
        """
          |create table if not exists hadoop_prod.iceberg_db.iceberg_table (
          | current_day string,
          | user_id string,
          | page_id string,
          | channel string,
          | action string
          |) using iceberg
        """.stripMargin)

      val checkpointPath = "hdfs://mycluster/iceberg_table_checkpoint"
      val bootstrapServers = "node1:9092,node2:9092,node3:9092"
      // Multiple topics are separated by commas
      val topic = "kafka-iceberg-topic"

      //3. Read data from Kafka
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("auto.offset.reset", "latest")
        .option("group.id", "iceberg-kafka")
        .option("subscribe", topic)
        .load()

      import spark.implicits._

      val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .as[(String, String)].toDF("id", "data")

      // Split the tab-separated log line into columns
      val transDF: DataFrame = resDF.withColumn("current_day", split(col("data"), "\t")(0))
        .withColumn("ts", split(col("data"), "\t")(1))
        .withColumn("user_id", split(col("data"), "\t")(2))
        .withColumn("page_id", split(col("data"), "\t")(3))
        .withColumn("channel", split(col("data"), "\t")(4))
        .withColumn("action", split(col("data"), "\t")(5))
        .select("current_day", "user_id", "page_id", "channel", "action")

      // Print the result to the console; the default trigger runs a micro-batch as soon as it can
      // val query: StreamingQuery = transDF.writeStream
      //   .outputMode("append")
      //   .format("console")
      //   .start()

      //4. Write the stream into the Iceberg table
      val query = transDF.writeStream
        .format("iceberg")
        .outputMode("append")
        // Trigger every minute: Trigger.ProcessingTime(1, TimeUnit.MINUTES)
        // Trigger every 10 seconds: Trigger.ProcessingTime(10, TimeUnit.SECONDS)
        .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
        .option("path", "hadoop_prod.iceberg_db.iceberg_table")
        .option("fanout-enabled", "true")
        .option("checkpointLocation", checkpointPath)
        .start()

      query.awaitTermination()
    }
  }
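While the job is running, you can watch its progress from the Spark UI or programmatically through Spark's StreamingQuery API. A small illustrative sketch that would replace the plain awaitTermination() call above (the polling loop and interval are just an example; isActive, status and lastProgress are standard Spark APIs):

      // Poll the running query every 30 seconds instead of blocking right away
      while (query.isActive) {
        // Current status: whether a trigger is active, whether data is available, etc.
        println(query.status)
        // Progress of the last completed micro-batch (null before the first batch finishes)
        Option(query.lastProgress).foreach(progress => println(progress.prettyJson))
        Thread.sleep(30 * 1000)
      }
      query.awaitTermination()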

Note: the code above is run with Spark 3.1.2, which depends on Hadoop 3.2, so you need to configure the environment variables for a matching Hadoop version on your local Windows machine and place the corresponding hadoop.dll under "C:\Windows\System32".
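For reference, the programs above need the Kafka client, the Spark Kafka connector, and the Iceberg Spark runtime on the classpath. A minimal build.sbt sketch, assuming Scala 2.12, Spark 3.1.2 and Iceberg 0.12.x (the exact artifact names and versions here are assumptions; match them to the versions used in your cluster):

  // build.sbt (sketch) -- adjust versions to your own environment
  scalaVersion := "2.12.15"

  libraryDependencies ++= Seq(
    "org.apache.spark"   %% "spark-sql"              % "3.1.2",
    "org.apache.spark"   %% "spark-sql-kafka-0-10"   % "3.1.2",  // Kafka source for Structured Streaming
    "org.apache.kafka"   %  "kafka-clients"          % "2.8.0",  // used by WriteDataToKafka
    "org.apache.iceberg" %  "iceberg-spark3-runtime" % "0.12.1"  // Iceberg runtime for Spark 3.x
  )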

There are a few points to note when Structured Streaming writes data to Iceberg in real time:

  • Writing to an Iceberg table supports two output modes: append and complete. append appends the rows of every micro-batch to the table; complete replaces the table contents with every micro-batch.
  • The path specified when writing to Iceberg can be an HDFS path or an Iceberg table name. If a table name is used, the Iceberg table must be created in advance.
  • The write option fanout-enabled matters for partitioned Iceberg tables: normally Spark requires the data within each Spark partition to be sorted before it is written, which adds latency. Setting "fanout-enabled" to true avoids this delay by keeping one file open per partition and closing it only after the current task's batch has been written.
  • When writing to an Iceberg table in real time, it is recommended to set the trigger so that commits are at least one minute apart, because every commit produces new data files and metadata files, so longer intervals mean fewer small files. To reduce data files further, periodically compact the data files (see 1.9.6.9) and expire old snapshots (1.9.6.10), as in the sketch after this list.
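Regarding the last point, the data file compaction and snapshot expiration can be done with Iceberg's Spark stored procedures. A minimal sketch against the hadoop_prod catalog created above (these procedures require an Iceberg runtime version that ships them, and the retention timestamp is only an illustration):

  // Compact small data files produced by the streaming commits
  spark.sql(
    """
      |CALL hadoop_prod.system.rewrite_data_files(table => 'iceberg_db.iceberg_table')
    """.stripMargin).show()

  // Expire snapshots older than the given timestamp so unreferenced files can be cleaned up
  spark.sql(
    """
      |CALL hadoop_prod.system.expire_snapshots(table => 'iceberg_db.iceberg_table',
      |                                         older_than => TIMESTAMP '2022-07-01 00:00:00.000')
    """.stripMargin).show()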

4. Check the Results in Iceberg

Start the Kafka producer program and the Structured Streaming job that writes to Iceberg, then run the following code to check the results in the Iceberg table:

  import org.apache.spark.sql.SparkSession

  //1. Prepare the SparkSession
  val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
    // Configure a Hadoop catalog named hadoop_prod
    .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
    .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")
    .getOrCreate()

  //2. Read the results from the Iceberg table
  spark.sql(
    """
      |select * from hadoop_prod.iceberg_db.iceberg_table
    """.stripMargin).show()


Reposted from https://blog.csdn.net/xiaoweite1/article/details/125708008. Copyright belongs to the original author, Lansonli.
