[SPARK-22959][PYTHON] Configuration to select the modules for daemon and worker in PySpark

## What changes were proposed in this pull request?

We are currently forced to use `pyspark/daemon.py` and `pyspark/worker.py` in PySpark.

This leaves no clean way to customize them (it may still be possible in a very hacky way, for example by pointing to a Python executable whose installation contains the custom modification). Because of this, for example, it is sometimes hard to debug what happens inside Python worker processes.

This is also related to [SPARK-7721](https://issues.apache.org/jira/browse/SPARK-7721): Coverage is somehow unable to detect coverage in processes created via `os.fork`, but it works fine once some custom fixes are applied to force the coverage collection.
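
As background, coverage.py's usual fix for sub-processes is its process-startup hook; a minimal sketch is below (it assumes a `sitecustomize.py` on the worker's Python path and is not part of this patch). Because a forked worker never re-runs interpreter startup, that hook alone does not help here, which is why an explicit wrapper such as the `coverage_daemon` example further down is needed.

```python
# Sketch of coverage.py's standard sub-process hook (assumed sitecustomize.py,
# not part of this patch). It starts measurement in every newly started Python
# process when COVERAGE_PROCESS_START points at a coverage config file, but it
# never runs again in children created via os.fork.
import coverage

coverage.process_startup()
```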

This is also related to [SPARK-20368](https://issues.apache.org/jira/browse/SPARK-20368), which describes Sentry support and (roughly) requires some changes on the worker side.

With this configuration, advanced users will be able to plug in their own workarounds, and we can meet such potential needs in the future.

As an example, suppose I configure the module `coverage_daemon` and have `coverage_daemon.py` on the Python path:

```python
import os

from pyspark import daemon

if "COVERAGE_PROCESS_START" in os.environ:
    from pyspark.worker import main

    # Wrap pyspark.worker.main so that each forked worker records coverage.
    def _cov_wrapped(*args, **kwargs):
        import coverage
        cov = coverage.coverage(
            config_file=os.environ["COVERAGE_PROCESS_START"])
        cov.start()
        try:
            main(*args, **kwargs)
        finally:
            cov.stop()
            cov.save()
    # Replace the daemon's worker entry point with the coverage-enabled wrapper.
    daemon.worker_main = _cov_wrapped

if __name__ == '__main__':
    daemon.manager()
```

I can then track coverage on the worker side too.

More importantly, we can leave the main code intact while still allowing such workarounds.
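
For completeness, here is a minimal sketch of how such a setup might be enabled from the driver side. The application name and the `.coveragerc` path are illustrative assumptions, and `coverage_daemon` is the module shown above, which must be importable on the workers' `PYTHONPATH`:

```python
# Minimal sketch (assumed app name and .coveragerc path) of enabling the
# custom daemon module shown above. The settings are read when the Python
# daemon is launched on the executor side, so they must be in place before
# the SparkContext is created.
from pyspark import SparkConf, SparkContext

conf = (
    SparkConf()
    .setAppName("coverage-example")  # illustrative name
    # Use coverage_daemon.py instead of the default pyspark.daemon module.
    .set("spark.python.daemon.module", "coverage_daemon")
    # Expose the coverage configuration file to the daemon/worker processes.
    .set("spark.executorEnv.COVERAGE_PROCESS_START", "/path/to/.coveragerc")
)

sc = SparkContext(conf=conf)
print(sc.parallelize(range(10)).map(lambda x: x * 2).sum())
sc.stop()
```

The same settings can equivalently be passed with `--conf` to `spark-submit`.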

## How was this patch tested?

Manually tested.

Author: hyukjinkwon <[email protected]>

Closes #20151 from HyukjinKwon/configuration-daemon-worker.
HyukjinKwon committed Jan 14, 2018
1 parent 0066d6f commit afae8f2
Showing 1 changed file with 32 additions and 9 deletions.
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

@@ -34,17 +34,39 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

  import PythonWorkerFactory._

-  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon
-  // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
-  // only works on UNIX-based systems now because it uses signals for child management, so we can
-  // also fall back to launching workers (pyspark/worker.py) directly.
+  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
+  // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
+  // currently only works on UNIX-based systems now because it uses signals for child management,
+  // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
  val useDaemon = {
    val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)

    // This flag is ignored on Windows as it's unable to fork.
    !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
  }

+  // WARN: Both configurations, 'spark.python.daemon.module' and 'spark.python.worker.module' are
+  // for very advanced users and they are experimental. This should be considered
+  // as expert-only option, and shouldn't be used before knowing what it means exactly.
+
+  // This configuration indicates the module to run the daemon to execute its Python workers.
+  val daemonModule = SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
+    logInfo(
+      s"Python daemon module in PySpark is set to [$value] in 'spark.python.daemon.module', " +
+      "using this to start the daemon up. Note that this configuration only has an effect when " +
+      "'spark.python.use.daemon' is enabled and the platform is not Windows.")
+    value
+  }.getOrElse("pyspark.daemon")
+
+  // This configuration indicates the module to run each Python worker.
+  val workerModule = SparkEnv.get.conf.getOption("spark.python.worker.module").map { value =>
+    logInfo(
+      s"Python worker module in PySpark is set to [$value] in 'spark.python.worker.module', " +
+      "using this to start the worker up. Note that this configuration only has an effect when " +
+      "'spark.python.use.daemon' is disabled or the platform is Windows.")
+    value
+  }.getOrElse("pyspark.worker")
+
  var daemon: Process = null
  val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
  var daemonPort: Int = 0
@@ -74,8 +96,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
  }

  /**
-   * Connect to a worker launched through pyspark/daemon.py, which forks python processes itself
-   * to avoid the high cost of forking from Java. This currently only works on UNIX-based systems.
+   * Connect to a worker launched through pyspark/daemon.py (by default), which forks python
+   * processes itself to avoid the high cost of forking from Java. This currently only works
+   * on UNIX-based systems.
   */
  private def createThroughDaemon(): Socket = {

@@ -108,15 +131,15 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
  }

  /**
-   * Launch a worker by executing worker.py directly and telling it to connect to us.
+   * Launch a worker by executing worker.py (by default) directly and telling it to connect to us.
   */
  private def createSimpleWorker(): Socket = {
    var serverSocket: ServerSocket = null
    try {
      serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

      // Create and start the worker
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule))
      val workerEnv = pb.environment()
      workerEnv.putAll(envVars.asJava)
      workerEnv.put("PYTHONPATH", pythonPath)
@@ -159,7 +182,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

      try {
        // Create and start the daemon
-        val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.daemon"))
+        val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
        val workerEnv = pb.environment()
        workerEnv.putAll(envVars.asJava)
        workerEnv.put("PYTHONPATH", pythonPath)
