diff --git a/python/orca/src/bigdl/orca/ray/__init__.py b/python/orca/src/bigdl/orca/ray/__init__.py index 5976dc4df02..709625d93af 100644 --- a/python/orca/src/bigdl/orca/ray/__init__.py +++ b/python/orca/src/bigdl/orca/ray/__init__.py @@ -13,3 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +from .raycontext import RayContext diff --git a/python/orca/src/bigdl/orca/ray/util/process.py b/python/orca/src/bigdl/orca/ray/process.py similarity index 98% rename from python/orca/src/bigdl/orca/ray/util/process.py rename to python/orca/src/bigdl/orca/ray/process.py index 9e1256c62d8..16bef553c7c 100644 --- a/python/orca/src/bigdl/orca/ray/util/process.py +++ b/python/orca/src/bigdl/orca/ray/process.py @@ -21,7 +21,7 @@ import sys import psutil -from zoo.ray.util import gen_shutdown_per_node, is_local +from zoo.ray.utils import gen_shutdown_per_node, is_local class ProcessInfo(object): diff --git a/python/orca/src/bigdl/orca/ray/util/raycontext.py b/python/orca/src/bigdl/orca/ray/raycontext.py similarity index 98% rename from python/orca/src/bigdl/orca/ray/util/raycontext.py rename to python/orca/src/bigdl/orca/ray/raycontext.py index 5a514c32a41..d35a4711447 100755 --- a/python/orca/src/bigdl/orca/ray/util/raycontext.py +++ b/python/orca/src/bigdl/orca/ray/raycontext.py @@ -14,18 +14,16 @@ # limitations under the License. # +import multiprocessing import os +import random import re import signal -import random -import multiprocessing from pyspark import BarrierTaskContext - -from zoo.ray.util import is_local -from zoo.ray.util.process import session_execute, ProcessMonitor -from zoo.ray.util.utils import resource_to_bytes -import ray.services as rservices +from zoo.ray.process import session_execute, ProcessMonitor +from zoo.ray.utils import is_local +from zoo.ray.utils import resource_to_bytes class JVMGuard: @@ -146,6 +144,7 @@ def _start_ray_node(self, command, tag): print("Starting {} by running: {}".format(tag, command)) process_info = session_execute(command=command, env=modified_env, tag=tag) JVMGuard.register_pids(process_info.pids) + import ray.services as rservices process_info.node_ip = rservices.get_node_ip_address() return process_info diff --git a/python/orca/src/bigdl/orca/ray/util/__init__.py b/python/orca/src/bigdl/orca/ray/util/__init__.py deleted file mode 100755 index 0f0a3e748e1..00000000000 --- a/python/orca/src/bigdl/orca/ray/util/__init__.py +++ /dev/null @@ -1,46 +0,0 @@ -# -# Copyright 2018 Analytics Zoo Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import signal - -from zoo.ray.util.utils import to_list - - -def gen_shutdown_per_node(pgids, node_ips=None): - import ray.services as rservices - pgids = to_list(pgids) - - def _shutdown_per_node(iter): - print("Stopping pgids: {}".format(pgids)) - if node_ips: - current_node_ip = rservices.get_node_ip_address() - effect_pgids = [pair[0] for pair in zip(pgids, node_ips) if pair[1] == current_node_ip] - else: - effect_pgids = pgids - for pgid in effect_pgids: - print("Stopping by pgid {}".format(pgid)) - try: - os.killpg(pgid, signal.SIGTERM) - except Exception: - print("WARNING: cannot kill pgid: {}".format(pgid)) - - return _shutdown_per_node - - -def is_local(sc): - master = sc._conf.get("spark.master") - return master == "local" or master.startswith("local[") diff --git a/python/orca/src/bigdl/orca/ray/util/utils.py b/python/orca/src/bigdl/orca/ray/utils.py similarity index 67% rename from python/orca/src/bigdl/orca/ray/util/utils.py rename to python/orca/src/bigdl/orca/ray/utils.py index 3432f48e214..3941d5c32a1 100644 --- a/python/orca/src/bigdl/orca/ray/util/utils.py +++ b/python/orca/src/bigdl/orca/ray/utils.py @@ -15,6 +15,8 @@ # import re +import os +import signal def to_list(input): @@ -50,3 +52,29 @@ def resource_to_bytes(resource_str): raise Exception("Size must be specified as bytes(b)," "kilobytes(k), megabytes(m), gigabytes(g). " "E.g. 50b, 100k, 250m, 30g") + + +def gen_shutdown_per_node(pgids, node_ips=None): + import ray.services as rservices + pgids = to_list(pgids) + + def _shutdown_per_node(iter): + print("Stopping pgids: {}".format(pgids)) + if node_ips: + current_node_ip = rservices.get_node_ip_address() + effect_pgids = [pair[0] for pair in zip(pgids, node_ips) if pair[1] == current_node_ip] + else: + effect_pgids = pgids + for pgid in effect_pgids: + print("Stopping by pgid {}".format(pgid)) + try: + os.killpg(pgid, signal.SIGTERM) + except Exception: + print("WARNING: cannot kill pgid: {}".format(pgid)) + + return _shutdown_per_node + + +def is_local(sc): + master = sc.getConf().get("spark.master") + return master == "local" or master.startswith("local[") diff --git a/python/orca/test/bigdl/orca/ray/integration/ray_on_yarn.py b/python/orca/test/bigdl/orca/ray/integration/ray_on_yarn.py index 85193bf70c1..514a4768271 100644 --- a/python/orca/test/bigdl/orca/ray/integration/ray_on_yarn.py +++ b/python/orca/test/bigdl/orca/ray/integration/ray_on_yarn.py @@ -18,7 +18,7 @@ import ray from zoo import init_spark_on_yarn -from zoo.ray.util.raycontext import RayContext +from zoo.ray import RayContext slave_num = 2 diff --git a/python/orca/test/bigdl/orca/ray/integration/test_yarn_reinit_raycontext.py b/python/orca/test/bigdl/orca/ray/integration/test_yarn_reinit_raycontext.py index e66ca2c9d17..1776ad63485 100644 --- a/python/orca/test/bigdl/orca/ray/integration/test_yarn_reinit_raycontext.py +++ b/python/orca/test/bigdl/orca/ray/integration/test_yarn_reinit_raycontext.py @@ -13,22 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from unittest import TestCase + +import time import numpy as np -import psutil -import pytest import ray -import time from zoo import init_spark_on_yarn -from zoo.ray.util.raycontext import RayContext +from zoo.ray import RayContext np.random.seed(1337) # for reproducibility @ray.remote -class TestRay(): +class TestRay: def hostname(self): import socket return socket.gethostname() diff --git a/python/orca/test/bigdl/orca/ray/mxnet/conftest.py b/python/orca/test/bigdl/orca/ray/mxnet/conftest.py index 51f4491c282..6bc7ea82b30 100644 --- a/python/orca/test/bigdl/orca/ray/mxnet/conftest.py +++ b/python/orca/test/bigdl/orca/ray/mxnet/conftest.py @@ -22,7 +22,7 @@ @pytest.fixture(autouse=True, scope='package') def rayonspark_fixture(): from zoo import init_spark_on_local - from zoo.ray.util.raycontext import RayContext + from zoo.ray import RayContext sc = init_spark_on_local(cores=8, spark_log_level="INFO") ray_ctx = RayContext(sc=sc, object_store_memory="1g") ray_ctx.init() diff --git a/python/orca/test/bigdl/orca/ray/test_ray_on_local.py b/python/orca/test/bigdl/orca/ray/test_ray_on_local.py index 3ece5bc245b..34bc6b74e5d 100644 --- a/python/orca/test/bigdl/orca/ray/test_ray_on_local.py +++ b/python/orca/test/bigdl/orca/ray/test_ray_on_local.py @@ -19,7 +19,7 @@ import ray from zoo import init_spark_on_local -from zoo.ray.util.raycontext import RayContext +from zoo.ray import RayContext class TestRayLocal(TestCase): diff --git a/python/orca/test/bigdl/orca/ray/test_reinit_raycontext.py b/python/orca/test/bigdl/orca/ray/test_reinit_raycontext.py index a7aafcfc81c..9dfe7cbe037 100644 --- a/python/orca/test/bigdl/orca/ray/test_reinit_raycontext.py +++ b/python/orca/test/bigdl/orca/ray/test_reinit_raycontext.py @@ -13,16 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import time from unittest import TestCase import numpy as np import psutil import pytest import ray -import time from zoo import init_spark_on_local -from zoo.ray.util.raycontext import RayContext +from zoo.ray import RayContext np.random.seed(1337) # for reproducibility diff --git a/python/orca/test/bigdl/orca/ray/test_util.py b/python/orca/test/bigdl/orca/ray/test_util.py index 82dc810a7a9..cf478d65ad5 100644 --- a/python/orca/test/bigdl/orca/ray/test_util.py +++ b/python/orca/test/bigdl/orca/ray/test_util.py @@ -17,7 +17,7 @@ import pytest -import zoo.ray.util.utils as rutils +import zoo.ray.utils as rutils class TestUtil(TestCase):