一.引言
有一个需求要求定时获取距离目前时间 Interval 范围之内的文件并读取,例如现在是 7:00,interval 为 30 min,则我们需要读取 6:30 - 7:00 的全部文件并读取。这里思路是通过 FileSystem 获取文件的 modofiyTime 然后计算其与当前时间的 interval,满足则保留文件名。
二.获取 Interval 内文件
1.获取 FileSystem
val conf = new SparkConf().setAppName("Init Spark")
val spark = SparkSession
.builder
.config(conf)
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("error")
// 获取 FileSystem
val fileSystem = new Path("viewfs:/user/...").getFileSystem(sc.hadoopConfiguration)
这里输入自己 Hdfs 任意对应目录即可获取当前 FileSystem。
2.获取全部 File
// 保存全部满足的 File
val satisfiedFiles = new ArrayBuffer[String]()
// 待遍历的地址
val basePath = new Path(s"/$target/$year/$month/$day/$hour")
// 获取当前时间戳
val curTime = System.currentTimeMillis()
// 保留 interval=30 min 内文件
val interval = 30
fs.listStatus(basePath).filter(dir => {
val updateTime = dir.getModificationTime
val delay = getInterval(curTime, updateTime)
delay < interval
}).foreach(file => {
val path = basePath + File.separator + file.getPath.getName
satisfiedFiles.append(path)
})
file.getPath.getName 可以获取对应 File 的单独路径,而非完整路径,所以要添加 BasePath。
getInterval 函数:
// 获取两个时间戳 Min 级别间隔
def getInterval(now: Long, fileTime: Long): Long = {
val delayMin = (now - fileTime) / 1000 / 60
delayMin
}
3.读取 Hdfs File
val allFile = satisfiedFiles.mkString(",")
sc.textFile(allFile)
多个文件可以通过 ',' 分隔的形式供 Spark 读取。
Tips:
如果为了防止没有满足间隔的 File 导致 allFile 为空,可以在提交 spark 的脚本里增加忽略空文件的配置,避免任务异常:
--conf spark.files.ignoreMissingFiles=true \
版权归原作者 BIT_666 所有, 如有侵权,请联系我们删除。