From c52d972e4ca09e0ede1bb9e60d3c07f80f605f88 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 23 May 2018 17:59:12 +0800 Subject: [PATCH 1/3] Prevent ListingFileCatalog from failing if file path doesn't exist --- .../datasources/InMemoryFileIndex.scala | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 739d1f456e3ec..8fc57fa2d3152 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -294,9 +294,12 @@ object InMemoryFileIndex extends Logging { if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles } - allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { + val missingFiles = mutable.ArrayBuffer.empty[String] + val filteredLeafStatuses = allLeafStatuses.filterNot( + status => shouldFilterOut(status.getPath.getName)) + val resolvedLeafStatuses = filteredLeafStatuses.flatMap { case f: LocatedFileStatus => - f + Some(f) // NOTE: // @@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging { // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), // which is very slow on some file system (RawLocalFileSystem, which is launch a // subprocess and parse the stdout). - val locations = fs.getFileBlockLocations(f, 0, f.getLen) - val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) + try { + val locations = fs.getFileBlockLocations(f, 0, f.getLen) + val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, + f.getModificationTime, 0, null, null, null, null, f.getPath, locations) + if (f.isSymlink) { + lfs.setSymlink(f.getSymlink) + } + Some(lfs) + } catch { + case _: FileNotFoundException => + missingFiles += f.getPath.toString + None } - lfs } + + if (missingFiles.nonEmpty) { + logWarning(s"The paths [${missingFiles.mkString(", ")}] were not found. " + + "Were they deleted very recently?") + } + + resolvedLeafStatuses } /** Checks if we should filter out this path name. */ From 4d798e144fafd98b59ad12c8f1bd14791716497e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 24 May 2018 09:01:03 +0800 Subject: [PATCH 2/3] Fix error message format --- .../spark/sql/execution/datasources/InMemoryFileIndex.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 8fc57fa2d3152..171433f02e5a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -330,8 +330,8 @@ object InMemoryFileIndex extends Logging { } if (missingFiles.nonEmpty) { - logWarning(s"The paths [${missingFiles.mkString(", ")}] were not found. " + - "Were they deleted very recently?") + logWarning("the following files were missing during file " + + s"scan:\n ${missingFiles.mkString("\n ")}") } resolvedLeafStatuses From a5614f8fc1346fca321a413d107fddd70d8197c8 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 24 May 2018 09:01:50 +0800 Subject: [PATCH 3/3] Prettier --- .../spark/sql/execution/datasources/InMemoryFileIndex.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 171433f02e5a2..9d9f8bd5bb58e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -330,8 +330,8 @@ object InMemoryFileIndex extends Logging { } if (missingFiles.nonEmpty) { - logWarning("the following files were missing during file " + - s"scan:\n ${missingFiles.mkString("\n ")}") + logWarning( + s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}") } resolvedLeafStatuses