[SPARK-6764] Add wheel package support for PySpark #5408

Closed · wants to merge 1 commit
@@ -447,8 +447,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| coordinates should be groupId:artifactId:version.
| --repositories Comma-separated list of additional remote repositories to
| search for the maven coordinates given with --packages.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
| on the PYTHONPATH for Python apps.
| --py-files PY_FILES Comma-separated list of .whl, .egg, .zip or .py files to
| place on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
|
2 changes: 1 addition & 1 deletion docs/programming-guide.md
@@ -204,7 +204,7 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes,

In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called `sc`. Making your own SparkContext will not work. You can set which master the
context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files
context connects to using the `--master` argument, and you can add Python .whl, .egg, .zip or .py files
to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
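Not part of the diff: a minimal sketch of how a wheel added in the shell described above might be used; the wheel name, module name and `transform` function are hypothetical.

```python
# Inside the PySpark shell, `sc` already exists. Names below are illustrative only.
sc.addPyFile("deps/mylib-0.1-py2.py3-none-any.whl")

def use_dependency(x):
    import mylib                      # resolved on the executors from the installed wheel
    return mylib.transform(x)

sc.parallelize([1, 2, 3]).map(use_dependency).collect()
```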
12 changes: 6 additions & 6 deletions docs/submitting-applications.md
@@ -18,9 +18,9 @@ as `provided` dependencies; these need not be bundled since they are provided by
the cluster manager at runtime. Once you have an assembled jar you can call the `bin/spark-submit`
script as shown here while passing your jar.

For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.zip` or `.egg`
files to be distributed with your application. If you depend on multiple Python files we recommend
packaging them into a `.zip` or `.egg`.
For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.whl`, `.egg`
or `.zip` files to be distributed with your application. If you depend on multiple Python files we
recommend packaging them into a `.whl`, `.egg` or `.zip`.
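Not part of the diff: the programmatic counterpart, handing the same dependencies to the `SparkContext` constructor instead of `--py-files`; the file names below are hypothetical.

```python
from pyspark import SparkContext

# File names are illustrative; this mirrors what `--py-files` does at submit time.
sc = SparkContext(
    master="local[4]",
    appName="wheel-deps-example",
    pyFiles=[
        "dist/mylib-0.1-py2.py3-none-any.whl",  # wheels are installed with pip into a staged site-packages
        "helpers.py",                           # plain .py files are prepended to the PYTHONPATH
    ],
)
```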

# Launching Applications with spark-submit

@@ -62,7 +62,7 @@ the drivers and the executors. Note that `cluster` mode is currently not support
Mesos clusters or Python applications.

For Python applications, simply pass a `.py` file in the place of `<application-jar>` instead of a JAR,
and add Python `.zip`, `.egg` or `.py` files to the search path with `--py-files`.
and add Python `.whl`, `.egg`, `.zip` or `.py` files to the search path with `--py-files`.

There are a few options available that are specific to the
[cluster manager](#cluster-overview.html#cluster-manager-types) that is being used.
@@ -179,8 +179,8 @@ with `--packages`. All transitive dependencies will be handled when using this c
repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`.
These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages.

For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries
to executors.
For Python, the equivalent `--py-files` option can be used to distribute `.whl`, `.egg`, `.zip`
and `.py` libraries to executors.

# More Information

@@ -225,7 +225,7 @@ public SparkLauncher addFile(String file) {
}

/**
* Adds a python file / zip / egg to be submitted with the application.
* Adds a python file / zip / whl / egg to be submitted with the application.
*
* @param file Path to the file.
* @return This launcher.
104 changes: 80 additions & 24 deletions python/pyspark/context.py
@@ -21,6 +21,8 @@
from threading import Lock
from tempfile import NamedTemporaryFile

from pip.commands.install import InstallCommand as pip_InstallCommand
Contributor commented: This introduces a dependency on pip, which is not available by default. We should put it in a try.
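A minimal sketch of the guarded import being suggested (not part of this PR; the `None` fallback is illustrative):

```python
# Guard the optional pip dependency so PySpark still imports when pip is absent.
try:
    from pip.commands.install import InstallCommand as pip_InstallCommand
except ImportError:
    pip_InstallCommand = None  # .whl support would then be unavailable
```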


from py4j.java_collections import ListConverter

from pyspark import accumulators
@@ -62,9 +64,9 @@ class SparkContext(object):
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
_python_includes = None # whl, egg, zip and jar files that need to be added to PYTHONPATH

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
PACKAGE_EXTENSIONS = ('.whl', '.egg', '.zip', '.jar')

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
@@ -77,9 +79,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
(e.g. mesos://host:port, spark://host:port, local[4]).
:param appName: A name for your job, to display on the cluster web UI.
:param sparkHome: Location where Spark is installed on cluster nodes.
:param pyFiles: Collection of .zip or .py files to send to the cluster
and add to PYTHONPATH. These can be paths on the local file
system or HDFS, HTTP, HTTPS, or FTP URLs.
:param pyFiles: Collection of .py, .whl, .egg or .zip files to send
to the cluster and add to PYTHONPATH. These can be paths on
the local file system or HDFS, HTTP, HTTPS, or FTP URLs.
:param environment: A dictionary of environment variables to set on
worker nodes.
:param batchSize: The number of Python objects represented as a single
@@ -178,18 +180,24 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
sys.path.insert(1, root_dir)

# Deploy any code dependencies specified in the constructor
# Wheel files will be installed by pip later.
self._python_includes = list()
for path in (pyFiles or []):
self.addPyFile(path)
if pyFiles:
for path in pyFiles:
self.addFile(path)
self._include_python_packages(paths=pyFiles)
else:
pyFiles = []

# Deploy code dependencies set by spark-submit; these will already have been added
# with SparkContext.addFile, so we just need to add them to the PYTHONPATH
for path in self._conf.get("spark.submit.pyFiles", "").split(","):
if path != "":
(dirname, filename) = os.path.split(path)
if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
self._python_includes.append(filename)
sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
# with SparkContext.addFile, so we just need to include them.
# Wheel files will be installed by pip later.
spark_submit_pyfiles = self._conf.get("spark.submit.pyFiles", "").split(",")
if spark_submit_pyfiles:
self._include_python_packages(paths=spark_submit_pyfiles)

# Install all wheel files at once.
self._install_wheel_files(paths=pyFiles + spark_submit_pyfiles)

# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
@@ -693,23 +701,71 @@ def clearFiles(self):
Clear the job's list of files added by L{addFile} or L{addPyFile} so
that they do not get downloaded to any new nodes.
"""
# TODO: remove added .py or .zip files from the PYTHONPATH?
# TODO: remove added .py, .whl, .egg or .zip files from the PYTHONPATH?
self._jsc.sc().clearFiles()

def addPyFile(self, path):
"""
Add a .py or .zip dependency for all tasks to be executed on this
SparkContext in the future. The C{path} passed can be either a local
file, a file in HDFS (or other Hadoop-supported filesystems), or an
HTTP, HTTPS or FTP URI.
Add a .py, .whl, .egg or .zip dependency for all tasks to be
executed on this SparkContext in the future. The C{path} passed can
be either a local file, a file in HDFS (or other Hadoop-supported
filesystems), or an HTTP, HTTPS or FTP URI.
"""
self.addFile(path)
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
self._include_python_packages(paths=(path,))
self._install_wheel_files(paths=(path,))

if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
self._python_includes.append(filename)
# for tests in local mode
sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
def _include_python_packages(self, paths):
"""
Add Python package dependencies. Python packages (except for .whl) are
added to PYTHONPATH.
"""
root_dir = SparkFiles.getRootDirectory()
for path in paths:
basename = os.path.basename(path)
extname = os.path.splitext(basename)[1].lower()
if extname in self.PACKAGE_EXTENSIONS \
and basename not in self._python_includes:
self._python_includes.append(basename)
if extname != '.whl':
# Prepend the python package (except for *.whl) to sys.path
sys.path.insert(1, os.path.join(root_dir, basename))

def _install_wheel_files(
self,
Contributor commented: Could you change this to match the style of the others?

paths,
quiet=True,
upgrade=True,
no_deps=True,
no_index=True,
):
"""
Install .whl files at once by pip install.
"""
root_dir = SparkFiles.getRootDirectory()
paths = {
os.path.join(root_dir, os.path.basename(path))
for path in paths
if os.path.splitext(path)[1].lower() == '.whl'
}
if not paths:
return

pip_args = [
'--find-links', root_dir,
'--target', os.path.join(root_dir, 'site-packages'),
]
if quiet:
pip_args.append('--quiet')
if upgrade:
pip_args.append('--upgrade')
if no_deps:
pip_args.append('--no-deps')
if no_index:
pip_args.append('--no-index')
pip_args.extend(paths)

pip_InstallCommand().main(args=pip_args)
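
Not part of the diff: roughly the pip call this helper assembles for a single staged wheel with the default keyword arguments, using the same pre-pip-10 `InstallCommand` import the diff relies on; the paths below are illustrative.

```python
from pip.commands.install import InstallCommand

root_dir = "/tmp/spark-1234/userFiles"  # SparkFiles root directory (illustrative)
InstallCommand().main(args=[
    "--find-links", root_dir,                      # resolve wheels from the staged files
    "--target", root_dir + "/site-packages",       # install alongside the other staged files
    "--quiet", "--upgrade", "--no-deps", "--no-index",
    root_dir + "/mylib-1.0-py2.py3-none-any.whl",  # the staged wheel itself (hypothetical)
])
```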

def setCheckpointDir(self, dirName):
"""
16 changes: 16 additions & 0 deletions python/pyspark/tests.py
@@ -367,6 +367,22 @@ def func():
from userlib import UserClass
self.assertEqual("Hello World from inside a package!", UserClass().hello())

def test_add_whl_file_locally(self):
# To ensure that we're actually testing addPyFile's effects, check that
# this fails due to `testpackage1` or `testpackage2` not being on the
# Python path:
def func():
from testpackage2 import TestPackage1Class
self.assertRaises(ImportError, func)
paths = [
os.path.join(SPARK_HOME, "python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl"),
os.path.join(SPARK_HOME, "python/test_support/testpackage2-0.0.1-py2.py3-none-any.whl"),
]
for path in paths:
self.sc.addPyFile(path)
from testpackage2 import TestPackage1Class
self.assertEqual("Hello World from inside a package!", TestPackage1Class().hello())

def test_overwrite_system_module(self):
self.sc.addPyFile(os.path.join(SPARK_HOME, "python/test_support/SimpleHTTPServer.py"))

41 changes: 39 additions & 2 deletions python/pyspark/worker.py
@@ -18,12 +18,15 @@
"""
Worker that receives input from Piped RDD.
"""
import fcntl
import os
import sys
import time
import socket
import traceback

from pip.commands.install import InstallCommand as pip_InstallCommand

from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.files import SparkFiles
@@ -66,12 +69,46 @@ def main(infile, outfile):
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True

# fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
add_path(spark_files_dir) # *.py files that were added will be copied here

# fetch names of includes and construct PYTHONPATH if the file extension
# is not '.whl'
num_python_includes = read_int(infile)
wheel_files = set()
for _ in range(num_python_includes):
filename = utf8_deserializer.loads(infile)
add_path(os.path.join(spark_files_dir, filename))
path = os.path.join(spark_files_dir, filename)
if os.path.splitext(filename)[1].lower() == '.whl':
wheel_files.add(path)
else:
add_path(path)

if wheel_files:
# Install wheel files

local_site_packages_dir = os.path.join(
spark_files_dir,
'site-packages',
)
with open(os.path.join(
spark_files_dir,
'.pyspark_pip_install.lock'
), 'w') as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
try:
if os.path.exists(local_site_packages_dir) is False:
# '--no-deps' is not passed here, so any dependencies
# must already be staged as wheels in spark_files_dir.
pip_InstallCommand().main(args=[
'--quiet',
'--upgrade',
'--no-index',
'--target', local_site_packages_dir,
'--find-links', spark_files_dir,
] + list(wheel_files))
finally:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
add_path(local_site_packages_dir)

# fetch names and values of broadcast variables
num_broadcast_variables = read_int(infile)
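Not part of the diff: the wheel-installation block above takes an exclusive `flock` and then checks for the `site-packages` directory, so that when several Python worker processes start on the same host only the first one actually runs pip. A standalone sketch of that lock-then-check pattern, with illustrative names:

```python
import fcntl
import os

def install_once(work_dir, marker_dir, do_install):
    """Run do_install() at most once per work_dir, even with concurrent callers."""
    lock_path = os.path.join(work_dir, ".install.lock")
    with open(lock_path, "w") as lock_file:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)   # block until this process owns the lock
        try:
            if not os.path.exists(marker_dir):           # another process may have installed already
                do_install()
        finally:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
```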
Binary file not shown.
Binary file not shown.
@@ -92,7 +92,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
| --jar JAR_PATH Path to your application's JAR file
| --class CLASS_NAME Name of your application's main class
| --primary-py-file A main Python file
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
| --py-files PY_FILES Comma-separated list of .whl, .egg, .zip or .py files to
| place on the PYTHONPATH for Python apps.
| --args ARGS Arguments to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
@@ -252,7 +252,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| 'default')
| --addJars jars Comma separated list of local jars that want SparkContext.addJar
| to work with.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
| --py-files PY_FILES Comma-separated list of .whl, .egg, .zip or .py files to
| place on the PYTHONPATH for Python apps.
| --files files Comma separated list of files to be distributed with the job.
| --archives archives Comma separated list of archives to be distributed with the job.