diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 6eb73c43470a5..294602a4638ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -447,8 +447,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |                              coordinates should be groupId:artifactId:version.
         |  --repositories              Comma-separated list of additional remote repositories to
         |                              search for the maven coordinates given with --packages.
-        |  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
-        |                              on the PYTHONPATH for Python apps.
+        |  --py-files PY_FILES         Comma-separated list of .whl, .egg, .zip or .py files to
+        |                              place on the PYTHONPATH for Python apps.
         |  --files FILES               Comma-separated list of files to be placed in the working
         |                              directory of each executor.
         |
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index f4fabb0927b66..3a6c6ffb1f9bf 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -204,7 +204,7 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes,

 In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
 variable called `sc`. Making your own SparkContext will not work. You can set which master the
-context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files
+context connects to using the `--master` argument, and you can add Python .whl, .egg, .zip or .py files
 to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
 (e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
 to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType)
diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md
index 3ecbf2308cd44..47904ff5bf94b 100644
--- a/docs/submitting-applications.md
+++ b/docs/submitting-applications.md
@@ -18,9 +18,9 @@ as `provided` dependencies; these need not be bundled since they are provided by
 the cluster manager at runtime. Once you have an assembled jar you can call the `bin/spark-submit`
 script as shown here while passing your jar.

-For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.zip` or `.egg`
-files to be distributed with your application. If you depend on multiple Python files we recommend
-packaging them into a `.zip` or `.egg`.
+For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.whl`, `.egg`
+or `.zip` files to be distributed with your application. If you depend on multiple Python files we
+recommend packaging them into a `.whl`, `.egg` or `.zip`.

 # Launching Applications with spark-submit

@@ -62,7 +62,7 @@ the drivers and the executors. Note that `cluster` mode is currently not support
 Mesos clusters or Python applications.

 For Python applications, simply pass a `.py` file in the place of `<application-jar>` instead of a JAR,
-and add Python `.zip`, `.egg` or `.py` files to the search path with `--py-files`.
+and add Python `.whl`, `.egg`, `.zip` or `.py` files to the search path with `--py-files`.

 There are a few options available that are specific to the
 [cluster manager](#cluster-overview.html#cluster-manager-types) that is being used.

@@ -179,8 +179,8 @@ with `--packages`. All transitive dependencies will be handled when using this c
 repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`.
 These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages.

-For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries
-to executors.
+For Python, the equivalent `--py-files` option can be used to distribute `.whl`, `.egg`, `.zip`
+and `.py` libraries to executors.

 # More Information

diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index b566507ee6061..5f03ec8c62c14 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -225,7 +225,7 @@ public SparkLauncher addFile(String file) {
   }

   /**
-   * Adds a python file / zip / egg to be submitted with the application.
+   * Adds a python file / zip / whl / egg to be submitted with the application.
    *
    * @param file Path to the file.
    * @return This launcher.
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 78dccc40470e3..bbbe0b02c8eb8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -21,6 +21,8 @@
 from threading import Lock
 from tempfile import NamedTemporaryFile

+from pip.commands.install import InstallCommand as pip_InstallCommand
+
 from py4j.java_collections import ListConverter

 from pyspark import accumulators
@@ -62,9 +64,9 @@ class SparkContext(object):
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
-    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
+    _python_includes = None  # whl, egg, zip and jar files that need to be added to PYTHONPATH

-    PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
+    PACKAGE_EXTENSIONS = ('.whl', '.egg', '.zip', '.jar')

     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
@@ -77,9 +79,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         (e.g. mesos://host:port, spark://host:port, local[4]).
         :param appName: A name for your job, to display on the cluster web UI.
         :param sparkHome: Location where Spark is installed on cluster nodes.
-        :param pyFiles: Collection of .zip or .py files to send to the cluster
-            and add to PYTHONPATH. These can be paths on the local file
-            system or HDFS, HTTP, HTTPS, or FTP URLs.
+        :param pyFiles: Collection of .py, .whl, .egg or .zip files to send
+            to the cluster and add to PYTHONPATH. These can be paths on
+            the local file system or HDFS, HTTP, HTTPS, or FTP URLs.
         :param environment: A dictionary of environment variables to set on
             worker nodes.
         :param batchSize: The number of Python objects represented as a single
@@ -178,18 +180,24 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         sys.path.insert(1, root_dir)

         # Deploy any code dependencies specified in the constructor
+        # Wheel files will be installed by pip later.
         self._python_includes = list()
-        for path in (pyFiles or []):
-            self.addPyFile(path)
+        if pyFiles:
+            for path in pyFiles:
+                self.addFile(path)
+            self._include_python_packages(paths=pyFiles)
+        else:
+            pyFiles = []

         # Deploy code dependencies set by spark-submit; these will already have been added
-        # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
-        for path in self._conf.get("spark.submit.pyFiles", "").split(","):
-            if path != "":
-                (dirname, filename) = os.path.split(path)
-                if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
-                    self._python_includes.append(filename)
-                    sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+        # with SparkContext.addFile, so we just need to include them.
+        # Wheel files will be installed by pip later.
+        spark_submit_pyfiles = self._conf.get("spark.submit.pyFiles", "").split(",")
+        if spark_submit_pyfiles:
+            self._include_python_packages(paths=spark_submit_pyfiles)
+
+        # Install all wheel files at once.
+        self._install_wheel_files(paths=pyFiles + spark_submit_pyfiles)

         # Create a temporary directory inside spark.local.dir:
         local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
@@ -693,23 +701,71 @@ def clearFiles(self):
         Clear the job's list of files added by L{addFile} or L{addPyFile} so that they do
         not get downloaded to any new nodes.
         """
-        # TODO: remove added .py or .zip files from the PYTHONPATH?
+        # TODO: remove added .py, .whl, .egg or .zip files from the PYTHONPATH?
         self._jsc.sc().clearFiles()

     def addPyFile(self, path):
         """
-        Add a .py or .zip dependency for all tasks to be executed on this
-        SparkContext in the future. The C{path} passed can be either a local
-        file, a file in HDFS (or other Hadoop-supported filesystems), or an
-        HTTP, HTTPS or FTP URI.
+        Add a .py, .whl, .egg or .zip dependency for all tasks to be
+        executed on this SparkContext in the future. The C{path} passed can
+        be either a local file, a file in HDFS (or other Hadoop-supported
+        filesystems), or an HTTP, HTTPS or FTP URI.
         """
         self.addFile(path)
-        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix
+        self._include_python_packages(paths=(path,))
+        self._install_wheel_files(paths=(path,))

-        if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
-            self._python_includes.append(filename)
-            # for tests in local mode
-            sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+    def _include_python_packages(self, paths):
+        """
+        Add Python package dependencies. Python packages (except for .whl) are
+        added to PYTHONPATH.
+        """
+        root_dir = SparkFiles.getRootDirectory()
+        for path in paths:
+            basename = os.path.basename(path)
+            extname = os.path.splitext(basename)[1].lower()
+            if extname in self.PACKAGE_EXTENSIONS \
+                    and basename not in self._python_includes:
+                self._python_includes.append(basename)
+                if extname != '.whl':
+                    # Prepend the python package (except for *.whl) to sys.path
+                    sys.path.insert(1, os.path.join(root_dir, basename))
+
+    def _install_wheel_files(
+        self,
+        paths,
+        quiet=True,
+        upgrade=True,
+        no_deps=True,
+        no_index=True,
+    ):
+        """
+        Install .whl files at once by pip install.
+        """
+        root_dir = SparkFiles.getRootDirectory()
+        paths = {
+            os.path.join(root_dir, os.path.basename(path))
+            for path in paths
+            if os.path.splitext(path)[1].lower() == '.whl'
+        }
+        if not paths:
+            return
+
+        pip_args = [
+            '--find-links', root_dir,
+            '--target', os.path.join(root_dir, 'site-packages'),
+        ]
+        if quiet:
+            pip_args.append('--quiet')
+        if upgrade:
+            pip_args.append('--upgrade')
+        if no_deps:
+            pip_args.append('--no-deps')
+        if no_index:
+            pip_args.append('--no-index')
+        pip_args.extend(paths)
+
+        pip_InstallCommand().main(args=pip_args)

     def setCheckpointDir(self, dirName):
         """
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index dd8d3b1c53733..c93f948707c19 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -367,6 +367,22 @@ def func():
         from userlib import UserClass
         self.assertEqual("Hello World from inside a package!", UserClass().hello())

+    def test_add_whl_file_locally(self):
+        # To ensure that we're actually testing addPyFile's effects, check that
+        # this fails due to `testpackage1` or `testpackage2` not being on the
+        # Python path:
+        def func():
+            from testpackage2 import TestPackage1Class
+        self.assertRaises(ImportError, func)
+        paths = [
+            os.path.join(SPARK_HOME, "python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl"),
+            os.path.join(SPARK_HOME, "python/test_support/testpackage2-0.0.1-py2.py3-none-any.whl"),
+        ]
+        for path in paths:
+            self.sc.addPyFile(path)
+        from testpackage2 import TestPackage1Class
+        self.assertEqual("Hello World from inside a package!", TestPackage1Class().hello())
+
     def test_overwrite_system_module(self):
         self.sc.addPyFile(os.path.join(SPARK_HOME, "python/test_support/SimpleHTTPServer.py"))

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 8a93c320ec5d3..30671726c8f0d 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -18,12 +18,15 @@
 """
 Worker that receives input from Piped RDD.
 """
+import fcntl
 import os
 import sys
 import time
 import socket
 import traceback
+
+from pip.commands.install import InstallCommand as pip_InstallCommand
+
 from pyspark.accumulators import _accumulatorRegistry
 from pyspark.broadcast import Broadcast, _broadcastRegistry
 from pyspark.files import SparkFiles
@@ -66,12 +69,46 @@ def main(infile, outfile):
         SparkFiles._root_directory = spark_files_dir
         SparkFiles._is_running_on_worker = True

-        # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
         add_path(spark_files_dir)  # *.py files that were added will be copied here
+
+        # fetch names of includes and construct PYTHONPATH if the file extension
+        # is not '.whl'
         num_python_includes = read_int(infile)
+        wheel_files = set()
         for _ in range(num_python_includes):
             filename = utf8_deserializer.loads(infile)
-            add_path(os.path.join(spark_files_dir, filename))
+            path = os.path.join(spark_files_dir, filename)
+            if os.path.splitext(filename)[1].lower() == '.whl':
+                wheel_files.add(path)
+            else:
+                add_path(path)
+
+        if wheel_files:
+            # Install wheel files
+
+            local_site_packages_dir = os.path.join(
+                spark_files_dir,
+                'site-packages',
+            )
+            with open(os.path.join(
+                spark_files_dir,
+                '.pyspark_pip_install.lock'
+            ), 'w') as f:
+                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
+                try:
+                    if os.path.exists(local_site_packages_dir) is False:
+                        # '--no-deps' is not set.
+                        # All dependencies must be there.
+                        pip_InstallCommand().main(args=[
+                            '--quiet',
+                            '--upgrade',
+                            '--no-index',
+                            '--target', local_site_packages_dir,
+                            '--find-links', spark_files_dir,
+                        ] + list(wheel_files))
+                finally:
+                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
+            add_path(local_site_packages_dir)

         # fetch names and values of broadcast variables
         num_broadcast_variables = read_int(infile)
diff --git a/python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl b/python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl
new file mode 100644
index 0000000000000..76b9ff834a1bb
Binary files /dev/null and b/python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl differ
diff --git a/python/test_support/testpackage2-0.0.1-py2.py3-none-any.whl b/python/test_support/testpackage2-0.0.1-py2.py3-none-any.whl
new file mode 100644
index 0000000000000..8dca7fd801cd5
Binary files /dev/null and b/python/test_support/testpackage2-0.0.1-py2.py3-none-any.whl differ
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index e1a992af3aae7..35bfa3ef1b494 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -92,7 +92,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
       |  --jar JAR_PATH       Path to your application's JAR file
       |  --class CLASS_NAME   Name of your application's main class
       |  --primary-py-file    A main Python file
-      |  --py-files PY_FILES  Comma-separated list of .zip, .egg, or .py files to
+      |  --py-files PY_FILES  Comma-separated list of .whl, .egg, .zip or .py files to
       |                       place on the PYTHONPATH for Python apps.
       |  --args ARGS          Arguments to be passed to your application's main class.
       |                       Multiple invocations are possible, each will be passed in order.
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 3bc7eb1abf341..a519813579ab0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -252,7 +252,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       |                           'default')
       |  --addJars jars           Comma separated list of local jars that want SparkContext.addJar
       |                           to work with.
-      |  --py-files PY_FILES      Comma-separated list of .zip, .egg, or .py files to
+      |  --py-files PY_FILES      Comma-separated list of .whl, .egg, .zip or .py files to
       |                           place on the PYTHONPATH for Python apps.
       |  --files files            Comma separated list of files to be distributed with the job.
       |  --archives archives      Comma separated list of archives to be distributed with the job.
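
Usage sketch (illustrative, not part of the patch itself): the snippet below exercises the new wheel support from user code, mirroring the added test_add_whl_file_locally test. The wheel paths and the `mypkg` / `another_dep` names are hypothetical placeholders, not files shipped with Spark.

# Hypothetical example: ship wheel dependencies to executors and import them there.
from pyspark import SparkContext

sc = SparkContext("local[2]", "wheel-demo",
                  pyFiles=["/path/to/mypkg-0.1.0-py2.py3-none-any.whl"])

# Wheels can also be added after the context has been created.
sc.addPyFile("/path/to/another_dep-1.0-py2.py3-none-any.whl")

def use_dependency(x):
    # On the worker, the wheel has been pip-installed into the job's
    # site-packages directory before user code runs, so the import succeeds.
    import mypkg
    return (mypkg.__name__, x)

print(sc.parallelize(range(4)).map(use_dependency).collect())
sc.stop()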
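
Worker-side sketch (illustrative): the patch drives the install through pip's internal InstallCommand API; the same idea expressed against the stable `python -m pip` command line is outlined below. `spark_files_dir` and the lock-file name are stand-ins for the values worker.py derives at runtime.

# Minimal sketch of the worker-side wheel install, assuming the wheels have
# already been fetched into spark_files_dir. Uses the pip CLI via subprocess
# instead of the internal InstallCommand API used by the patch.
import fcntl
import glob
import os
import subprocess
import sys


def install_wheels(spark_files_dir):
    site_packages = os.path.join(spark_files_dir, 'site-packages')
    wheels = glob.glob(os.path.join(spark_files_dir, '*.whl'))
    if not wheels:
        return site_packages

    lock_path = os.path.join(spark_files_dir, '.pyspark_pip_install.lock')
    with open(lock_path, 'w') as lock_file:
        # Serialize concurrent worker processes on the same host; only the
        # first one to take the lock actually runs pip.
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
        try:
            if not os.path.exists(site_packages):
                subprocess.check_call([
                    sys.executable, '-m', 'pip', 'install',
                    '--quiet', '--upgrade', '--no-index',
                    '--target', site_packages,
                    '--find-links', spark_files_dir,
                ] + wheels)
        finally:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)

    # Make the freshly installed packages importable in this process.
    sys.path.insert(1, site_packages)
    return site_packages

The exclusive flock mirrors the patch's intent: several worker processes can share one spark_files_dir, and the lock plus the existence check on site-packages keep them from racing pip against the same target directory.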