[SPARK-13587] [PYSPARK] Support virtualenv in pyspark #13599
Conversation
Test build #60293 has finished for PR 13599 at commit
Test build #60294 has finished for PR 13599 at commit
private[spark] class PythonWorkerFactory(pythonExec: String,
    envVars: Map[String, String],
    conf: SparkConf)
Maybe as below:
private[spark] class PythonWorkerFactory(
pythonExec: String,
envVars: Map[String, String],
conf: SparkConf)
@davies @JoshRosen Do you have time to review it? @stibbons has another PR #14180 based on this to support wheelhouse, but I'm afraid putting these together would be too big to review. So it would be better to finish this PR first and then work on wheelhouse support.
Test build #64616 has finished for PR 13599 at commit
Test build #64617 has finished for PR 13599 at commit
@zjffdu any news?
ping @davies @JoshRosen, do you have time to review this approach of virtualenv support for PySpark? It would be very much appreciated.
One point I'm reminded of looking at this: the lack of tests for important functionality is probably not acceptable. I understand testing this in Jenkins could be complicated, but a test suite that requires a YARN resource and is marked as skipped for Jenkins would still be better than nothing.
Test build #91437 has finished for PR 13599 at commit
retest this please
Test build #91438 has finished for PR 13599 at commit
Hi, thanks for all the work on this! I see requirements.txt mentioned here and there and, browsing this and other JIRAs, it seems to be the proposed way to specify dependencies in PySpark. As you probably know, the community has rallied around Pipfiles as a replacement for requirements.txt. This has a few upsides (including a lock file), the main one being that the reference implementation (Pipenv) allows for installing packages into a new virtualenv directly, without having to activate it or run other commands. So that combines dependency management, reproducibility, and environment isolation. (Also, if one doesn't want said packages to be installed in a venv, there's an argument to install them system-wide.) I'm not proposing this PR gets extended to support Pipfiles, I just wanted to ask if this has been considered and is on the roadmap, since it seems to be the successor to requirements.txt. (We stumbled upon this as we were thinking of moving to Kubernetes and didn't know how dependencies were handled there [they aren't, yet, see #21092]. We could install dependencies in our target Docker images using Pipfiles, but submitting a Pipfile with our individual jobs would be a much cleaner solution.) Thanks!
Thanks for the interest in this PR and the info about Pipfiles.
@holdenk and @zjffdu, I believe manual tests are okay if it's difficult to write a test. We can manually test and expose this as an experimental feature too. BTW, I believe we can still have some tests to check if, for example, at least the string is properly constructed - https://github.com/apache/spark/pull/13599/files#r175670974? I think that could be enough for now. Somehow I happened to look into this multiple times over the past few years, and I think it's better to go ahead than just blocking here.
@JoshRosen, I heard that you took a look at this before. Do you have any concerns to address, maybe?
@@ -218,6 +218,115 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to
For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries
to executors.

# VirtualEnv for PySpark
@zjffdu, mind if I ask you to describe this as an experimental feature that is very likely to be unstable and still evolving?
Thanks @HyukjinKwon, the doc is updated.
Will take a closer look within a few days.
docs/submitting-applications.md
Here are several ways to install packages

{% highlight python %}
sc.install_packages("numpy")  # install the latest numpy
Seems there are tabs here. Shall we replace them with spaces?
in 2 scenarios:
* Batch mode (submit spark app via spark-submit)
* Interactive mode (PySpark shell or other third party Spark Notebook)
Ah, maybe we can leave a note at the end instead of adding it in the title.
Note that this is an experimental feature added in Spark 2.4.0 and may evolve in future versions.
python/pyspark/context.py
Install python packages on all executors and driver through pip. pip will be installed
by default no matter using native virtualenv or conda. So it is guaranteed that pip is
available if virtualenv is enabled.
:param packages: string for single package or a list of string for multiple packages
Shall we add:
.. versionadded:: 2.3.0
.. note:: Experimental
python/pyspark/context.py
else:
    self._conf.set("spark.pyspark.virtualenv.packages", ":".join(packages))

import functools
Can we move this up within this function?
Test build #91519 has finished for PR 13599 at commit
Test build #91892 has finished for PR 13599 at commit
Test build #91891 has finished for PR 13599 at commit
Thank you so much for this!!!
I would like to see appropriate changes to account for the two cluster managers that are in need of such a solution. For Kubernetes, there need to be some considerations made towards options for baking the requirements.txt file into the Docker image itself, and maybe setting a conf value instead of user.home. I also think it would be best if touching the SchedulerBackend could be avoided, as the change doesn't necessarily carry over to Kubernetes, but I don't see a work-around at the moment. Furthermore, this work is missing unit and integration tests that I think are important to show completeness and correctness. @holdenk to comment. I also included some nits.
If it is alright, I might add some commits to this PR to allow for Kubernetes support (including the appropriate integration tests).
require(virtualEnvType == "native" || virtualEnvType == "conda",
  s"VirtualEnvType: $virtualEnvType is not supported.")
require(new File(virtualEnvBinPath).exists(),
  s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.")
In addition, how are we handling the case of an existing s"$virtualEnvBinPath/$virtualEnvName"?
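A minimal sketch of one way that case could be guarded; the variable names mirror the quoted snippet, but the fail-fast policy (as opposed to reusing the environment) is only an illustration, not necessarily what the PR does.

import java.io.File

object ExistingEnvCheckSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative values; in the PR these come from the Spark configuration.
    val virtualEnvBinPath = "/usr/bin/virtualenv"
    val virtualEnvName = "virtualenv_app_0001"

    val target = new File(s"$virtualEnvBinPath/$virtualEnvName")
    // One possible policy: fail fast with an actionable message instead of
    // silently overwriting or reusing a directory we did not create.
    require(!target.exists(),
      s"$target already exists; remove it or pick a different virtualenv name.")
  }
}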
// 2. created outside yarn container. Spark need to create temp directory and clean it after app
// finish.
// - driver of PySpark shell
// - driver of yarn-client mode
In the case of Kubernetes:
This will be created in the base spark-py Docker image, which is shared between the driver and executors, and the containers will be cleaned up upon termination of the job via owner labels (for the executors) and the k8s API server (for the driver).
As such (hopefully with client-mode support being completed soon), the logic below should hold as well.
Is this work going to be cluster-manager agnostic? Or is this supposed to only support YARN? I would like to see this be applicable to all first-class cluster-management systems.
I can help with appending to this PR: k8s support and the appropriate integration tests.
// Use the absolute path of requirement file in the following cases
// 1. driver of pyspark shell
// 2. driver of yarn-client mode
// otherwise just use filename as it would be downloaded to the working directory of Executor
In the Kubernetes world, I might want to use a requirements.txt file that is stored locally in the base Docker image, regardless of client or cluster mode. Is that something that you think should be supported? Maybe a config variable spark.pyspark.virtualenv.kubernetes.localrequirements that points to a file stored as local:///var/files/requirements.txt, for example.
Furthermore, when we introduce a Resource Staging Server that allows us to stage files locally, this setting will be interchangeable between something that is locally baked in vs. staged.
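If that route were taken, wiring it up might look roughly like the sketch below. Note that spark.pyspark.virtualenv.kubernetes.localrequirements is only the key proposed in this thread, not an existing Spark setting.

import org.apache.spark.SparkConf

object LocalRequirementsConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.pyspark.virtualenv.enabled", "true")
      // Hypothetical key from this review comment; the local:// scheme marks a
      // file already present inside the Docker image.
      .set("spark.pyspark.virtualenv.kubernetes.localrequirements",
        "local:///var/files/requirements.txt")

    println(conf.get("spark.pyspark.virtualenv.kubernetes.localrequirements"))
  }
}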
// 2. requirement file is not specified. (Interactive mode).
// In this case `spark.pyspark.virtualenv.python_version` must be specified.

if (pysparkRequirements.isDefined) {
Please re-write as a .map(...).getOrElse(...); if (....isDefined) is not idiomatic Scala.
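A tiny, self-contained illustration of the pattern being asked for; the Option value and the command strings are stand-ins, not the PR's actual code.

object MapGetOrElseSketch {
  def main(args: Array[String]): Unit = {
    val pysparkRequirements: Option[String] = Some("requirements.txt")

    // Derive a value from the Option and supply a default, instead of
    // branching on .isDefined and then calling .get.
    val pipCommand: String = pysparkRequirements
      .map(req => s"pip install -r $req")
      .getOrElse("pip --version") // the fallback is purely illustrative

    println(pipCommand)
  }
}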
"install", "-r", pysparkRequirements.get)) | ||
} | ||
// install additional packages | ||
if (initPythonPackages.isDefined) { |
Use .foreach, not isDefined (see the sketch below).
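For the side-effect-only case quoted above, foreach avoids both isDefined and get. A minimal sketch with a stand-in command runner:

object ForeachOptionSketch {
  // Stand-in for the PR's real command runner.
  def execCommand(cmd: List[String]): Unit = println(cmd.mkString(" "))

  def main(args: Array[String]): Unit = {
    val initPythonPackages: Option[String] = Some("numpy:pandas")

    // Runs the body only when the Option is non-empty; no .isDefined / .get.
    initPythonPackages.foreach { packages =>
      execCommand(List("pip", "install") ++ packages.split(":").toList)
    }
  }
}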
// requirement file for native is not mandatory, run this only when requirement file
// is specified.
execCommand(List(virtualPythonExec, "-m", "pip",
  "--cache-dir", System.getProperty("user.home"),
How does this logic carry across cluster managers? Has this been considered for Mesos use cases? In Kubernetes this should be fine, but we should also have some documentation about this somewhere and test it with an integration test.
* Create virtualenv using native virtualenv or conda
*
*/
def setupVirtualEnv(): String = {
Might be better to pass args into this function so that it could be properly unit-tested (a sketch follows). It seems that there are no unit tests for this class, so that seems to be a necessary addition.
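A hedged sketch of the idea: lift the inputs into parameters and keep the command construction pure, so a unit test can assert on the returned command without touching a real cluster. The names and flags below are illustrative, not the PR's actual signature.

// Illustrative only; not the PR's VirtualEnvFactory API.
final case class VirtualEnvSettings(
    virtualEnvType: String, // "native" or "conda"
    virtualEnvBinPath: String,
    requirementsFile: Option[String])

object SetupVirtualEnvSketch {
  // Pure function: easy to unit-test by asserting on the returned command.
  def createEnvCommand(settings: VirtualEnvSettings, envDir: String): Seq[String] =
    settings.virtualEnvType match {
      case "conda"  => Seq(settings.virtualEnvBinPath, "create", "--prefix", envDir, "--yes")
      case "native" => Seq(settings.virtualEnvBinPath, envDir)
      case other    => throw new IllegalArgumentException(s"Unsupported virtualenv type: $other")
    }

  def main(args: Array[String]): Unit = {
    val cmd = createEnvCommand(
      VirtualEnvSettings("native", "/usr/bin/virtualenv", Some("requirements.txt")),
      envDir = "/tmp/spark-virtualenv")
    // A unit test would simply assert on this sequence.
    println(cmd.mkString(" "))
  }
}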
.orElse(sparkConf.get(PYSPARK_PYTHON))
.orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
.orElse(sys.env.get("PYSPARK_PYTHON"))
.getOrElse("python")

if (sparkConf.getBoolean("spark.pyspark.virtualenv.enabled", false)) {
  val virtualEnvFactory = new VirtualEnvFactory(pythonExec, sparkConf, true)
  pythonExec = virtualEnvFactory.setupVirtualEnv()
+1
if (args.isPython) {
  if (clusterManager != YARN &&
      args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", "false") == "true") {
    printErrorAndExit("virtualenv is only supported in yarn mode")
+1 for Kubernetes
""" | ||
Install python packages on all executors and driver through pip. pip will be installed | ||
by default no matter using native virtualenv or conda. So it is guaranteed that pip is | ||
available if virtualenv is enabled. |
This will only be the case in Kubernetes if you specify the spark-py image. So this will need to be expanded per cluster manager.
Is there any work being done on this PR at this point in time?
Hi. I think that until we get a core developer really interested in packaging Python dependencies correctly, this PR won't evolve a lot.
I'm interested in us fixing this, especially after yesterday, when I spent several hours working with workaround hacks. But I want us to do something that is not YARN-specific and does not involve a large slowdown on worker creation.
@holdenk What we're doing in some of our products currently is that we require users to create their Python environments up front and store them on a file system that is accessible to all physical nodes. This is partly for performance and partly because our compute nodes don't have external network connectivity, i.e. we can't resolve dependencies from our workers. Then, when we spin up containers, we volume-mount the appropriate file system into our containers and have logic in our entry point scripts that activates the relevant environment prior to starting Spark, Dask Distributed, or whatever Python job we're actually launching. We're doing this with Spark standalone clusters currently, but I expect much the same approach would work for Kubernetes and other resource managers.
if (isLauncher ||
    (isDriver && conf.get("spark.submit.deployMode") == "client")) {
  val virtualenvBasedir = Files.createTempDir()
  virtualenvBasedir.deleteOnExit()
the temporary directory is not being deleted on exit.
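For context, java.io.File.deleteOnExit() only removes a directory that is empty at JVM shutdown, so once the virtualenv has been created inside it the call is effectively a no-op. A minimal sketch of one alternative using a shutdown hook that deletes recursively (inside Spark itself, the existing shutdown-hook/cleanup utilities would be the natural home for this):

import java.io.File
import java.nio.file.{Files => JFiles}
import scala.util.Try

object TempDirCleanupSketch {
  // File.delete() cannot remove a non-empty directory, so walk the tree first.
  def deleteRecursively(file: File): Unit = {
    if (file.isDirectory) {
      Option(file.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    file.delete()
  }

  def main(args: Array[String]): Unit = {
    val virtualenvBasedir = JFiles.createTempDirectory("spark-virtualenv-").toFile

    // Runs at JVM shutdown even if the directory is non-empty by then.
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      Try(deleteRecursively(virtualenvBasedir))
      ()
    }))

    println(s"virtualenv base dir: $virtualenvBasedir")
  }
}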
Since there haven't been any updates for ~6 months, and it seems people haven't really agreed on the scope of the feature, I'm closing the PR. It can be reopened by just updating the branch.
Hi,
It's supported via a different approach now. See https://databricks.com/blog/2020/12/22/how-to-manage-python-dependencies-in-pyspark.html
Yes, but this is a less dynamic approach, since the user is expected to pre-pack all packages (an ~800 MB file) instead of supplying a 10-line text file.
Sure, but no approach is going to be perfect. If you want dynamic package resolution, your best option would be to run in containers and build that capability into your containers' entry points somehow. Even then this has drawbacks, in that every driver/executor needs to download and install packages at startup, which can create big increases in startup time and/or lead to application failure if the driver/executors take different amounts of time to start up, leading to connection timeouts. The dynamic approach also fails for air-gapped environments (very common in my $dayjob). With the "official" Spark approach you only have to pay that cost once, and can do so somewhere with the necessary network connectivity to download all the packages you need. Once your environment is packaged, that cost is paid and you can re-use it as many times as you need. Your only real gotcha is that the OS environment where you build the environment needs to match the OS environment where you want to run it, or you can find you have OS-dependent packages that won't work.
What changes were proposed in this pull request?
Support virtualenv in pyspark as described in SPARK-13587
https://docs.google.com/document/d/1KB9RYW8_bSeOzwVqZFc_zy_vXqqqctwrU5TROP_16Ds/edit?usp=sharing
How was this patch tested?
Manually verified on CentOS and macOS, but not on Windows yet.
Here are the scenarios I have verified (a rough configuration sketch follows the list):
Run conda on yarn-client mode
Run virtualenv on yarn-client mode
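For readers skimming the thread, a rough sketch of how the feature is driven: everything hangs off spark.pyspark.virtualenv.* configuration keys. Only keys quoted elsewhere in this conversation are shown; the values are illustrative, and the full set of options is in the design doc linked above.

import org.apache.spark.SparkConf

object VirtualenvEnableSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("pyspark-virtualenv-example")
      .set("spark.pyspark.virtualenv.enabled", "true")          // turn the feature on
      .set("spark.pyspark.virtualenv.python_version", "3.6")    // required in interactive mode
      .set("spark.pyspark.virtualenv.packages", "numpy:pandas") // ':'-separated package list

    conf.getAll
      .filter { case (k, _) => k.startsWith("spark.pyspark.virtualenv") }
      .foreach { case (k, v) => println(s"$k=$v") }
  }
}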