Skip to content

Commit

Permalink
[SPARK-24384][PYTHON][SPARK SUBMIT] Add .py files correctly into Pyth…
Browse files Browse the repository at this point in the history
…onRunner in submit with client mode in spark-submit

## What changes were proposed in this pull request?

In client side before context initialization specifically,  .py file doesn't work in client side before context initialization when the application is a Python file. See below:

```
$ cat /home/spark/tmp.py
def testtest():
    return 1
```

This works:

```
$ cat app.py
import pyspark
pyspark.sql.SparkSession.builder.getOrCreate()
import tmp
print("************************%s" % tmp.testtest())

$ ./bin/spark-submit --master yarn --deploy-mode client --py-files /home/spark/tmp.py app.py
...
************************1
```

but this doesn't:

```
$ cat app.py
import pyspark
import tmp
pyspark.sql.SparkSession.builder.getOrCreate()
print("************************%s" % tmp.testtest())

$ ./bin/spark-submit --master yarn --deploy-mode client --py-files /home/spark/tmp.py app.py
Traceback (most recent call last):
  File "/home/spark/spark/app.py", line 2, in <module>
    import tmp
ImportError: No module named tmp
```

### How did it happen?

In client mode specifically, the paths are being added into PythonRunner as are:

https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L430

https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala#L49-L88

The problem here is, .py file shouldn't be added as are since `PYTHONPATH` expects a directory or an archive like zip or egg.

### How does this PR fix?

We shouldn't simply just add its parent directory because other files in the parent directory could also be added into the `PYTHONPATH` in client mode before context initialization.

Therefore, we copy .py files into a temp directory for .py files and add it to `PYTHONPATH`.

## How was this patch tested?

Unit tests are added and manually tested in both standalond and yarn client modes with submit.

Author: hyukjinkwon <[email protected]>

Closes #21426 from HyukjinKwon/SPARK-24384.
  • Loading branch information
HyukjinKwon authored and Marcelo Vanzin committed May 30, 2018
1 parent 1e46f92 commit b142157
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
29 changes: 28 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy

import java.io.File
import java.net.{InetAddress, URI}
import java.nio.file.Files

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -48,7 +49,7 @@ object PythonRunner {

// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
val formattedPyFiles = formatPaths(pyFiles)
val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))

// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
Expand Down Expand Up @@ -153,4 +154,30 @@ object PythonRunner {
.map { p => formatPath(p, testWindows) }
}

/**
* Resolves the ".py" files. ".py" file should not be added as is because PYTHONPATH does
* not expect a file. This method creates a temporary directory and puts the ".py" files
* if exist in the given paths.
*/
private def resolvePyFiles(pyFiles: Array[String]): Array[String] = {
lazy val dest = Utils.createTempDir(namePrefix = "localPyFiles")
pyFiles.flatMap { pyFile =>
// In case of client with submit, the python paths should be set before context
// initialization because the context initialization can be done later.
// We will copy the local ".py" files because ".py" file shouldn't be added
// alone but its parent directory in PYTHONPATH. See SPARK-24384.
if (pyFile.endsWith(".py")) {
val source = new File(pyFile)
if (source.exists() && source.isFile && source.canRead) {
Files.copy(source.toPath, new File(dest, source.getName).toPath)
Some(dest.getAbsolutePath)
} else {
// Don't have to add it if it doesn't exist or isn't readable.
None
}
} else {
Some(pyFile)
}
}.distinct
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
"PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv

val moduleDir =
if (clientMode) {
// In client-mode, .py files added with --py-files are not visible in the driver.
// This is something that the launcher library would have to handle.
tempDir
} else {
val subdir = new File(tempDir, "pyModules")
subdir.mkdir()
subdir
}
val moduleDir = {
val subdir = new File(tempDir, "pyModules")
subdir.mkdir()
subdir
}
val pyModule = new File(moduleDir, "mod1.py")
Files.write(TEST_PYMODULE, pyModule, StandardCharsets.UTF_8)

Expand Down

0 comments on commit b142157

Please sign in to comment.