diff --git a/python/orca/src/bigdl/orca/ray/raycontext.py b/python/orca/src/bigdl/orca/ray/raycontext.py index ba5e159c8ad..066603eeef2 100755 --- a/python/orca/src/bigdl/orca/ray/raycontext.py +++ b/python/orca/src/bigdl/orca/ray/raycontext.py @@ -165,7 +165,7 @@ def _enrich_command(command, object_store_memory, extra_params): def _gen_master_command(self): webui = "true" if self.include_webui else "false" command = "{} start --head " \ - "--include-webui {} --redis-port {} " \ + "--include-dashboard {} --dashboard-host 0.0.0.0 --port {} " \ "--redis-password {} --num-cpus {}". \ format(self.ray_exec, webui, self.redis_port, self.password, self.ray_node_cpu_cores) @@ -196,7 +196,7 @@ def _start_ray_node(self, command, tag): print("Starting {} by running: {}".format(tag, command)) process_info = session_execute(command=command, env=modified_env, tag=tag) JVMGuard.register_pids(process_info.pids) - import ray.services as rservices + import ray._private.services as rservices process_info.node_ip = rservices.get_node_ip_address() return process_info @@ -263,7 +263,7 @@ class RayContext(object): _active_ray_context = None def __init__(self, sc, redis_port=None, password="123456", object_store_memory=None, - verbose=False, env=None, extra_params=None, include_webui=False, + verbose=False, env=None, extra_params=None, include_webui=True, num_ray_nodes=None, ray_node_cpu_cores=None): """ The RayContext would initiate a ray cluster on top of the configuration of SparkContext. @@ -460,11 +460,17 @@ def init(self, driver_cores=0): if self.env: os.environ.update(self.env) import ray + kwargs = {} + if self.extra_params is not None: + for k, v in self.extra_params.items(): + kw = k.replace("-", "_") + kwargs[kw] = v self._address_info = ray.init(num_cpus=self.ray_node_cpu_cores, - redis_password=self.redis_password, + _redis_password=self.redis_password, object_store_memory=self.object_store_memory, - include_webui=self.include_webui, - resources=self.extra_params) + include_dashboard=self.include_webui, + dashboard_host="0.0.0.0", + *kwargs) else: self.cluster_ips = self._gather_cluster_ips() from bigdl.util.common import init_executor_gateway @@ -521,12 +527,12 @@ def _start_restricted_worker(self, num_cores, node_ip_address, redis_address): def _start_driver(self, num_cores, redis_address): print("Start to launch ray driver on local") - import ray.services - node_ip = ray.services.get_node_ip_address(redis_address) + import ray._private.services + node_ip = ray._private.services.get_node_ip_address(redis_address) self._start_restricted_worker(num_cores=num_cores, node_ip_address=node_ip, redis_address=redis_address) ray.shutdown() return ray.init(address=redis_address, - redis_password=self.ray_service.password, - node_ip_address=node_ip) + _redis_password=self.ray_service.password, + _node_ip_address=node_ip) diff --git a/python/orca/src/bigdl/orca/ray/utils.py b/python/orca/src/bigdl/orca/ray/utils.py index 3941d5c32a1..cdaa06ce3ee 100644 --- a/python/orca/src/bigdl/orca/ray/utils.py +++ b/python/orca/src/bigdl/orca/ray/utils.py @@ -55,7 +55,7 @@ def resource_to_bytes(resource_str): def gen_shutdown_per_node(pgids, node_ips=None): - import ray.services as rservices + import ray._private.services as rservices pgids = to_list(pgids) def _shutdown_per_node(iter): diff --git a/python/orca/test/bigdl/orca/ray/integration/ray_on_yarn.py b/python/orca/test/bigdl/orca/ray/integration/ray_on_yarn.py index 7a09289e148..815588d377c 100644 --- a/python/orca/test/bigdl/orca/ray/integration/ray_on_yarn.py +++ b/python/orca/test/bigdl/orca/ray/integration/ray_on_yarn.py @@ -53,8 +53,7 @@ def check_cv2(self): return cv2.__version__ def ip(self): - import ray.services as rservices - return rservices.get_node_ip_address() + return ray._private.services.get_node_ip_address() def network(self): from urllib.request import urlopen