Skip to content

Commit

Permalink
SPARK-21056: Reorg code
Browse files Browse the repository at this point in the history
  • Loading branch information
bbossy committed Jun 18, 2017
1 parent 2150f1c commit 579c567
Showing 1 changed file with 23 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,25 +255,22 @@ object InMemoryFileIndex extends Logging {
// [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = paths.flatMap { path =>
val filteredStatuses = paths.flatMap { path =>
try {
Some(path -> fs.get.listStatus(path))
val fStatuses = fs.get.listStatus(path)
val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName))
if (filtered.nonEmpty) {
Some(path -> filtered)
} else {
None
}
} catch {
case _: FileNotFoundException =>
logWarning(s"The directory $paths was not found. Was it deleted very recently?")
None
}
}

val filteredStatuses = statuses.flatMap { case (path, fStatuses) =>
val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName))
if (filtered.isEmpty) {
None
} else {
Some(path -> filtered)
}
}

val allLeafStatuses = {
val (dirs, topLevelFiles) = filteredStatuses.flatMap { case (path, fStatuses) =>
fStatuses.map {f => path -> f }
Expand All @@ -287,20 +284,21 @@ object InMemoryFileIndex extends Logging {
listLeafFiles(pathsToList, hadoopConf, filter, sessionOpt)
}
} else Seq.empty[(Path, Seq[FileStatus])]
val allFiles = topLevelFiles
.map { case (path, fStatus) => path -> Seq(fStatus) } ++ nestedFiles
allFiles.flatMap { case (path, fStatuses) =>
val accepted = if (filter != null) {
fStatuses.filter(f => filter.accept(f.getPath))
} else {
fStatuses
}.filterNot(status => shouldFilterOut(status.getPath.getName))
if (accepted.nonEmpty) {
Some(path -> accepted)
} else {
None
}
}
val allFiles = topLevelFiles.groupBy { case (path, _) => path }
.flatMap { case (path, pAndStatuses) =>
val fStatuses = pAndStatuses.map { case (_, f) => f }
val accepted = if (filter != null) {
fStatuses.filter(f => filter.accept(f.getPath))
} else {
fStatuses
}
if (accepted.nonEmpty) {
Some(path -> accepted)
} else {
None
}
}.toSeq
nestedFiles ++ allFiles
}

allLeafStatuses.map { case (path, fStatuses) =>
Expand Down

0 comments on commit 579c567

Please sign in to comment.