SPARK-1004. PySpark on YARN
This reopens https://github.com/apache/incubator-spark/pull/640 against the new repo

Author: Sandy Ryza <[email protected]>

Closes #30 from sryza/sandy-spark-1004 and squashes the following commits:

89889d4 [Sandy Ryza] Move unzipping py4j to the generate-resources phase so that it gets included in the jar the first time
5165a02 [Sandy Ryza] Fix docs
fd0df79 [Sandy Ryza] PySpark on YARN
(cherry picked from commit ff5be9a)

Signed-off-by: Patrick Wendell <[email protected]>
sryza authored and pwendell committed Apr 30, 2014
1 parent b0ded1f commit 177361c
Showing 11 changed files with 85 additions and 19 deletions.
1 change: 1 addition & 0 deletions bin/pyspark
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
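
The second export above puts the bundled py4j-0.8.1 source zip on the Python path alongside the pyspark sources, so Py4J no longer has to be installed separately. A minimal sketch of why that is enough (not part of this commit; the paths assume a stock SPARK_HOME layout) -- Python's zipimport machinery treats a source zip on sys.path like a regular package directory:

import os
import sys

spark_home = os.environ.get("SPARK_HOME", "/opt/spark")   # hypothetical install location
py4j_zip = os.path.join(spark_home, "python", "lib", "py4j-0.8.1-src.zip")

sys.path.insert(0, py4j_zip)   # same effect as the PYTHONPATH export above
import py4j                    # resolved from inside the zip via zipimport
print(py4j.__file__)           # something like .../py4j-0.8.1-src.zip/py4j/__init__.py
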
1 change: 1 addition & 0 deletions bin/pyspark2.cmd
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python

set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
42 changes: 42 additions & 0 deletions core/pom.xml
@@ -294,6 +294,48 @@
</environmentVariables>
</configuration>
</plugin>
<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>unzip</executable>
<workingDirectory>../python</workingDirectory>
<arguments>
<argument>-o</argument>
<argument>lib/py4j*.zip</argument>
<argument>-d</argument>
<argument>build</argument>
</arguments>
</configuration>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>../python</directory>
<includes>
<include>pyspark/*.py</include>
</includes>
</resource>
<resource>
<directory>../python/build</directory>
<includes>
<include>py4j/*.py</include>
</includes>
</resource>
</resources>
</build>
</project>
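
The exec-maven-plugin execution unzips lib/py4j*.zip into python/build during generate-resources, and the new <resources> entries copy pyspark/*.py and py4j/*.py into the core jar. Per the commit message, the point is that the assembly jar shipped to YARN containers then carries the Python sources itself. A rough sketch of the effect (illustrative only; the jar name below is hypothetical) -- since a jar is just a zip archive, it can sit directly on the Python path:

import sys

assembly_jar = "spark-assembly-1.0.0-hadoop2.2.0.jar"   # hypothetical build output
sys.path.insert(0, assembly_jar)

import pyspark   # pyspark/*.py bundled as jar resources, loaded via zipimport
import py4j      # py4j/*.py unzipped into python/build and bundled the same way
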
10 changes: 2 additions & 8 deletions core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
- val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
- val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
- val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
- workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()

// Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@

try {
// Create and start the daemon
- val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
- val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
- val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
- workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()

// Redirect the stderr to ours
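
Both launch sites now start the Python process with "-m pyspark.worker" / "-m pyspark.daemon" instead of an absolute path under SPARK_HOME, and they stop rewriting PYTHONPATH themselves; module resolution is left entirely to whatever PYTHONPATH the caller provides (a source checkout locally, or the assembly jar on a YARN container). A rough Python analogue of the new ProcessBuilder logic (illustration only; the real worker expects to talk to the JVM over a socket, so launching it standalone does nothing useful by itself):

import os
import subprocess

def launch_worker(python_exec="python", extra_env=None):
    env = dict(os.environ)
    env.update(extra_env or {})   # analogous to workerEnv.putAll(envVars)
    # "-m" resolves pyspark.worker from PYTHONPATH rather than a hard-coded
    # $SPARK_HOME/python/pyspark/worker.py path.
    return subprocess.Popen([python_exec, "-m", "pyspark.worker"], env=env)
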
3 changes: 3 additions & 0 deletions docs/python-programming-guide.md
@@ -63,6 +63,9 @@ All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.
Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.

# Running PySpark on YARN

To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".

# Interactive Use

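
The doc addition is the whole user-facing story: with the Python sources bundled into the assembly jar, running on YARN only requires pointing the master at "yarn-client". A minimal sketch of such an application (illustrative; the app name and input path are made-up, and the script would still be launched through bin/pyspark so the YARN configuration and classpath are picked up):

from pyspark import SparkContext

sc = SparkContext(master="yarn-client", appName="YarnWordCount")
counts = (sc.textFile("hdfs:///tmp/input.txt")
            .flatMap(lambda line: line.split())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b))
print(counts.take(5))
sc.stop()
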
3 changes: 3 additions & 0 deletions python/.gitignore
@@ -1,2 +1,5 @@
*.pyc
docs/
pyspark.egg-info
build/
dist/
1 change: 0 additions & 1 deletion python/lib/PY4J_VERSION.txt

This file was deleted.

7 changes: 0 additions & 7 deletions python/pyspark/__init__.py
@@ -49,13 +49,6 @@
Main entry point for accessing data stored in Apache Hive..
"""



- import sys
- import os
- sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip"))


from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
29 changes: 27 additions & 2 deletions python/pyspark/java_gateway.py
@@ -24,10 +24,11 @@
from py4j.java_gateway import java_import, JavaGateway, GatewayClient


- SPARK_HOME = os.environ["SPARK_HOME"]
+ def launch_gateway():
+     SPARK_HOME = os.environ["SPARK_HOME"]

+     set_env_vars_for_yarn()

- def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
@@ -70,3 +71,27 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "scala.Tuple2")
return gateway

def set_env_vars_for_yarn():
    # Add the spark jar, which includes the pyspark files, to the python path
    env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
    if "PYTHONPATH" in env_map:
        env_map["PYTHONPATH"] += ":spark.jar"
    else:
        env_map["PYTHONPATH"] = "spark.jar"

    os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())

def parse_env(env_str):
    # Turns a comma-separated list of env settings into a dict that maps env vars to
    # their values.
    env = {}
    for var_str in env_str.split(","):
        parts = var_str.split("=")
        if len(parts) == 2:
            env[parts[0]] = parts[1]
        elif len(var_str) > 0:
            print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
            sys.exit(1)

    return env
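
set_env_vars_for_yarn() rewrites SPARK_YARN_USER_ENV so that "spark.jar" ends up on the executors' PYTHONPATH; that is the name the YARN client code of this era gives the localized Spark assembly in each container's working directory, and since that jar now bundles the pyspark and py4j sources (see the core/pom.xml change above), executors can import them straight out of it. A small usage sketch of the two helpers (values are illustrative):

import os
from pyspark.java_gateway import parse_env, set_env_vars_for_yarn

os.environ["SPARK_YARN_USER_ENV"] = "PYTHONPATH=/opt/libs,FOO=bar"

env_map = parse_env(os.environ["SPARK_YARN_USER_ENV"])
# env_map == {"PYTHONPATH": "/opt/libs", "FOO": "bar"}

set_env_vars_for_yarn()
# SPARK_YARN_USER_ENV now reads something like
#   "PYTHONPATH=/opt/libs:spark.jar,FOO=bar"
# (ordering may vary, since the value is rebuilt from a dict)
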
4 changes: 3 additions & 1 deletion python/pyspark/tests.py
@@ -30,10 +30,12 @@

from pyspark.context import SparkContext
from pyspark.files import SparkFiles
- from pyspark.java_gateway import SPARK_HOME
from pyspark.serializers import read_int


+ SPARK_HOME = os.environ["SPARK_HOME"]


class PySparkTestCase(unittest.TestCase):

def setUp(self):
3 changes: 3 additions & 0 deletions sbin/spark-config.sh
@@ -34,3 +34,6 @@ this="$config_bin/$script"
export SPARK_PREFIX=`dirname "$this"`/..
export SPARK_HOME=${SPARK_PREFIX}
export SPARK_CONF_DIR="$SPARK_HOME/conf"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
