diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 955d78f400198..fe9a87f4f6a28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -255,9 +255,15 @@ object InMemoryFileIndex extends Logging { // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. - val statuses = paths.flatMap { path => + val filteredStatuses = paths.flatMap { path => try { - Some(path -> fs.get.listStatus(path)) + val fStatuses = fs.get.listStatus(path) + val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)) + if (filtered.nonEmpty) { + Some(path -> filtered) + } else { + None + } } catch { case _: FileNotFoundException => logWarning(s"The directory $paths was not found. Was it deleted very recently?") @@ -265,15 +271,6 @@ object InMemoryFileIndex extends Logging { } } - val filteredStatuses = statuses.flatMap { case (path, fStatuses) => - val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)) - if (filtered.isEmpty) { - None - } else { - Some(path -> filtered) - } - } - val allLeafStatuses = { val (dirs, topLevelFiles) = filteredStatuses.flatMap { case (path, fStatuses) => fStatuses.map {f => path -> f } @@ -287,20 +284,21 @@ object InMemoryFileIndex extends Logging { listLeafFiles(pathsToList, hadoopConf, filter, sessionOpt) } } else Seq.empty[(Path, Seq[FileStatus])] - val allFiles = topLevelFiles - .map { case (path, fStatus) => path -> Seq(fStatus) } ++ nestedFiles - allFiles.flatMap { case (path, fStatuses) => - val accepted = if (filter != null) { - fStatuses.filter(f => filter.accept(f.getPath)) - } else { - fStatuses - }.filterNot(status => shouldFilterOut(status.getPath.getName)) - if (accepted.nonEmpty) { - Some(path -> accepted) - } else { - None - } - } + val allFiles = topLevelFiles.groupBy { case (path, _) => path } + .flatMap { case (path, pAndStatuses) => + val fStatuses = pAndStatuses.map { case (_, f) => f } + val accepted = if (filter != null) { + fStatuses.filter(f => filter.accept(f.getPath)) + } else { + fStatuses + } + if (accepted.nonEmpty) { + Some(path -> accepted) + } else { + None + } + }.toSeq + nestedFiles ++ allFiles } allLeafStatuses.map { case (path, fStatuses) =>