Skip to content

Commit

Permalink
Add init_orca_context (intel#2774)
Browse files Browse the repository at this point in the history
* initial implementation

* update

* meet review

* review and style

* remove stopped

* add doc

* minor

* move import

* fix mxnet

* remove
  • Loading branch information
hkvision committed Aug 26, 2020
1 parent 2a64634 commit 311414e
Showing 1 changed file with 27 additions and 21 deletions.
48 changes: 27 additions & 21 deletions python/orca/src/bigdl/orca/ray/raycontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def register_pids(pids):
pids)
except Exception as err:
print(traceback.format_exc())
print("Cannot sucessfully register pid into JVMGuard")
print("Cannot successfully register pid into JVMGuard")
for pid in pids:
os.kill(pid, signal.SIGKILL)
raise err
Expand Down Expand Up @@ -247,7 +247,7 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
"""
assert sc is not None, "sc cannot be None, please create a SparkContext first"
self.sc = sc
self.stopped = False
self.initialized = False
self.is_local = is_local(sc)
self.verbose = verbose
self.redis_password = password
Expand Down Expand Up @@ -327,9 +327,12 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
RayContext._active_ray_context = self

@classmethod
def get(cls):
def get(cls, initialize=True):
if RayContext._active_ray_context:
return RayContext._active_ray_context
ray_ctx = RayContext._active_ray_context
if initialize and not ray_ctx.initialized:
ray_ctx.init()
return ray_ctx
else:
raise Exception("No active RayContext. Please create a RayContext and init it first")

Expand All @@ -347,8 +350,8 @@ def info_fn(iter):
return ips[0]

def stop(self):
if self.stopped:
print("This instance has been stopped.")
if not self.initialized:
print("The Ray cluster has not been launched.")
return
import ray
ray.shutdown()
Expand All @@ -357,14 +360,14 @@ def stop(self):
print("Please start the runner first before closing it")
else:
self.ray_processesMonitor.clean_fn()
self.stopped = True
self.initialized = False

def purge(self):
"""
Invoke ray stop to clean ray processes.
"""
if self.stopped:
print("This instance has been stopped.")
if not self.initialized:
print("The Ray cluster has not been launched.")
return
if self.is_local:
import ray
Expand All @@ -374,7 +377,7 @@ def purge(self):
self.num_ray_nodes,
numSlices=self.num_ray_nodes).barrier().mapPartitions(
self.ray_service.gen_stop()).collect()
self.stopped = True
self.initialized = False

def _get_spark_local_cores(self):
local_symbol = re.match(r"local\[(.*)\]", self.sc.master).group(1)
Expand All @@ -394,25 +397,28 @@ def init(self, driver_cores=0):
Information contains node_ip_address, redis_address, object_store_address,
raylet_socket_name, webui_url and session_dir.
"""
self.stopped = False
if self.is_local:
if self.env:
os.environ.update(self.env)
import ray
self._address_info = ray.init(num_cpus=self.ray_node_cpu_cores,
object_store_memory=self.object_store_memory,
resources=self.extra_params)
if self.initialized:
print("The Ray cluster has been launched.")
else:
self._start_cluster()
self._address_info = self._start_driver(num_cores=driver_cores)
if self.is_local:
if self.env:
os.environ.update(self.env)
import ray
self._address_info = ray.init(num_cpus=self.ray_node_cpu_cores,
object_store_memory=self.object_store_memory,
resources=self.extra_params)
else:
self._start_cluster()
self._address_info = self._start_driver(num_cores=driver_cores)
self.initialized = True
return self._address_info

@property
def address_info(self):
if self._address_info:
return self._address_info
else:
raise Exception("Ray cluster hasn't been initiated yet. Please call init first")
raise Exception("The Ray cluster has not been launched yet. Please call init first")

def _start_cluster(self):
print("Start to launch ray on cluster")
Expand Down

0 comments on commit 311414e

Please sign in to comment.