Enhance Ray on Spark #1449

Merged: 4 commits, Jun 12, 2019
5 changes: 4 additions & 1 deletion docs/docs/APIGuide/Ray/rayonspark.md
@@ -4,9 +4,10 @@ AnalyticsZoo has already provided a mechanism to deploy Python dependencies and
across the yarn cluster, meaning Python users would be able to run `Analytics-Zoo` or `Ray`
in a pythonic way on yarn without `spark-submit` or installing Analytics-Zoo or Ray across all cluster nodes.


## Here are the steps to run RayOnSpark:

1) You should install Conda first and create a conda-env named "ray36"
1) You should install [Conda](https://docs.conda.io/projects/conda/en/latest/commands/install.html) and create a conda-env named "ray36"

2) Install some essential dependencies in the conda env
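
For example, steps 1) and 2) might look like the following (a sketch only; the full dependency list is collapsed in this view, so the extra packages shown here are illustrative):

```bash
# Create and activate the conda env used by RayOnSpark (name matches the step above)
conda create -n ray36 python=3.6
source activate ray36

# Illustrative dependencies; install whatever your workload actually needs
pip install analytics-zoo ray psutil
```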

@@ -91,3 +92,5 @@ print([ray.get(actor.ip.remote()) for actor in actors])
ray_ctx.stop()

```
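
Putting the pieces above together, a minimal end-to-end RayOnSpark sketch (the parameter values and the `TestRay` actor are illustrative, not the exact doc example) looks roughly like:

```python
import ray
from zoo import init_spark_on_yarn
from zoo.ray.util.raycontext import RayContext

# Create a SparkContext on YARN backed by the "ray36" conda env
sc = init_spark_on_yarn(hadoop_conf="path to the yarn configuration folder",
                        conda_name="ray36",
                        num_executor=2,
                        executor_cores=4,
                        executor_memory="8g",
                        extra_executor_memory_for_ray="10g")

# Launch Ray on top of the Spark executors
ray_ctx = RayContext(sc=sc, object_store_memory="4g")
ray_ctx.init()

@ray.remote
class TestRay(object):
    def ip(self):
        import socket
        return socket.gethostbyname(socket.gethostname())

actors = [TestRay.remote() for _ in range(2)]
print([ray.get(actor.ip.remote()) for actor in actors])

ray_ctx.stop()
```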


43 changes: 38 additions & 5 deletions docs/docs/PythonUserGuide/install.md
@@ -3,7 +3,7 @@ For Python users, Analytics Zoo can be installed either [from pip](#install-from
**NOTE**: Only __Python 2.7__, __Python 3.5__ and __Python 3.6__ are supported for now.

---
## **Install from pip**
## **Install from pip for local usage**
You can use the following command to install the latest release version of __analytics-zoo__ via pip easily:

```bash
@@ -24,13 +24,46 @@ sc = init_nncontext()
```

**Remarks:**

Contributor: Deleting this empty line will break the layout on the website. Recovered it in #1512

1. We've tested this package with pip 9.0.1. `pip install --upgrade pip` if necessary.
2. Pip install supports __Mac__ and __Linux__ platforms.
3. Pip install only supports __local__ mode. Cluster mode might be supported in the future. For those who want to use Analytics Zoo in cluster mode, please try to [install without pip](#install-without-pip).
4. You need to install Java __>= JDK8__ before running Analytics Zoo, which is required by `pyspark`.
5. `pyspark==2.4.3`, `bigdl==0.8.0` and their dependencies will automatically be installed if they haven't been detected in the current Python environment.
3. You need to install Java __>= JDK8__ before running Analytics Zoo, which is required by `pyspark`.
4. `pyspark==2.4.3`, `bigdl==0.8.0` and their dependencies will automatically be installed if they haven't been detected in the current Python environment.
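
For instance, the remarks above boil down to a short sequence like this (a sketch; installing the JDK itself is platform-specific and not shown):

```bash
# Make sure pip is recent and a JDK8 is on PATH, then install analytics-zoo;
# pyspark==2.4.3 and bigdl==0.8.0 are pulled in automatically if missing.
pip install --upgrade pip
pip install analytics-zoo
```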

## **Install from pip for yarn cluster**

You only need to follow these steps on your driver node, and only yarn-client mode is supported for now.

1) Install [Conda](https://docs.conda.io/projects/conda/en/latest/commands/install.html) and create a conda-env (e.g., named "zoo")

2) Install Analytics-Zoo into the created conda-env

```bash
source activate zoo
pip install analytics-zoo
```
3) Download JDK8 and set the JAVA_HOME environment variable (recommended).
- You can also install JDK via conda without setting JAVA_HOME manually:
`conda install -c anaconda openjdk=8.0.152`
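
If you set JAVA_HOME manually instead, the shell setup is simply (the path is illustrative):

```bash
# Point JAVA_HOME at the unpacked JDK8 and put its bin directory on PATH
export JAVA_HOME=/opt/jdk1.8.0_152
export PATH=${JAVA_HOME}/bin:${PATH}
```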

4) Start python and then execute the following code for verification.

- Create a SparkContext on Yarn

```python
from zoo import init_spark_on_yarn

sc = init_spark_on_yarn(
hadoop_conf="path to the yarn configuration folder",
conda_name="zoo", # The name of the created conda-env
num_executor=2,
executor_cores=4,
executor_memory="8g",
driver_memory="2g",
driver_cores=4,
extra_executor_memory_for_ray="10g")
```
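
Once the SparkContext is created, a quick sanity check such as the following confirms that executors were actually allocated on YARN (a sketch):

```python
# Run a trivial job across the YARN executors
print(sc.parallelize(list(range(1000))).count())  # expect 1000
```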

---
## **Install without pip**
23 changes: 23 additions & 0 deletions docs/docs/PythonUserGuide/run.md
@@ -3,6 +3,7 @@ You need to first [install](install.md) analytics-zoo, either [from pip](install
**NOTE**: Only __Python 2.7__, __Python 3.5__ and __Python 3.6__ are supported for now.

---

## **Run after pip install**
Contributor: Run on local node after pip install


**Important:**
@@ -48,6 +49,28 @@ export BIGDL_JARS=...
export BIGDL_PACKAGES=...
```

## **Run on yarn after pip install**

Start python and then execute the following code:
Caveat: You should use `init_spark_on_yarn` rather than `init_nncontext()` here.
- Create a SparkContext on Yarn

```python
from zoo import init_spark_on_yarn

sc = init_spark_on_yarn(
hadoop_conf="path to the yarn configuration folder",
conda_name="zoo", # The name of the created conda-env
num_executor=2,
executor_cores=4,
executor_memory="8g",
driver_memory="2g",
driver_cores=4,
extra_executor_memory_for_ray="10g")

```
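
Since this SparkContext is usually created to back RayOnSpark, a rough follow-up sketch for launching Ray on top of it (the values are illustrative) is:

```python
# Launch Ray across the YARN executors created above, then shut it down
from zoo.ray.util.raycontext import RayContext

ray_ctx = RayContext(sc=sc, object_store_memory="4g")
ray_ctx.init()
# ... submit Ray tasks/actors here ...
ray_ctx.stop()
```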

---
## **Run without pip install**
- Note that __Python 3.6__ is only compatible with Spark 1.6.4, 2.0.3, 2.1.1 and >=2.2.0. See [this issue](https://issues.apache.org/jira/browse/SPARK-19019) for more discussion.
2 changes: 2 additions & 0 deletions docs/mkdocs.yml
@@ -67,6 +67,8 @@ pages:
- Relation: APIGuide/FeatureEngineering/relation.md
- Image: APIGuide/FeatureEngineering/image.md
- Text: APIGuide/FeatureEngineering/text.md
- Ray:
- RayOnSpark: APIGuide/Ray/rayonspark.md
- Models:
- Object Detection: APIGuide/Models/object-detection.md
- Image Classification: APIGuide/Models/image-classification.md
15 changes: 11 additions & 4 deletions pyzoo/dev/release/release.sh
@@ -24,16 +24,23 @@ echo $ANALYTICS_ZOO_HOME
ANALYTICS_ZOO_PYTHON_DIR="$(cd ${RUN_SCRIPT_DIR}/../../../pyzoo; pwd)"
echo $ANALYTICS_ZOO_PYTHON_DIR

if (( $# < 1)); then
echo "Bad parameters. Usage example: bash release.sh linux"
if (( $# < 2)); then
echo "Bad parameters"
echo "Usage example: bash release.sh linux default"
echo "Usage example: bash release.sh linux 0.6.0.dev0"
echo "If needed, you can also add other profiles such as: -Dspark.version=2.4.3 -Dbigdl.artifactId=bigdl-SPARK_2.4 -P spark_2.x"
exit -1
fi

platform=$1
profiles=${*:2}
version=`cat $ANALYTICS_ZOO_PYTHON_DIR/zoo/__init__.py | grep "__version__" | awk '{print $NF}' | tr -d '"'`
version=$2
profiles=${*:3}

if [ "${version}" == "default" ]; then
version=`cat $ANALYTICS_ZOO_PYTHON_DIR/zoo/__init__.py | grep "__version__" | awk '{print $NF}' | tr -d '"'`
fi

echo "Using version: ${version}"

cd ${ANALYTICS_ZOO_HOME}
if [ "$platform" == "mac" ]; then
2 changes: 1 addition & 1 deletion pyzoo/setup.py
@@ -94,7 +94,7 @@ def setup_package():
license='Apache License, Version 2.0',
url='https://github.com/intel-analytics/analytics-zoo',
packages=packages,
install_requires=['pyspark==2.4.3', 'bigdl==0.8.0'],
install_requires=['pyspark==2.4.3', 'bigdl==0.8.0', 'conda-pack==0.3.1'],
dependency_links=['https://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.7.tgz'],
include_package_data=True,
package_data={"zoo.share": ['lib/analytics-zoo*with-dependencies.jar', 'conf/*', 'bin/*',
4 changes: 3 additions & 1 deletion pyzoo/test/zoo/ray/integration/ray_on_yarn.py
@@ -30,10 +30,12 @@
executor_memory="10g",
driver_memory="2g",
driver_cores=4,
extra_executor_memory_for_ray="30g")
extra_executor_memory_for_ray="30g",
spark_conf={"hello": "world"})

ray_ctx = RayContext(sc=sc,
object_store_memory="25g",
extra_params={"temp-dir": "/tmp/hello/"},
env={"http_proxy": "http://child-prc.intel.com:913",
"http_proxys": "http://child-prc.intel.com:913"})
ray_ctx.init()
2 changes: 0 additions & 2 deletions pyzoo/test/zoo/ray/test_ray_on_local.py
@@ -47,8 +47,6 @@ def test_local(self):
for process_info in ray_ctx.ray_processesMonitor.process_infos:
for pid in process_info.pids:
assert not psutil.pid_exists(pid)
sc.stop()


if __name__ == "__main__":
pytest.main([__file__])
12 changes: 8 additions & 4 deletions pyzoo/zoo/common/nncontext.py
@@ -52,7 +52,8 @@ def init_spark_on_yarn(hadoop_conf,
hadoop_user_name="root",
spark_yarn_archive=None,
spark_log_level="WARN",
redirect_spark_log=True):
redirect_spark_log=True,
spark_conf=None):
"""
Create a SparkContext with Zoo configuration on a Yarn cluster in "Yarn-client" mode.
You should create a conda env and install the python dependencies in that env.
@@ -72,8 +73,10 @@
`conda_name`, but you can also pass the path to a packed file in "tar.gz" format here.
:param hadoop_user_name: User name for running in yarn cluster. Default value is: root
:param spark_log_level: Log level of Spark
:param redirect_spark_log:
:return: Direct the Spark log to local file or not.
:param redirect_spark_log: Direct the Spark log to local file or not.
:param spark_conf: You can append extra spark conf here in key value format.
e.g. spark_conf={"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails"}
:return: SparkContext
"""
from zoo.ray.util.spark import SparkRunner
sparkrunner = SparkRunner(spark_log_level=spark_log_level,
@@ -91,7 +94,8 @@
penv_archive=penv_archive,
hadoop_user_name=hadoop_user_name,
spark_yarn_archive=spark_yarn_archive,
jars=None)
jars=None,
spark_conf=spark_conf)
return sc
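
A usage sketch for the new `spark_conf` argument (the values are illustrative):

```python
from zoo import init_spark_on_yarn

# Extra Spark properties are passed straight through as key/value pairs
sc = init_spark_on_yarn(
    hadoop_conf="/path/to/hadoop/conf",
    conda_name="zoo",
    num_executor=2,
    executor_cores=4,
    spark_conf={"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails"})
```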


44 changes: 28 additions & 16 deletions pyzoo/zoo/ray/util/raycontext.py
@@ -92,7 +92,8 @@ def _prepare_env(self, cores=None):
return modified_env

def __init__(self, python_loc, redis_port, ray_node_cpu_cores, mkl_cores,
password, object_store_memory, waitting_time_sec=6, verbose=False, env=None):
password, object_store_memory, waitting_time_sec=6, verbose=False, env=None,
extra_params=None):
"""object_store_memory: integer in bytes"""
self.env = env
self.python_loc = python_loc
Expand All @@ -103,6 +104,7 @@ def __init__(self, python_loc, redis_port, ray_node_cpu_cores, mkl_cores,
self.ray_exec = self._get_ray_exec()
self.object_store_memory = object_store_memory
self.waiting_time_sec = waitting_time_sec
self.extra_params = extra_params
self.verbose = verbose
self.labels = """--resources='{"trainer": %s, "ps": %s }' """ % (1, 1)

@@ -115,22 +117,25 @@ def _stop(iter):

return _stop

def _gen_master_command(self):
command = "{} start --head " \
"--include-webui --redis-port {} \
--redis-password {} --num-cpus {} ". \
format(self.ray_exec, self.redis_port, self.password, self.ray_node_cpu_cores)
def _enrich_command(self, command):
if self.object_store_memory:
command = command + "--object-store-memory {} ".format(str(self.object_store_memory))
if self.extra_params:
for pair in self.extra_params.items():
command = command + " --{} {} ".format(pair[0], pair[1])
return command

def _gen_master_command(self):
command = "{} start --head " \
"--include-webui --redis-port {} " \
"--redis-password {} --num-cpus {} ". \
format(self.ray_exec, self.redis_port, self.password, self.ray_node_cpu_cores)
return self._enrich_command(command)

def _get_raylet_command(self, redis_address):
command = "{} start --redis-address {} --redis-password {} --num-cpus {} {} ".format(
self.ray_exec, redis_address, self.password, self.ray_node_cpu_cores, self.labels)

if self.object_store_memory:
command = command + "--object-store-memory {} ".format(str(self.object_store_memory))
return command
return self._enrich_command(command)
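
As a standalone illustration of the refactor, this is roughly how `_enrich_command` folds `object_store_memory` and `extra_params` into the `ray start` command (the values are illustrative):

```python
# Standalone sketch of the enrichment logic above
extra_params = {"temp-dir": "/tmp/ray2/"}
object_store_memory = 1000000000  # bytes

command = "ray start --head --include-webui --redis-port 6379 " \
          "--redis-password 123456 --num-cpus 4 "
if object_store_memory:
    command = command + "--object-store-memory {} ".format(str(object_store_memory))
for key, value in extra_params.items():
    command = command + " --{} {} ".format(key, value)
print(command)
# ... --object-store-memory 1000000000  --temp-dir /tmp/ray2/
```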

def _start_ray_node(self, command, tag, wait_before=5, wait_after=5):
modified_env = self._prepare_env(self.mkl_cores)
@@ -180,18 +185,24 @@ def _start_ray_services(iter):

class RayContext(object):
def __init__(self, sc, redis_port=None, password="123456", object_store_memory=None,
verbose=False, env=None, local_ray_node_num=2, waitting_time_sec=8):
verbose=False, env=None, local_ray_node_num=2, waiting_time_sec=8,
extra_params=None):
"""
The RayContext would init a ray cluster on top of the configuration of the SparkContext.
The RayContext would init a ray cluster on top of the configuration of SparkContext.
For spark cluster mode: The number of raylets is equal to number of executors.
For Spark local mode: The number of raylets is controlled by local_ray_node_num.
CPU cores for each raylet equals to spark_cores/local_ray_node_num.
CPU cores for each is raylet equals to spark_cores/local_ray_node_num.
Contributor (on the line above): is?

:param sc:
:param redis_port: redis port for the "head" node.
The value would be randomly picked if not specified
:param local_ray_node_num number of raylets to be created.
The value would be randomly picked if not specified.
:param password: [optional] password for the redis.
:param object_store_memory: Memory size for the object_store.
:param verbose: True for more logs.
:param env: The environment variable dict for running Ray.
:param local_ray_node_num number of raylets to be created.
:param waiting_time_sec: Waiting time for the raylets before connecting to redis.
:param extra_params: key value dictionary for extra options to launch Ray.
e.g. extra_params={"temp-dir": "/tmp/ray2/"}
"""
self.sc = sc
self.stopped = False
@@ -212,7 +223,8 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
object_store_memory=self._enrich_object_sotre_memory(sc, object_store_memory),
verbose=verbose,
env=env,
waitting_time_sec=waitting_time_sec)
waitting_time_sec=waiting_time_sec,
extra_params=extra_params)
self._gather_cluster_ips()
from bigdl.util.common import init_executor_gateway
print("Start to launch the JVM guarding process")
32 changes: 21 additions & 11 deletions pyzoo/zoo/ray/util/spark.py
@@ -98,6 +98,18 @@ def _detect_python_location(self):
"Cannot detect current python location. Please set it manually by python_location")
return process_info.out

def _gather_essential_jars(self):
from bigdl.util.engine import get_bigdl_classpath
from zoo.util.engine import get_analytics_zoo_classpath
bigdl_classpath = get_bigdl_classpath()
zoo_classpath = get_analytics_zoo_classpath()
assert bigdl_classpath, "Cannot find bigdl classpath"
assert zoo_classpath, "Cannot find Analytics-Zoo classpath"
if bigdl_classpath == zoo_classpath:
return [zoo_classpath]
else:
return [zoo_classpath, bigdl_classpath]

def init_spark_on_local(self, cores, conf=None, python_location=None):
print("Start to getOrCreate SparkContext")
os.environ['PYSPARK_PYTHON'] =\
@@ -124,29 +136,24 @@ def init_spark_on_yarn(self,
penv_archive=None,
hadoop_user_name="root",
spark_yarn_archive=None,
spark_conf=None,
jars=None):
os.environ["HADOOP_CONF_DIR"] = hadoop_conf
os.environ['HADOOP_USER_NAME'] = hadoop_user_name
os.environ['PYSPARK_PYTHON'] = "python_env/bin/python"

def _yarn_opt(jars):
from zoo.util.engine import get_analytics_zoo_classpath
command = " --archives {}#python_env --num-executors {} " \
" --executor-cores {} --executor-memory {}".\
format(penv_archive, num_executor, executor_cores, executor_memory)
path_to_zoo_jar = get_analytics_zoo_classpath()
jars_list = self._gather_essential_jars()
if jars:
jars_list.append(jars)

if extra_python_lib:
command = command + " --py-files {} ".format(extra_python_lib)

if jars:
command = command + " --jars {},{} ".format(jars, path_to_zoo_jar)
elif path_to_zoo_jar:
command = command + " --jars {} ".format(path_to_zoo_jar)

if path_to_zoo_jar:
command = command + " --conf spark.driver.extraClassPath={} ".\
format(get_analytics_zoo_classpath())
command = command + " --jars {}".format(",".join(jars_list))
return command
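
For reference, with the example parameters used in the docs above, `_yarn_opt` assembles a submit-args fragment shaped roughly like this (the paths and jar names are illustrative):

```python
# Illustrative only: the shape of the spark-submit arguments built above
submit_args = (
    " --archives /tmp/zoo_env.tar.gz#python_env --num-executors 2 "
    " --executor-cores 4 --executor-memory 8g"
    " --jars /path/to/analytics-zoo-with-dependencies.jar,/path/to/bigdl.jar"
)
```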

def _submit_opt():
@@ -158,7 +165,7 @@
conf["spark.executor.memoryOverhead"] = extra_executor_memory_for_ray
if spark_yarn_archive:
conf.insert("spark.yarn.archive", spark_yarn_archive)
return " --master yarn " + _yarn_opt(jars) + 'pyspark-shell', conf
return " --master yarn --deploy-mode client" + _yarn_opt(jars) + ' pyspark-shell ', conf

pack_env = False
assert penv_archive or conda_name, \
@@ -169,6 +176,9 @@
pack_env = True

submit_args, conf = _submit_opt()
if spark_conf:
for item in spark_conf.items():
conf[str(item[0])] = str(item[1])
sc = self._create_sc(submit_args, conf)
finally:
if conda_name and penv_archive and pack_env: