0


Spark - 获取一定时间内的 Hdfs 全部文件并读取

一.引言

有一个需求要求定时获取距离目前时间 Interval 范围之内的文件并读取,例如现在是 7:00,interval 为 30 min,则我们需要读取 6:30 - 7:00 的全部文件并读取。这里思路是通过 FileSystem 获取文件的 modofiyTime 然后计算其与当前时间的 interval,满足则保留文件名。

二.获取 Interval 内文件

1.获取 FileSystem

    val conf = new SparkConf().setAppName("Init Spark")
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("error")    

    // 获取 FileSystem
    val fileSystem = new Path("viewfs:/user/...").getFileSystem(sc.hadoopConfiguration)
    

这里输入自己 Hdfs 任意对应目录即可获取当前 FileSystem。

2.获取全部 File

    // 保存全部满足的 File
    val satisfiedFiles = new ArrayBuffer[String]()

    // 待遍历的地址
    val basePath = new Path(s"/$target/$year/$month/$day/$hour")

    // 获取当前时间戳
    val curTime = System.currentTimeMillis()

    // 保留 interval=30 min 内文件
    val interval = 30 
    fs.listStatus(basePath).filter(dir => {
      val updateTime = dir.getModificationTime
      val delay = getInterval(curTime, updateTime)
      delay < interval
    }).foreach(file => {
      val path = basePath + File.separator + file.getPath.getName
      satisfiedFiles.append(path)
    })

file.getPath.getName 可以获取对应 File 的单独路径,而非完整路径,所以要添加 BasePath。

getInterval 函数:

  // 获取两个时间戳 Min 级别间隔
  def getInterval(now: Long, fileTime: Long): Long = {
    val delayMin = (now - fileTime) / 1000 / 60
    delayMin
  }

3.读取 Hdfs File

    val allFile = satisfiedFiles.mkString(",")
    sc.textFile(allFile)

多个文件可以通过 ',' 分隔的形式供 Spark 读取。

Tips:

如果为了防止没有满足间隔的 File 导致 allFile 为空,可以在提交 spark 的脚本里增加忽略空文件的配置,避免任务异常:

--conf spark.files.ignoreMissingFiles=true \
标签: spark hadoop

本文转载自: https://blog.csdn.net/BIT_666/article/details/130450014
版权归原作者 BIT_666 所有, 如有侵权,请联系我们删除。

“Spark - 获取一定时间内的 Hdfs 全部文件并读取”的评论:

还没有评论