Add wheel package support for PySpark
takaomag committed Apr 8, 2015
1 parent d138aa8 commit 369e96c
Showing 11 changed files with 147 additions and 38 deletions.
@@ -447,8 +447,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| coordinates should be groupId:artifactId:version.
| --repositories Comma-separated list of additional remote repositories to
| search for the maven coordinates given with --packages.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
| on the PYTHONPATH for Python apps.
| --py-files PY_FILES Comma-separated list of .whl, .egg, .zip or .py files to
| place on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
|
2 changes: 1 addition & 1 deletion docs/programming-guide.md
@@ -204,7 +204,7 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes,

In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called `sc`. Making your own SparkContext will not work. You can set which master the
context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files
context connects to using the `--master` argument, and you can add Python .whl, .egg, .zip or .py files
to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
12 changes: 6 additions & 6 deletions docs/submitting-applications.md
@@ -18,9 +18,9 @@ as `provided` dependencies; these need not be bundled since they are provided by
the cluster manager at runtime. Once you have an assembled jar you can call the `bin/spark-submit`
script as shown here while passing your jar.

For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.zip` or `.egg`
files to be distributed with your application. If you depend on multiple Python files we recommend
packaging them into a `.zip` or `.egg`.
For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.whl`, `.egg`
or `.zip` files to be distributed with your application. If you depend on multiple Python files we
recommend packaging them into a `.whl`, `.egg` or `.zip`.
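
For illustration, here is a minimal sketch of the same mechanism from the Python API side; the file names are hypothetical and not part of this commit. A wheel passed through `pyFiles` travels the same distribution path as `--py-files` and, with this change, is pip-installed on the executors:

```python
from pyspark import SparkConf, SparkContext

# Hypothetical wheel built from the application's dependencies,
# e.g. with `python setup.py bdist_wheel`.
deps_wheel = "deps/mylib-0.1.0-py2.py3-none-any.whl"

conf = SparkConf().setAppName("wheel-example")
sc = SparkContext(conf=conf, pyFiles=[deps_wheel])

# The wheel is shipped to the executors and installed there, so the
# packages it contains can be imported inside tasks.
print(sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect())
```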

# Launching Applications with spark-submit

@@ -62,7 +62,7 @@ the drivers and the executors. Note that `cluster` mode is currently not support
Mesos clusters or Python applications.

For Python applications, simply pass a `.py` file in the place of `<application-jar>` instead of a JAR,
and add Python `.zip`, `.egg` or `.py` files to the search path with `--py-files`.
and add Python `.whl`, `.egg`, `.zip` or `.py` files to the search path with `--py-files`.

There are a few options available that are specific to the
[cluster manager](cluster-overview.html#cluster-manager-types) that is being used.
@@ -179,8 +179,8 @@ with `--packages`. All transitive dependencies will be handled when using this c
repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`.
These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages.

For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries
to executors.
For Python, the equivalent `--py-files` option can be used to distribute `.whl`, `.egg`, `.zip`
and `.py` libraries to executors.

# More Information

@@ -225,7 +225,7 @@ public SparkLauncher addFile(String file) {
}

/**
* Adds a python file / zip / egg to be submitted with the application.
* Adds a python file / zip / whl / egg to be submitted with the application.
*
* @param file Path to the file.
* @return This launcher.
104 changes: 80 additions & 24 deletions python/pyspark/context.py
@@ -21,6 +21,8 @@
from threading import Lock
from tempfile import NamedTemporaryFile

from pip.commands.install import InstallCommand as pip_InstallCommand

from py4j.java_collections import ListConverter

from pyspark import accumulators
@@ -62,9 +64,9 @@ class SparkContext(object):
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
_python_includes = None # whl, egg, zip and jar files that need to be added to PYTHONPATH

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
PACKAGE_EXTENSIONS = ('.whl', '.egg', '.zip', '.jar')

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
@@ -77,9 +79,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
(e.g. mesos://host:port, spark://host:port, local[4]).
:param appName: A name for your job, to display on the cluster web UI.
:param sparkHome: Location where Spark is installed on cluster nodes.
:param pyFiles: Collection of .zip or .py files to send to the cluster
and add to PYTHONPATH. These can be paths on the local file
system or HDFS, HTTP, HTTPS, or FTP URLs.
:param pyFiles: Collection of .py, .whl, .egg or .zip files to send
to the cluster and add to PYTHONPATH. These can be paths on
the local file system or HDFS, HTTP, HTTPS, or FTP URLs.
:param environment: A dictionary of environment variables to set on
worker nodes.
:param batchSize: The number of Python objects represented as a single
@@ -178,18 +180,24 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
sys.path.insert(1, root_dir)

# Deploy any code dependencies specified in the constructor
# Wheel files will be installed by pip later.
self._python_includes = list()
for path in (pyFiles or []):
self.addPyFile(path)
if pyFiles:
for path in pyFiles:
self.addFile(path)
self._include_python_packages(paths=pyFiles)
else:
pyFiles = []

# Deploy code dependencies set by spark-submit; these will already have been added
# with SparkContext.addFile, so we just need to add them to the PYTHONPATH
for path in self._conf.get("spark.submit.pyFiles", "").split(","):
if path != "":
(dirname, filename) = os.path.split(path)
if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
self._python_includes.append(filename)
sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
# with SparkContext.addFile, so we just need to include them.
# Wheel files will be installed by pip later.
spark_submit_pyfiles = self._conf.get("spark.submit.pyFiles", "").split(",")
if spark_submit_pyfiles:
self._include_python_packages(paths=spark_submit_pyfiles)

# Install all wheel files at once.
self._install_wheel_files(paths=pyFiles + spark_submit_pyfiles)

# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
@@ -693,23 +701,71 @@ def clearFiles(self):
Clear the job's list of files added by L{addFile} or L{addPyFile} so
that they do not get downloaded to any new nodes.
"""
# TODO: remove added .py or .zip files from the PYTHONPATH?
# TODO: remove added .py, .whl, .egg or .zip files from the PYTHONPATH?
self._jsc.sc().clearFiles()

def addPyFile(self, path):
"""
Add a .py or .zip dependency for all tasks to be executed on this
SparkContext in the future. The C{path} passed can be either a local
file, a file in HDFS (or other Hadoop-supported filesystems), or an
HTTP, HTTPS or FTP URI.
Add a .py, .whl, .egg or .zip dependency for all tasks to be
executed on this SparkContext in the future. The C{path} passed can
be either a local file, a file in HDFS (or other Hadoop-supported
filesystems), or an HTTP, HTTPS or FTP URI.
"""
self.addFile(path)
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
self._include_python_packages(paths=(path,))
self._install_wheel_files(paths=(path,))

if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
self._python_includes.append(filename)
# for tests in local mode
sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
def _include_python_packages(self, paths):
"""
Add Python package dependencies. Python packages (except for .whl) are
added to PYTHONPATH.
"""
root_dir = SparkFiles.getRootDirectory()
for path in paths:
basename = os.path.basename(path)
extname = os.path.splitext(basename)[1].lower()
if extname in self.PACKAGE_EXTENSIONS \
and basename not in self._python_includes:
self._python_includes.append(basename)
if extname != '.whl':
# Prepend the python package (except for *.whl) to sys.path
sys.path.insert(1, os.path.join(root_dir, basename))

def _install_wheel_files(
self,
paths,
quiet=True,
upgrade=True,
no_deps=True,
no_index=True,
):
"""
Install all .whl files in a single pip invocation.
"""
root_dir = SparkFiles.getRootDirectory()
paths = {
os.path.join(root_dir, os.path.basename(path))
for path in paths
if os.path.splitext(path)[1].lower() == '.whl'
}
if not paths:
return

pip_args = [
'--find-links', root_dir,
'--target', os.path.join(root_dir, 'site-packages'),
]
if quiet:
pip_args.append('--quiet')
if upgrade:
pip_args.append('--upgrade')
if no_deps:
pip_args.append('--no-deps')
if no_index:
pip_args.append('--no-index')
pip_args.extend(paths)

pip_InstallCommand().main(args=pip_args)
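
For reference, the arguments assembled above amount to roughly the standalone pip invocation sketched below; the directory and wheel name are placeholders. `pip.commands.install.InstallCommand` is pip's internal API (it predates the `pip._internal` reorganization in pip 10), so an out-of-process call is the more conservative way to get the same effect:

```python
import subprocess
import sys

# Stands in for SparkFiles.getRootDirectory(); the wheel name is hypothetical.
root_dir = "/tmp/spark-files"
subprocess.check_call([
    sys.executable, "-m", "pip", "install",
    "--quiet", "--upgrade", "--no-deps", "--no-index",
    "--find-links", root_dir,
    "--target", root_dir + "/site-packages",
    root_dir + "/mylib-0.1.0-py2.py3-none-any.whl",
])
```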

def setCheckpointDir(self, dirName):
"""
16 changes: 16 additions & 0 deletions python/pyspark/tests.py
@@ -367,6 +367,22 @@ def func():
from userlib import UserClass
self.assertEqual("Hello World from inside a package!", UserClass().hello())

def test_add_whl_file_locally(self):
# To ensure that we're actually testing addPyFile's effects, check that
# this fails due to `testpackage1` or `testpackage2` not being on the
# Python path:
def func():
from testpackage2 import TestPackage1Class
self.assertRaises(ImportError, func)
paths = [
os.path.join(SPARK_HOME, "python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl"),
os.path.join(SPARK_HOME, "python/test_support/testpackage2-0.0.1-py2.py3-none-any.whl"),
]
for path in paths:
self.sc.addPyFile(path)
from testpackage2 import TestPackage1Class
self.assertEqual("Hello World from inside a package!", TestPackage1Class().hello())

def test_overwrite_system_module(self):
self.sc.addPyFile(os.path.join(SPARK_HOME, "python/test_support/SimpleHTTPServer.py"))

41 changes: 39 additions & 2 deletions python/pyspark/worker.py
@@ -18,12 +18,15 @@
"""
Worker that receives input from Piped RDD.
"""
import fcntl
import os
import sys
import time
import socket
import traceback

from pip.commands.install import InstallCommand as pip_InstallCommand

from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.files import SparkFiles
@@ -66,12 +69,46 @@ def main(infile, outfile):
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True

# fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
add_path(spark_files_dir) # *.py files that were added will be copied here

# fetch names of includes and construct PYTHONPATH if the file extension
# is not '.whl'
num_python_includes = read_int(infile)
wheel_files = set()
for _ in range(num_python_includes):
filename = utf8_deserializer.loads(infile)
add_path(os.path.join(spark_files_dir, filename))
path = os.path.join(spark_files_dir, filename)
if os.path.splitext(filename)[1].lower() == '.whl':
wheel_files.add(path)
else:
add_path(path)

if wheel_files:
# Install wheel files

local_site_packages_dir = os.path.join(
spark_files_dir,
'site-packages',
)
with open(os.path.join(
spark_files_dir,
'.pyspark_pip_install.lock'
), 'w') as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
try:
if not os.path.exists(local_site_packages_dir):
# '--no-deps' is not passed here, so pip resolves dependencies;
# with '--no-index', they must all be among the shipped wheels.
pip_InstallCommand().main(args=[
'--quiet',
'--upgrade',
'--no-index',
'--target', local_site_packages_dir,
'--find-links', spark_files_dir,
] + list(wheel_files))
finally:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
add_path(local_site_packages_dir)
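
Design note: the exclusive `fcntl.flock` on `.pyspark_pip_install.lock` serializes concurrent worker processes on the same node, so only the first one to acquire the lock runs the pip install into the shared `site-packages` directory; workers that acquire it later find the directory already present and skip the install. The lock is released in the `finally` block either way, and every worker then adds the shared directory to its module search path via `add_path`.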

# fetch names and values of broadcast variables
num_broadcast_variables = read_int(infile)
Binary file not shown.
Binary file not shown.
@@ -92,7 +92,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
| --jar JAR_PATH Path to your application's JAR file
| --class CLASS_NAME Name of your application's main class
| --primary-py-file A main Python file
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
| --py-files PY_FILES Comma-separated list of .whl, .egg, .zip or .py files to
| place on the PYTHONPATH for Python apps.
| --args ARGS Arguments to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
@@ -252,7 +252,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| 'default')
| --addJars jars Comma separated list of local jars that want SparkContext.addJar
| to work with.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
| --py-files PY_FILES Comma-separated list of .whl, .egg, .zip or .py files to
| place on the PYTHONPATH for Python apps.
| --files files Comma separated list of files to be distributed with the job.
| --archives archives Comma separated list of archives to be distributed with the job.