Skip to content

Commit

Permalink
fix ray and add more test (intel-analytics#1596)
Browse files Browse the repository at this point in the history
* fix ray and add more test

Signed-off-by: Jieru Hong <[email protected]>

* modify raycontext and move test file to func

Signed-off-by: Jieru Hong <[email protected]>

* modify process and add sc.stop in the end

Signed-off-by: Jieru Hong <[email protected]>

* delete one repeat and check PEP8

Signed-off-by: Jieru Hong <[email protected]>

* change file name and remove some useless code

Signed-off-by: Jieru Hong <[email protected]>

* rename test yarn reinit file

Signed-off-by: Jieru Hong <[email protected]>

* ignore test reinit raycontext

Signed-off-by: Jieru Hong <[email protected]>
  • Loading branch information
respecteverything committed Sep 5, 2019
1 parent eee4561 commit e2cc857
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 6 deletions.
2 changes: 1 addition & 1 deletion python/orca/src/bigdl/orca/ray/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def _shutdown_per_node(iter):
effect_pgids = [pair[0] for pair in zip(pgids, node_ips) if pair[1] == current_node_ip]
else:
effect_pgids = pgids
for pgid in pgids:
for pgid in effect_pgids:
print("Stopping by pgid {}".format(pgid))
try:
os.killpg(pgid, signal.SIGTERM)
Expand Down
9 changes: 7 additions & 2 deletions python/orca/src/bigdl/orca/ray/util/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ def session_execute(command, env=None, tag=None, fail_fast=False, timeout=120):


class ProcessMonitor:
def __init__(self, process_infos, sc, ray_rdd, verbose=False):
def __init__(self, process_infos, sc, ray_rdd, raycontext, verbose=False):
self.sc = sc
self.raycontext = raycontext
self.verbose = verbose
self.ray_rdd = ray_rdd
self.master = []
Expand Down Expand Up @@ -123,9 +124,13 @@ def print_ray_remote_err_out(self):
print(slave)

def clean_fn(self):
if self.raycontext.stopped:
return
import ray
ray.shutdown()
if not is_local(self.sc):
if not self.sc:
print("WARNING: SparkContext has been stopped before cleaning the Ray resources")
if self.sc and (not is_local(self.sc)):
self.ray_rdd.map(gen_shutdown_per_node(self.pgids, self.node_ips)).collect()
else:
gen_shutdown_per_node(self.pgids, self.node_ips)([])
Expand Down
4 changes: 2 additions & 2 deletions python/orca/src/bigdl/orca/ray/util/raycontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def init(self, object_store_memory=None,
:param extra_params: key value dictionary for extra options to launch Ray.
i.e extra_params={"temp-dir": "/tmp/ray2/"}
"""

self.stopped = False
self._start_cluster()
if object_store_memory is None:
object_store_memory = self._get_ray_plasma_memory_local()
Expand All @@ -372,7 +372,7 @@ def _start_cluster(self):
process_infos = ray_rdd.barrier().mapPartitions(
self.ray_service.gen_ray_start()).collect()

self.ray_processesMonitor = ProcessMonitor(process_infos, self.sc, ray_rdd,
self.ray_processesMonitor = ProcessMonitor(process_infos, self.sc, ray_rdd, self,
verbose=self.verbose)
self.redis_address = self.ray_processesMonitor.master.master_addr
return self
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from unittest import TestCase

import numpy as np
import psutil
import pytest
import ray
import time

from zoo import init_spark_on_yarn
from zoo.ray.util.raycontext import RayContext

np.random.seed(1337) # for reproducibility


@ray.remote
class TestRay():
def hostname(self):
import socket
return socket.gethostname()


node_num = 4
sc = init_spark_on_yarn(
hadoop_conf="/opt/work/hadoop-2.7.2/etc/hadoop/",
conda_name="rayexample",
num_executor=node_num,
executor_cores=28,
executor_memory="10g",
driver_memory="2g",
driver_cores=4,
extra_executor_memory_for_ray="30g")
ray_ctx = RayContext(sc=sc, object_store_memory="2g")
ray_ctx.init()
actors = [TestRay.remote() for i in range(0, node_num)]
print([ray.get(actor.hostname.remote()) for actor in actors])
ray_ctx.stop()
# repeat
ray_ctx = RayContext(sc=sc, object_store_memory="1g")
ray_ctx.init()
actors = [TestRay.remote() for i in range(0, node_num)]
print([ray.get(actor.hostname.remote()) for actor in actors])
ray_ctx.stop()

sc.stop()
time.sleep(3)
1 change: 1 addition & 0 deletions python/orca/test/bigdl/orca/ray/test_ray_on_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ def test_local(self):
for pid in process_info.pids:
assert not psutil.pid_exists(pid)


if __name__ == "__main__":
pytest.main([__file__])
62 changes: 62 additions & 0 deletions python/orca/test/bigdl/orca/ray/test_reinit_raycontext.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from unittest import TestCase

import numpy as np
import psutil
import pytest
import ray
import time

from zoo import init_spark_on_local
from zoo.ray.util.raycontext import RayContext

np.random.seed(1337) # for reproducibility


@ray.remote
class TestRay():
def hostname(self):
import socket
return socket.gethostname()


class TestUtil(TestCase):

def test_local(self):
node_num = 4
sc = init_spark_on_local(cores=node_num)
ray_ctx = RayContext(sc=sc, object_store_memory="1g")
ray_ctx.init()
actors = [TestRay.remote() for i in range(0, node_num)]
print([ray.get(actor.hostname.remote()) for actor in actors])
ray_ctx.stop()
time.sleep(3)
# repeat
print("-------------------first repeat begin!------------------")
ray_ctx = RayContext(sc=sc, object_store_memory="1g")
ray_ctx.init()
actors = [TestRay.remote() for i in range(0, node_num)]
print([ray.get(actor.hostname.remote()) for actor in actors])
ray_ctx.stop()
sc.stop()
time.sleep(3)
for process_info in ray_ctx.ray_processesMonitor.process_infos:
for pid in process_info.pids:
assert not psutil.pid_exists(pid)

if __name__ == "__main__":
pytest.main([__file__])
3 changes: 2 additions & 1 deletion python/orca/test/dev/run-pytests-ray
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export PYSPARK_PYTHON=python
export PYSPARK_DRIVER_PYTHON=python

python -m pytest -v ../test/zoo/ray/ \
--ignore=../test/zoo/ray/integration/
--ignore=../test/zoo/ray/integration/ \
--ignore=../test/zoo/ray/test_reinit_raycontext.py
exit_status_2=$?
if [ $exit_status_2 -ne 0 ];
then
Expand Down

0 comments on commit e2cc857

Please sign in to comment.