Enhance Ray on spark (#1449)
* add more doc, spark_conf and extra_options

* fix pip install

* doc

* fix release.sh
zhichao-li authored Jun 12, 2019
1 parent 7ab24e5 commit 545aa52
Showing 11 changed files with 139 additions and 45 deletions.
5 changes: 4 additions & 1 deletion docs/docs/APIGuide/Ray/rayonspark.md
@@ -4,9 +4,10 @@ AnalyticsZoo has already provided a mechanism to deploy Python dependencies and
across the yarn cluster, meaning Python users can run `Analytics-Zoo` or `Ray`
in a pythonic way on yarn without `spark-submit` or installing Analytics-Zoo or Ray across all cluster nodes.


## Here are the steps to run RayOnSpark:

1) You should install Conda first and create a conda-env named "ray36"
1) You should install [Conda](https://docs.conda.io/projects/conda/en/latest/commands/install.html) and create a conda-env named "ray36"

2) Install some essential dependencies in the conda env

@@ -91,3 +92,5 @@ print([ray.get(actor.ip.remote()) for actor in actors])
ray_ctx.stop()

```
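The `extra_params` option added in this commit forwards additional flags to `ray start`. A minimal sketch of its use (the temp directory and memory size are illustrative, `sc` is assumed to come from `init_spark_on_yarn` as described in the steps above, and the import path follows the module layout in this commit):

```python
from zoo.ray.util.raycontext import RayContext

# Each key/value pair in extra_params becomes an extra "--<key> <value>" flag
# on the generated `ray start` command; the values below are examples only.
ray_ctx = RayContext(sc=sc,
                     object_store_memory="4g",
                     extra_params={"temp-dir": "/tmp/ray/"})
ray_ctx.init()
# ... define and run Ray actors/tasks here ...
ray_ctx.stop()
```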


43 changes: 38 additions & 5 deletions docs/docs/PythonUserGuide/install.md
@@ -3,7 +3,7 @@ For Python users, Analytics Zoo can be installed either [from pip](#install-from
**NOTE**: Only __Python 2.7__, __Python 3.5__ and __Python 3.6__ are supported for now.

---
## **Install from pip**
## **Install from pip for local usage**
You can use the following command to install the latest release version of __analytics-zoo__ via pip easily:

```bash
@@ -24,13 +24,46 @@ sc = init_nncontext()
```

**Remarks:**

1. We've tested this package with pip 9.0.1. `pip install --upgrade pip` if necessary.
2. Pip install supports __Mac__ and __Linux__ platforms.
3. Pip install only supports __local__ mode. Cluster mode might be supported in the future. For those who want to use Analytics Zoo in cluster mode, please try to [install without pip](#install-without-pip).
4. You need to install Java __>= JDK8__ before running Analytics Zoo, which is required by `pyspark`.
5. `pyspark==2.4.3`, `bigdl==0.8.0` and their dependencies will automatically be installed if they haven't been detected in the current Python environment.
3. You need to install Java __>= JDK8__ before running Analytics Zoo, which is required by `pyspark`.
4. `pyspark==2.4.3`, `bigdl==0.8.0` and their dependencies will automatically be installed if they haven't been detected in the current Python environment.

## **Install from pip for yarn cluster**

You only need to follow these steps on your driver node, and we only support yarn-client mode for now.

1) Install [Conda](https://docs.conda.io/projects/conda/en/latest/commands/install.html) and create a conda-env (e.g. named "zoo")

2) Install Analytics-Zoo into the created conda-env

```
source activate zoo
pip install analytics-zoo
```
3) Download JDK8 and set the environment variable `JAVA_HOME` (recommended).
- You can also install JDK via conda without setting `JAVA_HOME` manually:
`conda install -c anaconda openjdk=8.0.152`

4) Start python and then execute the following code for verification.

- Create a SparkContext on Yarn

``` python

from zoo import init_spark_on_yarn

sc = init_spark_on_yarn(
    hadoop_conf="path to the yarn configuration folder",
    conda_name="zoo",  # The name of the created conda-env
    num_executor=2,
    executor_cores=4,
    executor_memory="8g",
    driver_memory="2g",
    driver_cores=4,
    extra_executor_memory_for_ray="10g")
```
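If the context comes up, a quick job can confirm that the yarn executors are actually usable, for example:

```python
# Run a trivial distributed job on the newly created SparkContext.
rdd = sc.parallelize(range(1000), numSlices=8)
print(rdd.map(lambda x: x * x).sum())  # expected output: 332833500
```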

---
## **Install without pip**
23 changes: 23 additions & 0 deletions docs/docs/PythonUserGuide/run.md
@@ -3,6 +3,7 @@ You need to first [install](install.md) analytics-zoo, either [from pip](install
**NOTE**: Only __Python 2.7__, __Python 3.5__ and __Python 3.6__ are supported for now.

---

## **Run after pip install**

**Important:**
@@ -48,6 +49,28 @@ export BIGDL_JARS=...
export BIGDL_PACKAGES=...
```

## **Run on yarn after pip install**

Start python and then execute the following code:
Caveat: You should use `init_spark_on_yarn` rather than `init_nncontext()` here.
- Create a SparkContext on Yarn

``` python

from zoo import init_spark_on_yarn

sc = init_spark_on_yarn(
    hadoop_conf="path to the yarn configuration folder",
    conda_name="zoo",  # The name of the created conda-env
    num_executor=2,
    executor_cores=4,
    executor_memory="8g",
    driver_memory="2g",
    driver_cores=4,
    extra_executor_memory_for_ray="10g")

```
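With the SparkContext created this way, Ray can then be launched on top of it. A minimal sketch (the import path follows the module layout in this commit; the memory size is illustrative):

```python
from zoo.ray.util.raycontext import RayContext

ray_ctx = RayContext(sc=sc, object_store_memory="4g")
ray_ctx.init()   # starts the Ray head node plus one raylet per executor
# ... submit Ray tasks or actors here ...
ray_ctx.stop()
```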

---
## **Run without pip install**
- Note that __Python 3.6__ is only compatible with Spark 1.6.4, 2.0.3, 2.1.1 and >=2.2.0. See [this issue](https://issues.apache.org/jira/browse/SPARK-19019) for more discussion.
2 changes: 2 additions & 0 deletions docs/mkdocs.yml
@@ -67,6 +67,8 @@ pages:
- Relation: APIGuide/FeatureEngineering/relation.md
- Image: APIGuide/FeatureEngineering/image.md
- Text: APIGuide/FeatureEngineering/text.md
- Ray:
- RayOnSpark: APIGuide/Ray/rayonspark.md
- Models:
- Object Detection: APIGuide/Models/object-detection.md
- Image Classification: APIGuide/Models/image-classification.md
15 changes: 11 additions & 4 deletions pyzoo/dev/release/release.sh
@@ -24,16 +24,23 @@ echo $ANALYTICS_ZOO_HOME
ANALYTICS_ZOO_PYTHON_DIR="$(cd ${RUN_SCRIPT_DIR}/../../../pyzoo; pwd)"
echo $ANALYTICS_ZOO_PYTHON_DIR

if (( $# < 1)); then
echo "Bad parameters. Usage example: bash release.sh linux"
if (( $# < 2)); then
echo "Bad parameters"
echo "Usage example: bash release.sh linux default"
echo "Usage example: bash release.sh linux 0.6.0.dev0"
echo "If needed, you can also add other profiles such as: -Dspark.version=2.4.3 -Dbigdl.artifactId=bigdl-SPARK_2.4 -P spark_2.x"
exit -1
fi

platform=$1
profiles=${*:2}
version=`cat $ANALYTICS_ZOO_PYTHON_DIR/zoo/__init__.py | grep "__version__" | awk '{print $NF}' | tr -d '"'`
version=$2
profiles=${*:3}

if [ "${version}" == "default" ]; then
version=`cat $ANALYTICS_ZOO_PYTHON_DIR/zoo/__init__.py | grep "__version__" | awk '{print $NF}' | tr -d '"'`
fi

echo "Using version: ${version}"

cd ${ANALYTICS_ZOO_HOME}
if [ "$platform" == "mac" ]; then
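With the new argument order, the script takes the platform followed by an explicit version (or `default` to reuse the version declared in `zoo/__init__.py`), plus optional extra profiles; for example:

```bash
# Build a release using the version declared in zoo/__init__.py.
bash release.sh linux default

# Build a release with an explicit dev version and extra Maven profiles.
bash release.sh linux 0.6.0.dev0 -Dspark.version=2.4.3 -Dbigdl.artifactId=bigdl-SPARK_2.4 -P spark_2.x
```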
2 changes: 1 addition & 1 deletion pyzoo/setup.py
@@ -94,7 +94,7 @@ def setup_package():
license='Apache License, Version 2.0',
url='https://github.com/intel-analytics/analytics-zoo',
packages=packages,
install_requires=['pyspark==2.4.3', 'bigdl==0.8.0'],
install_requires=['pyspark==2.4.3', 'bigdl==0.8.0', 'conda-pack==0.3.1'],
dependency_links=['https://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.7.tgz'],
include_package_data=True,
package_data={"zoo.share": ['lib/analytics-zoo*with-dependencies.jar', 'conf/*', 'bin/*',
4 changes: 3 additions & 1 deletion pyzoo/test/zoo/ray/integration/ray_on_yarn.py
Expand Up @@ -30,10 +30,12 @@
executor_memory="10g",
driver_memory="2g",
driver_cores=4,
extra_executor_memory_for_ray="30g")
extra_executor_memory_for_ray="30g",
spark_conf={"hello": "world"})

ray_ctx = RayContext(sc=sc,
object_store_memory="25g",
extra_params={"temp-dir": "/tmp/hello/"},
env={"http_proxy": "http://child-prc.intel.com:913",
"http_proxys": "http://child-prc.intel.com:913"})
ray_ctx.init()
2 changes: 0 additions & 2 deletions pyzoo/test/zoo/ray/test_ray_on_local.py
Expand Up @@ -47,8 +47,6 @@ def test_local(self):
for process_info in ray_ctx.ray_processesMonitor.process_infos:
for pid in process_info.pids:
assert not psutil.pid_exists(pid)
sc.stop()


if __name__ == "__main__":
pytest.main([__file__])
12 changes: 8 additions & 4 deletions pyzoo/zoo/common/nncontext.py
Expand Up @@ -52,7 +52,8 @@ def init_spark_on_yarn(hadoop_conf,
hadoop_user_name="root",
spark_yarn_archive=None,
spark_log_level="WARN",
redirect_spark_log=True):
redirect_spark_log=True,
spark_conf=None):
"""
Create a SparkContext with Zoo configuration on Yarn cluster on "Yarn-client" mode.
You should create a conda env and install the python dependencies in that env.
Expand All @@ -72,8 +73,10 @@ def init_spark_on_yarn(hadoop_conf,
`conda_name`, but you can also pass the path to a packed file in "tar.gz" format here.
:param hadoop_user_name: User name for running in yarn cluster. Default value is: root
:param spark_log_level: Log level of Spark
:param redirect_spark_log:
:return: Direct the Spark log to local file or not.
:param redirect_spark_log: Direct the Spark log to local file or not.
:param spark_conf: You can append extra Spark conf here in key-value format,
e.g. spark_conf={"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails"}
:return: SparkContext
"""
from zoo.ray.util.spark import SparkRunner
sparkrunner = SparkRunner(spark_log_level=spark_log_level,
@@ -91,7 +94,8 @@ def init_spark_on_yarn(hadoop_conf,
penv_archive=penv_archive,
hadoop_user_name=hadoop_user_name,
spark_yarn_archive=spark_yarn_archive,
jars=None)
jars=None,
spark_conf=spark_conf)
return sc


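For reference, a call that exercises the new `spark_conf` argument might look like the following (a sketch only; the paths and sizes are illustrative, and the property value is the one used in the docstring above):

```python
from zoo import init_spark_on_yarn

# Key/value pairs passed via spark_conf are appended to the Spark configuration
# before the SparkContext is created.
sc = init_spark_on_yarn(
    hadoop_conf="/etc/hadoop/conf",
    conda_name="zoo",
    num_executor=2,
    executor_cores=4,
    executor_memory="8g",
    spark_conf={"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails"})
```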
44 changes: 28 additions & 16 deletions pyzoo/zoo/ray/util/raycontext.py
Expand Up @@ -92,7 +92,8 @@ def _prepare_env(self, cores=None):
return modified_env

def __init__(self, python_loc, redis_port, ray_node_cpu_cores, mkl_cores,
password, object_store_memory, waitting_time_sec=6, verbose=False, env=None):
password, object_store_memory, waitting_time_sec=6, verbose=False, env=None,
extra_params=None):
"""object_store_memory: integer in bytes"""
self.env = env
self.python_loc = python_loc
@@ -103,6 +104,7 @@ def __init__(self, python_loc, redis_port, ray_node_cpu_cores, mkl_cores,
self.ray_exec = self._get_ray_exec()
self.object_store_memory = object_store_memory
self.waiting_time_sec = waitting_time_sec
self.extra_params = extra_params
self.verbose = verbose
self.labels = """--resources='{"trainer": %s, "ps": %s }' """ % (1, 1)

@@ -115,22 +117,25 @@ def _stop(iter):

return _stop

def _gen_master_command(self):
command = "{} start --head " \
"--include-webui --redis-port {} \
--redis-password {} --num-cpus {} ". \
format(self.ray_exec, self.redis_port, self.password, self.ray_node_cpu_cores)
def _enrich_command(self, command):
if self.object_store_memory:
command = command + "--object-store-memory {} ".format(str(self.object_store_memory))
if self.extra_params:
for pair in self.extra_params.items():
command = command + " --{} {} ".format(pair[0], pair[1])
return command

def _gen_master_command(self):
command = "{} start --head " \
"--include-webui --redis-port {} " \
"--redis-password {} --num-cpus {} ". \
format(self.ray_exec, self.redis_port, self.password, self.ray_node_cpu_cores)
return self._enrich_command(command)

def _get_raylet_command(self, redis_address):
command = "{} start --redis-address {} --redis-password {} --num-cpus {} {} ".format(
self.ray_exec, redis_address, self.password, self.ray_node_cpu_cores, self.labels)

if self.object_store_memory:
command = command + "--object-store-memory {} ".format(str(self.object_store_memory))
return command
return self._enrich_command(command)

def _start_ray_node(self, command, tag, wait_before=5, wait_after=5):
modified_env = self._prepare_env(self.mkl_cores)
@@ -180,18 +185,24 @@ def _start_ray_services(iter):

class RayContext(object):
def __init__(self, sc, redis_port=None, password="123456", object_store_memory=None,
verbose=False, env=None, local_ray_node_num=2, waitting_time_sec=8):
verbose=False, env=None, local_ray_node_num=2, waiting_time_sec=8,
extra_params=None):
"""
The RayContext would init a ray cluster on top of the configuration of the SparkContext.
The RayContext would init a ray cluster on top of the configuration of SparkContext.
For spark cluster mode: The number of raylets is equal to number of executors.
For Spark local mode: The number of raylets is controlled by local_ray_node_num.
CPU cores for each raylet equals to spark_cores/local_ray_node_num.
CPU cores for each raylet equal spark_cores/local_ray_node_num.
:param sc:
:param redis_port: redis port for the "head" node.
The value would be randomly picked if not specified
:param local_ray_node_num number of raylets to be created.
The value would be randomly picked if not specified.
:param password: [optional] password for the redis.
:param object_store_memory: Memory size for the object_store.
:param verbose: True for more logs.
:param env: The environment variable dict for running Ray.
:param local_ray_node_num: number of raylets to be created.
:param waiting_time_sec: Waiting time for the raylets before connecting to redis.
:param extra_params: key-value dictionary for extra options to launch Ray,
e.g. extra_params={"temp-dir": "/tmp/ray2/"}
"""
self.sc = sc
self.stopped = False
@@ -212,7 +223,8 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
object_store_memory=self._enrich_object_sotre_memory(sc, object_store_memory),
verbose=verbose,
env=env,
waitting_time_sec=waitting_time_sec)
waitting_time_sec=waiting_time_sec,
extra_params=extra_params)
self._gather_cluster_ips()
from bigdl.util.common import init_executor_gateway
print("Start to launch the JVM guarding process")
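To make the new `_enrich_command` helper concrete, here is a small standalone sketch of how an `extra_params` dictionary ends up as extra flags on the `ray start` command line (it mirrors the logic above with illustrative values rather than importing the class):

```python
def enrich_command(command, object_store_memory=None, extra_params=None):
    # Mirror of _enrich_command: append optional flags to a `ray start` command.
    if object_store_memory:
        command += "--object-store-memory {} ".format(object_store_memory)
    if extra_params:
        for name, value in extra_params.items():
            command += " --{} {} ".format(name, value)
    return command

base = "ray start --head --redis-port 6379 --num-cpus 4 "
print(enrich_command(base, extra_params={"temp-dir": "/tmp/ray2/"}))
# -> ray start --head --redis-port 6379 --num-cpus 4  --temp-dir /tmp/ray2/
```

This is also why every key in `extra_params` must be a flag that the installed `ray start` actually accepts.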
32 changes: 21 additions & 11 deletions pyzoo/zoo/ray/util/spark.py
Expand Up @@ -98,6 +98,18 @@ def _detect_python_location(self):
"Cannot detect current python location. Please set it manually by python_location")
return process_info.out

def _gather_essential_jars(self):
from bigdl.util.engine import get_bigdl_classpath
from zoo.util.engine import get_analytics_zoo_classpath
bigdl_classpath = get_bigdl_classpath()
zoo_classpath = get_analytics_zoo_classpath()
assert bigdl_classpath, "Cannot find bigdl classpath"
assert zoo_classpath, "Cannot find Analytics-Zoo classpath"
if bigdl_classpath == zoo_classpath:
return [zoo_classpath]
else:
return [zoo_classpath, bigdl_classpath]

def init_spark_on_local(self, cores, conf=None, python_location=None):
print("Start to getOrCreate SparkContext")
os.environ['PYSPARK_PYTHON'] =\
@@ -124,29 +136,24 @@ def init_spark_on_yarn(self,
penv_archive=None,
hadoop_user_name="root",
spark_yarn_archive=None,
spark_conf=None,
jars=None):
os.environ["HADOOP_CONF_DIR"] = hadoop_conf
os.environ['HADOOP_USER_NAME'] = hadoop_user_name
os.environ['PYSPARK_PYTHON'] = "python_env/bin/python"

def _yarn_opt(jars):
from zoo.util.engine import get_analytics_zoo_classpath
command = " --archives {}#python_env --num-executors {} " \
" --executor-cores {} --executor-memory {}".\
format(penv_archive, num_executor, executor_cores, executor_memory)
path_to_zoo_jar = get_analytics_zoo_classpath()
jars_list = self._gather_essential_jars()
if jars:
jars_list.append(jars)

if extra_python_lib:
command = command + " --py-files {} ".format(extra_python_lib)

if jars:
command = command + " --jars {},{} ".format(jars, path_to_zoo_jar)
elif path_to_zoo_jar:
command = command + " --jars {} ".format(path_to_zoo_jar)

if path_to_zoo_jar:
command = command + " --conf spark.driver.extraClassPath={} ".\
format(get_analytics_zoo_classpath())
command = command + " --jars {}".format(",".join(jars_list))
return command

def _submit_opt():
@@ -158,7 +165,7 @@ def _submit_opt():
conf["spark.executor.memoryOverhead"] = extra_executor_memory_for_ray
if spark_yarn_archive:
conf.insert("spark.yarn.archive", spark_yarn_archive)
return " --master yarn " + _yarn_opt(jars) + 'pyspark-shell', conf
return " --master yarn --deploy-mode client" + _yarn_opt(jars) + ' pyspark-shell ', conf

pack_env = False
assert penv_archive or conda_name, \
@@ -169,6 +176,9 @@ def _submit_opt():
pack_env = True

submit_args, conf = _submit_opt()
if spark_conf:
for item in spark_conf.items():
conf[str(item[0])] = str(item[1])
sc = self._create_sc(submit_args, conf)
finally:
if conda_name and penv_archive and pack_env:
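The effect of the new `spark_conf` argument is simply to overlay user-supplied properties onto the configuration assembled by `_submit_opt()` before the SparkContext is created; a standalone sketch of that merge (values are illustrative):

```python
# Mirror of the merge above: user-supplied spark_conf entries extend or
# override the generated configuration dictionary.
conf = {"spark.driver.memory": "2g", "spark.executor.memoryOverhead": "10g"}
spark_conf = {"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails",
              "spark.driver.memory": "4g"}
for key, value in spark_conf.items():
    conf[str(key)] = str(value)
print(conf["spark.driver.memory"])  # prints "4g": the user-supplied value wins
```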
