
[SPARK-22959][PYTHON] Configuration to select the modules for daemon and worker in PySpark #20151

Closed

Conversation

HyukjinKwon
Member

@HyukjinKwon commented Jan 4, 2018

What changes were proposed in this pull request?

We are now forced to use pyspark/daemon.py and pyspark/worker.py in PySpark.

This doesn't allow custom modifications to them (well, maybe we can still do this in a super hacky way, for example, by setting a Python executable that includes the custom modification). Because of this, it's sometimes hard, for example, to debug what happens inside Python worker processes.

This is actually related to SPARK-7721 too, as coverage is somehow unable to track code executed after os.fork. With a custom fix that forces coverage collection, it works fine.

This is also related to SPARK-20368, which describes Sentry support and (roughly) needs some changes on the worker side.

With this configuration, advanced users will be able to plug in a lot of workarounds, and we can meet such potential needs in the future.

As an example, say I set the configuration to the module coverage_daemon and have coverage_daemon.py on the Python path:

import os

from pyspark import daemon


if "COVERAGE_PROCESS_START" in os.environ:
    from pyspark.worker import main

    def _cov_wrapped(*args, **kwargs):
        import coverage
        cov = coverage.coverage(
            config_file=os.environ["COVERAGE_PROCESS_START"])
        cov.start()
        try:
            main(*args, **kwargs)
        finally:
            cov.stop()
            cov.save()
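    # Replace the daemon's worker entry point with the coverage-wrapped version.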
    daemon.worker_main = _cov_wrapped


if __name__ == '__main__':
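    # Start the regular PySpark daemon loop.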
    daemon.manager()

I can track coverage on the worker side too.

More importantly, we can leave the main code intact while still allowing such workarounds.
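
For reference, one way to point PySpark at such a custom module is to set the configuration when building the session. This is a minimal sketch, assuming coverage_daemon.py is importable (e.g. on PYTHONPATH) by the executors; spark-defaults.conf or spark-submit --conf would work just as well:

```python
from pyspark.sql import SparkSession

# Sketch: start the Python worker daemon from the custom module instead of
# the default pyspark.daemon. "coverage_daemon" is the example module from
# the description above.
spark = (SparkSession.builder
         .config("spark.python.daemon.module", "coverage_daemon")
         .getOrCreate())

# Anything that runs in Python workers now goes through coverage_daemon.
spark.range(10).rdd.map(lambda x: x).count()
```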

How was this patch tested?

Manually tested.

@HyukjinKwon
Member Author

@holdenk, @rxin, @JoshRosen and @ueshin, as you all might already know, I am working on Python coverage. Built on top of this PR, I think we can leave the main code intact while properly tracking coverage within worker processes. I believe this also partly covers SPARK-20368.

What do you guys think about this configuration?

@SparkQA

SparkQA commented Jan 4, 2018

Test build #85677 has finished for PR 20151 at commit f74df4b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@felixcheung left a comment


generally LGTM

val useDaemon = {
  val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)

  // This flag is ignored on Windows as it's unable to fork.
  !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
}

// This configuration indicates the module to run the daemon to execute its Python workers.
val daemonModule = SparkEnv.get.conf.get("spark.python.daemon.module", "pyspark.daemon")
Member


Generally, I thought we use the name "command" for the thing to execute.

Member Author


Ah, yup, that's true in general. But please let me stick with "module" here, since that's the term python -m uses for what we execute:

python --help
...
-m mod : run library module as a script (terminates option list)
...

@HyukjinKwon
Member Author

Hey @rxin, I think I need your sign-off too, as it's related to SPARK-7721.

@ueshin
Member

ueshin commented Jan 8, 2018

The changes LGTM.
Btw, what if the module is missing from the Python path? Can we tell from the exception message that the error is caused by the missing module?

@HyukjinKwon
Member Author

I manually tested after setting spark.python.daemon.module to nonexistantmodule. It shows an error message like this:

>>> spark.range(1).rdd.map(lambda x: x).collect()
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/rdd.py", line 824, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): org.apache.spark.SparkException:
Error from python worker:
  /usr/bin/python: No module named nonexistantmodule
PYTHONPATH was:
  /.../spark/python/lib/pyspark.zip:/.../spark/python/lib/py4j-0.10.6-src.zip:/.../spark/assembly/target/scala-2.11/jars/spark-core_2.11-2.3.0-SNAPSHOT.jar:/.../spark/python/lib/py4j-0.10.6-src.zip:/.../spark/python/:
java.io.EOFException
    ...

Driver stacktrace:
    ...
Caused by: org.apache.spark.SparkException:
Error from python worker:
  /usr/bin/python: No module named nonexistantmodule
PYTHONPATH was:
  /.../spark/python/lib/pyspark.zip:/.../spark/python/lib/py4j-0.10.6-src.zip:/.../spark/assembly/target/scala-2.11/jars/spark-core_2.11-2.3.0-SNAPSHOT.jar:/.../spark/python/lib/py4j-0.10.6-src.zip:/.../spark/python/:
java.io.EOFException
    ...
    ... 1 more

18/01/08 15:54:06 WARN TaskSetManager: Lost task 6.0 in stage 0.0 (TID 6, localhost, executor driver): TaskKilled (Stage cancelled)
...

@ueshin
Member

ueshin commented Jan 8, 2018

Looks good. Let's wait for @rxin's response.

@HyukjinKwon
Member Author

Yup, thanks for all the review, @felixcheung and @ueshin, BTW.

@HyukjinKwon
Member Author

@rxin or @JoshRosen could you guys take a quick look and see if it makes sense?

@holdenk
Contributor

holdenk commented Jan 9, 2018

So I think this could be the basis for solving a lot of related problems, and I like the minimally invasive approach. I think the error message from setting it to a bad module (rather than a nonexistent one) is probably going to be very confusing. It would be good to make it clear that this is an advanced setting we don't expect most users to modify directly.

@felixcheung
Member

+1... this is an "undocumented" conf, sooo it's an expert one :)

@HyukjinKwon
Member Author

Yup, will write up some more warnings saying it's an expert-only, experimental, and rather internal configuration. Also, I will note that we should be super careful with it. Will update tonight (KST) :).

logInfo(
  s"Python daemon module in PySpark is set to [$value] in 'spark.python.daemon.module', " +
  "using this to start the daemon up. Note that this configuration only has an effect when " +
  "'spark.python.use.daemon' is enabled and the platform is not Windows.")
Member Author


Just double-checked that it shows the log only when the configuration is explicitly set:

18/01/10 21:23:24 INFO PythonWorkerFactory: Python daemon module in PySpark is set to [pyspark.daemon] in
 'spark.python.daemon.module', using this to start the daemon up. Note that this configuration only has an 
effect when 'spark.python.use.daemon' is enabled and the platform is not Windows.

@SparkQA

SparkQA commented Jan 10, 2018

Test build #85917 has finished for PR 20151 at commit ea5b987.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 10, 2018

Test build #85918 has finished for PR 20151 at commit fc65803.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// as expert-only option, and shouldn't be used before knowing what it means exactly.

// This configuration indicates the module to run the daemon to execute its Python workers.
val daemonModule = SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
Member


Do we need to restrict the module's package to only allow something like pyspark.*?

Member Author


Hm, actually we could also check whether it's an empty string, for example. But I wrote "shouldn't be used before knowing what it means exactly" above, so I think it's fine.

Member

@BryanCutler left a comment


LGTM, I think there have been many times that this would have been incredibly useful for me, thanks!

@HyukjinKwon
Member Author

HyukjinKwon commented Jan 11, 2018

Will merge this one in a few days if there isn't any objection. I believe this doesn't affect the existing code path anyway.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented Jan 13, 2018

Test build #86075 has finished for PR 20151 at commit fc65803.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

Merged to master.

@asfgit closed this in afae8f2 Jan 14, 2018
asfgit pushed a commit that referenced this pull request Jan 22, 2018
## What changes were proposed in this pull request?

Note that this PR was made on top of #20151, so it leaves the main code almost intact.

This PR proposes to add a script to prepare automatic PySpark coverage generation. Currently, it's difficult to check the actual coverage for PySpark. This script allows running the tests the way we did via the `run-tests` script before; the usage is exactly the same as the `run-tests` script, since this basically wraps it.

This script and PR should also be useful on their own. I was asked before about how to run this, and it seems some reviewers (including me) need it. It would also be useful to run it manually.

This usually requires only a small diff in normal Python projects, but the PySpark case is a bit different because apparently we are unable to track coverage after the process is forked. So, here, I made a custom worker that forces coverage, built on top of #20151.

I made a simple demo. Please take a look - https://spark-test.github.io/pyspark-coverage-site.

To show the structure, this PR adds the files below:

```
python
├── .coveragerc  # Runtime configuration when we run the script.
├── run-tests-with-coverage  # The script that has coverage support and wraps run-tests script.
└── test_coverage  # Directories that have files required when running coverage.
    ├── conf
    │   └── spark-defaults.conf  # Having the configuration 'spark.python.daemon.module'.
    ├── coverage_daemon.py  # A daemon having custom fix and wrapping our daemon.py
    └── sitecustomize.py  # Initiate coverage with COVERAGE_PROCESS_START
```
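
For context, the `sitecustomize.py` described above typically relies on coverage.py's standard subprocess hook, alongside the `spark-defaults.conf` entry that points `spark.python.daemon.module` at the custom daemon module. A minimal sketch (the actual file added in this PR may differ):

```python
# sitecustomize.py -- picked up automatically by Python's site module at
# interpreter startup. coverage.process_startup() begins measurement in any
# process whose environment sets COVERAGE_PROCESS_START to a coverage config
# file (here, the .coveragerc shown above).
import coverage

coverage.process_startup()
```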

Note that this PR has a minor nit:

[This scope](https://github.com/apache/spark/blob/04e44b37cc04f62fbf9e08c7076349e0a4d12ea8/python/pyspark/daemon.py#L148-L169) in `daemon.py` is not in the coverage results, as basically I am producing the coverage results in `worker.py` separately and then merging them. I believe it's not a big deal.

In a follow-up, I might set up a site with a single up-to-date PySpark coverage report from the master branch as the fallback/default, or a site hosting multiple PySpark coverage reports with a link left on each pull request.

## How was this patch tested?

Manually tested. Usage is the same as the existing Python test script, `./python/run-tests`. For example:

```
sh run-tests-with-coverage --python-executables=python3 --modules=pyspark-sql
```

Running this will generate HTML reports under `./python/test_coverage/htmlcov`.

Console output example:

```
sh run-tests-with-coverage --python-executables=python3,python --modules=pyspark-core
Running PySpark tests. Output is in /.../spark/python/unit-tests.log
Will test against the following Python executables: ['python3', 'python']
Will test the following Python modules: ['pyspark-core']
Starting test(python): pyspark.tests
Starting test(python3): pyspark.tests
...
Tests passed in 231 seconds
Combining collected coverage data under /.../spark/python/test_coverage/coverage_data
Reporting the coverage data at /...spark/python/test_coverage/coverage_data/coverage
Name                         Stmts   Miss Branch BrPart  Cover
--------------------------------------------------------------
pyspark/__init__.py             41      0      8      2    96%
...
pyspark/profiler.py             74     11     22      5    83%
pyspark/rdd.py                 871     40    303     32    93%
pyspark/rddsampler.py           68     10     32      2    82%
...
--------------------------------------------------------------
TOTAL                         8521   3077   2748    191    59%
Generating HTML files for PySpark coverage under /.../spark/python/test_coverage/htmlcov
```

Author: hyukjinkwon <[email protected]>

Closes #20204 from HyukjinKwon/python-coverage.