Skip to content

Commit

Permalink
[SPARK-21384][YARN] Spark + YARN fails with LocalFileSystem as defaul…
Browse files Browse the repository at this point in the history
…t FS

## What changes were proposed in this pull request?

When the libraries temp directory(i.e. __spark_libs__*.zip dir) file system and staging dir(destination) file systems are the same then the __spark_libs__*.zip is not copying to the staging directory. But after making this decision the libraries zip file is getting deleted immediately and becoming unavailable for the Node Manager's localization.

With this change, client copies the files to remote always when the source scheme is "file".

## How was this patch tested?

I have verified it manually in yarn/cluster and yarn/client modes with hdfs and local file systems.

Author: Devaraj K <[email protected]>

Closes #19141 from devaraj-kavali/SPARK-21384.

(cherry picked from commit 55d5fa7)
Signed-off-by: Marcelo Vanzin <[email protected]>
  • Loading branch information
Devaraj K authored and Marcelo Vanzin committed Sep 20, 2017
1 parent 5d10586 commit 401ac20
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,9 @@ private[spark] class Client(

/**
* Copy the given file to a remote file system (e.g. HDFS) if needed.
* The file is only copied if the source and destination file systems are different. This is used
* for preparing resources for launching the ApplicationMaster container. Exposed for testing.
* The file is only copied if the source and destination file systems are different or the source
* scheme is "file". This is used for preparing resources for launching the ApplicationMaster
* container. Exposed for testing.
*/
private[yarn] def copyFileToRemote(
destDir: Path,
Expand All @@ -350,7 +351,7 @@ private[spark] class Client(
val destFs = destDir.getFileSystem(hadoopConf)
val srcFs = srcPath.getFileSystem(hadoopConf)
var destPath = srcPath
if (force || !compareFs(srcFs, destFs)) {
if (force || !compareFs(srcFs, destFs) || "file".equals(srcFs.getScheme)) {
destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
logInfo(s"Uploading resource $srcPath -> $destPath")
FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
Expand Down

0 comments on commit 401ac20

Please sign in to comment.