Skip to content

Commit

Permalink
Expose more option for driver in RayContext (intel-analytics#1541)
Browse files Browse the repository at this point in the history
* expose more option for driver

* minor

* more
  • Loading branch information
zhichao-li committed Jul 30, 2019
1 parent 784742c commit b3d24d4
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 25 deletions.
97 changes: 75 additions & 22 deletions python/orca/src/bigdl/orca/ray/util/raycontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,12 @@ def _stop(iter):

return _stop

def _enrich_command(self, command):
if self.object_store_memory:
command = command + "--object-store-memory {} ".format(str(self.object_store_memory))
if self.extra_params:
for pair in self.extra_params.items():
@staticmethod
def _enrich_command(command, object_store_memory, extra_params):
if object_store_memory:
command = command + "--object-store-memory {} ".format(str(object_store_memory))
if extra_params:
for pair in extra_params.items():
command = command + " --{} {} ".format(pair[0], pair[1])
return command

Expand All @@ -130,12 +131,23 @@ def _gen_master_command(self):
"--include-webui --redis-port {} " \
"--redis-password {} --num-cpus {} ". \
format(self.ray_exec, self.redis_port, self.password, self.ray_node_cpu_cores)
return self._enrich_command(command)
return RayServiceFuncGenerator._enrich_command(command=command,
object_store_memory=self.object_store_memory,
extra_params=self.extra_params)

def _get_raylet_command(self, redis_address):
@staticmethod
def _get_raylet_command(redis_address,
ray_exec,
password,
ray_node_cpu_cores,
labels,
object_store_memory,
extra_params):
command = "{} start --redis-address {} --redis-password {} --num-cpus {} {} ".format(
self.ray_exec, redis_address, self.password, self.ray_node_cpu_cores, self.labels)
return self._enrich_command(command)
ray_exec, redis_address, password, ray_node_cpu_cores, labels)
return RayServiceFuncGenerator._enrich_command(command=command,
object_store_memory=object_store_memory,
extra_params=extra_params)

def _start_ray_node(self, command, tag, wait_before=5, wait_after=5):
modified_env = self._prepare_env(self.mkl_cores)
Expand Down Expand Up @@ -174,7 +186,14 @@ def _start_ray_services(iter):
else:
print("partition id is : {}".format(tc.partitionId()))
process_info = self._start_ray_node(
command=self._get_raylet_command(redis_address=redis_address),
command=RayServiceFuncGenerator._get_raylet_command(
redis_address=redis_address,
ray_exec=self.ray_exec,
password=self.password,
ray_node_cpu_cores=self.ray_node_cpu_cores,
labels=self.labels,
object_store_memory=self.object_store_memory,
extra_params=self.extra_params),
tag="raylet",
wait_before=self.waiting_time_sec)
yield process_info
Expand Down Expand Up @@ -213,6 +232,8 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N
self.python_loc = os.environ['PYSPARK_PYTHON']
self.ray_processesMonitor = None
self.verbose = verbose
self.redis_password = password
self.object_store_memory = object_store_memory
self.redis_port = self._new_port() if not redis_port else redis_port
self.ray_service = RayServiceFuncGenerator(
python_loc=self.python_loc,
Expand All @@ -236,8 +257,9 @@ def _new_port(self):

def _enrich_object_sotre_memory(self, sc, object_store_memory):
if is_local(sc):
assert not object_store_memory, "you should not set object_store_memory on spark local"
return resourceToBytes(self._get_ray_plasma_memory_local())
if self.object_store_memory is None:
self.object_store_memory = self._get_ray_plasma_memory_local()
return resourceToBytes(self.object_store_memory)
else:
return resourceToBytes(
str(object_store_memory)) if object_store_memory else None
Expand Down Expand Up @@ -324,9 +346,24 @@ def _get_num_ray_nodes(self):
else:
return int(self.sc._conf.get("spark.executor.instances"))

def init(self):
def init(self, object_store_memory=None,
num_cores=0,
labels="",
extra_params={}):
"""
:param object_store_memory: Memory size of the object store for the local driver, e.g. 10g.
:param num_cores: Number of CPU cores for the local driver; 0 by default.
:param labels: String passed through to the `ray start` command built for the local driver; empty by default.
:param extra_params: Key-value dictionary of extra options used to launch Ray,
e.g. extra_params={"temp-dir": "/tmp/ray2/"}
"""

self._start_cluster()
self._start_driver(object_store_memory=self._get_ray_plasma_memory_local())
if object_store_memory is None:
object_store_memory = self._get_ray_plasma_memory_local()
self._start_driver(object_store_memory=object_store_memory,
num_cores=num_cores,
labels=labels,
extra_params=extra_params)

def _start_cluster(self):
print("Start to launch ray on cluster")
Expand All @@ -340,21 +377,37 @@ def _start_cluster(self):
self.redis_address = self.ray_processesMonitor.master.master_addr
return self

def _start_restricted_worker(self, redis_address, redis_password, object_store_memory):
num_cores = 0
command = "ray start --redis-address {} " \
"--redis-password {} --num-cpus {} --object-store-memory {}".\
format(redis_address, redis_password, num_cores, object_store_memory)
def _start_restricted_worker(self,
object_store_memory,
num_cores=0,
labels="",
extra_params={}):
command = RayServiceFuncGenerator._get_raylet_command(
redis_address=self.redis_address,
ray_exec="ray ",
password=self.redis_password,
ray_node_cpu_cores=num_cores,
labels=labels,
object_store_memory=object_store_memory,
extra_params=extra_params)
print("Executing command: {}".format(command))
process_info = session_execute(command=command, fail_fast=True)
ProcessMonitor.register_shutdown_hook(pgid=process_info.pgid)

def _start_driver(self, object_store_memory="10g"):
def _start_driver(self,
object_store_memory,
num_cores=0,
labels="",
extra_params={}):
print("Start to launch ray on local")
import ray
if not self.is_local:
self._start_restricted_worker(self.redis_address, self.ray_service.password,
object_store_memory=resourceToBytes(object_store_memory))
self._start_restricted_worker(
object_store_memory=resourceToBytes(object_store_memory),
num_cores=num_cores,
labels=labels,
extra_params=extra_params
)
ray.shutdown()
ray.init(redis_address=self.redis_address,
redis_password=self.ray_service.password)
7 changes: 5 additions & 2 deletions python/orca/test/bigdl/orca/ray/integration/ray_on_yarn.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

sc = init_spark_on_yarn(
hadoop_conf="/opt/work/almaren-yarn-config/",
conda_name="ray36-dev",
conda_name="ray_train",
num_executor=slave_num,
executor_cores=28,
executor_memory="10g",
Expand All @@ -38,7 +38,10 @@
extra_params={"temp-dir": "/tmp/hello/"},
env={"http_proxy": "http://child-prc.intel.com:913",
"http_proxys": "http://child-prc.intel.com:913"})
ray_ctx.init()
ray_ctx.init(object_store_memory="2g",
num_cores=0,
labels="",
extra_params={})


@ray.remote
Expand Down
2 changes: 1 addition & 1 deletion python/orca/test/bigdl/orca/ray/test_ray_on_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TestUtil(TestCase):
def test_local(self):
node_num = 4
sc = init_spark_on_local(cores=node_num)
ray_ctx = RayContext(sc=sc)
ray_ctx = RayContext(sc=sc, object_store_memory="1g")
ray_ctx.init()
actors = [TestRay.remote() for i in range(0, node_num)]
print([ray.get(actor.hostname.remote()) for actor in actors])
Expand Down

0 comments on commit b3d24d4

Please sign in to comment.