Skip to content

Commit

Permalink
Get the current RayContext (intel-analytics#2411)
Browse files Browse the repository at this point in the history
* initial

* update

* minor
  • Loading branch information
hkvision committed Jun 5, 2020
1 parent 48362a2 commit 1c7e63d
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions python/orca/src/bigdl/orca/ray/raycontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ def _start_ray_services(iter):


class RayContext(object):
_active_ray_context = None

def __init__(self, sc, redis_port=None, password="123456", object_store_memory=None,
verbose=False, env=None, extra_params=None):
"""
Expand Down Expand Up @@ -220,6 +222,7 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
self.ray_processesMonitor = None
self.env = env
self.extra_params = extra_params
self._address_info = None
if self.is_local:
self.num_ray_nodes = 1
self.ray_node_cpu_cores = self._get_spark_local_cores()
Expand All @@ -245,6 +248,14 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
print("Start to launch the JVM guarding process")
init_executor_gateway(sc)
print("JVM guarding process has been successfully launched")
RayContext._active_ray_context = self

@classmethod
def get(cls):
if RayContext._active_ray_context:
return RayContext._active_ray_context
else:
raise Exception("No active RayContext. Please create a RayContext and init it first")

def _gather_cluster_ips(self):
total_cores = int(self.num_ray_nodes) * int(self.ray_node_cpu_cores)
Expand Down Expand Up @@ -312,13 +323,20 @@ def init(self, driver_cores=0):
if self.env:
os.environ.update(self.env)
import ray
self.address_info = ray.init(num_cpus=self.ray_node_cpu_cores,
object_store_memory=self.object_store_memory,
resources=self.extra_params)
self._address_info = ray.init(num_cpus=self.ray_node_cpu_cores,
object_store_memory=self.object_store_memory,
resources=self.extra_params)
else:
self._start_cluster()
self.address_info = self._start_driver(num_cores=driver_cores)
return self.address_info
self._address_info = self._start_driver(num_cores=driver_cores)
return self._address_info

@property
def address_info(self):
if self._address_info:
return self._address_info
else:
raise Exception("Ray cluster hasn't been initiated yet. Please call init first")

def _start_cluster(self):
print("Start to launch ray on cluster")
Expand Down

0 comments on commit 1c7e63d

Please sign in to comment.