From 4d6c379dc6428d7edae5ab9722bfee4dc90899cd Mon Sep 17 00:00:00 2001 From: Shaojun Liu <61072813+liu-shaojun@users.noreply.github.com> Date: Tue, 20 Dec 2022 15:03:30 +0800 Subject: [PATCH] orca: remove hardcode redis_password from branch-2.0 (#7019) --- python/orca/src/bigdl/orca/common.py | 2 +- .../orca/src/bigdl/orca/data/ray_xshards.py | 3 +- .../bigdl/orca/ray/ray_on_spark_context.py | 57 +++++++++++-------- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/python/orca/src/bigdl/orca/common.py b/python/orca/src/bigdl/orca/common.py index 30deab1a09b..0c71bf98e2a 100644 --- a/python/orca/src/bigdl/orca/common.py +++ b/python/orca/src/bigdl/orca/common.py @@ -311,7 +311,7 @@ def init_orca_context(cluster_mode=None, runtime="spark", cores=2, memory="2g", "k8s-client or standalone, " "but got: %s".format(cluster_mode)) ray_args = {} - for key in ["redis_port", "password", "object_store_memory", "verbose", "env", + for key in ["redis_port", "redis_password", "object_store_memory", "verbose", "env", "extra_params", "num_ray_nodes", "ray_node_cpu_cores", "include_webui", "system_config"]: if key in kwargs: diff --git a/python/orca/src/bigdl/orca/data/ray_xshards.py b/python/orca/src/bigdl/orca/data/ray_xshards.py index 98f91b7f3f9..13bfc14419d 100644 --- a/python/orca/src/bigdl/orca/data/ray_xshards.py +++ b/python/orca/src/bigdl/orca/data/ray_xshards.py @@ -69,9 +69,10 @@ def init_ray_if_not(redis_address, redis_password): if not ray.is_initialized(): init_params = dict( address=redis_address, - _redis_password=redis_password, ignore_reinit_error=True ) + if redis_password: + init_params["_redis_password"] = redis_password if version.parse(ray.__version__) >= version.parse("1.4.0"): init_params["namespace"] = "az" ray.init(**init_params) diff --git a/python/orca/src/bigdl/orca/ray/ray_on_spark_context.py b/python/orca/src/bigdl/orca/ray/ray_on_spark_context.py index 93c82bfffa7..acdf8341159 100644 --- a/python/orca/src/bigdl/orca/ray/ray_on_spark_context.py +++ b/python/orca/src/bigdl/orca/ray/ray_on_spark_context.py @@ -107,13 +107,13 @@ def _prepare_env(self): return modified_env def __init__(self, python_loc, redis_port, ray_node_cpu_cores, - password, object_store_memory, verbose=False, env=None, + redis_password, object_store_memory, verbose=False, env=None, include_webui=False, extra_params=None, system_config=None): """object_store_memory: integer in bytes""" self.env = env self.python_loc = python_loc self.redis_port = redis_port - self.password = password + self.redis_password = redis_password self.ray_node_cpu_cores = ray_node_cpu_cores self.ray_exec = self._get_ray_exec() self.object_store_memory = object_store_memory @@ -149,17 +149,20 @@ def _enrich_command(command, object_store_memory, extra_params): if object_store_memory: command = command + " --object-store-memory {}".format(str(object_store_memory)) if extra_params: - for pair in extra_params.items(): - command = command + " --{} {}".format(pair[0], pair[1]) + for k, v in extra_params.items(): + kw = k.replace("_", "-") + command = command + " --{} {}".format(kw, v) return command def _gen_master_command(self): webui = "true" if self.include_webui else "false" command = "{} start --head " \ "--include-dashboard {} --dashboard-host 0.0.0.0 --port {} " \ - "--redis-password {} --num-cpus {}". \ - format(self.ray_exec, webui, self.redis_port, self.password, + "--num-cpus {}". \ + format(self.ray_exec, webui, self.redis_port, self.ray_node_cpu_cores) + if self.redis_password: + command = command + " --redis-password {}".format(self.redis_password) if self.labels: command = command + " " + self.labels if self.system_config: @@ -172,13 +175,15 @@ def _gen_master_command(self): @staticmethod def _get_raylet_command(redis_address, ray_exec, - password, + redis_password, ray_node_cpu_cores, labels="", object_store_memory=None, extra_params=None): - command = "{} start --address {} --redis-password {} --num-cpus {}".format( - ray_exec, redis_address, password, ray_node_cpu_cores) + command = "{} start --address {} --num-cpus {}".format( + ray_exec, redis_address, ray_node_cpu_cores) + if redis_password: + command = command + " --redis-password {}".format(redis_password) if labels: command = command + " " + labels return RayServiceFuncGenerator._enrich_command(command=command, @@ -261,7 +266,7 @@ def _start_raylets(iter): command=RayServiceFuncGenerator._get_raylet_command( redis_address=redis_address, ray_exec=self.ray_exec, - password=self.password, + redis_password=self.redis_password, ray_node_cpu_cores=self.ray_node_cpu_cores, labels=self.labels, object_store_memory=self.object_store_memory, @@ -311,7 +316,7 @@ def _start_ray_services(iter): command=RayServiceFuncGenerator._get_raylet_command( redis_address=redis_address, ray_exec=self.ray_exec, - password=self.password, + redis_password=self.redis_password, ray_node_cpu_cores=self.ray_node_cpu_cores, labels=self.labels, object_store_memory=self.object_store_memory, @@ -328,7 +333,7 @@ def _start_ray_services(iter): class RayOnSparkContext(object): _active_ray_context = None - def __init__(self, sc, redis_port=None, password="123456", object_store_memory=None, + def __init__(self, sc, redis_port=None, redis_password=None, object_store_memory=None, verbose=False, env=None, extra_params=None, include_webui=True, num_ray_nodes=None, ray_node_cpu_cores=None, system_config=None): """ @@ -345,7 +350,7 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N :param sc: An instance of SparkContext. :param redis_port: The redis port for the ray head node. Default is None. The value would be randomly picked if not specified. - :param password: The password for redis. Default to be "123456" if not specified. + :param redis_password: The password for redis. Default to be None if not specified. :param object_store_memory: The memory size for ray object_store in string. This can be specified in bytes(b), kilobytes(k), megabytes(m) or gigabytes(g). For example, "50b", "100k", "250m", "30g". @@ -377,7 +382,7 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N self.initialized = False self.is_local = is_local(sc) self.verbose = verbose - self.redis_password = password + self.redis_password = redis_password self.object_store_memory = resource_to_bytes(object_store_memory) self.ray_processesMonitor = None self.env = env @@ -454,7 +459,7 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N python_loc=self.python_loc, redis_port=self.redis_port, ray_node_cpu_cores=self.ray_node_cpu_cores, - password=self.redis_password, + redis_password=self.redis_password, object_store_memory=self.object_store_memory, verbose=self.verbose, env=self.env, @@ -520,6 +525,14 @@ def _get_spark_local_cores(self): else: return int(local_symbol) + def _update_extra_params(self, extra_params): + kwargs = {} + if extra_params is not None: + for k, v in extra_params.items(): + kw = k.replace("-", "_") + kwargs[kw] = v + return kwargs + def init(self, driver_cores=0): """ Initiate the ray cluster. @@ -536,19 +549,16 @@ def init(self, driver_cores=0): if self.env: os.environ.update(self.env) import ray - kwargs = {} - if self.extra_params is not None: - for k, v in self.extra_params.items(): - kw = k.replace("-", "_") - kwargs[kw] = v + kwargs = self._update_extra_params(self.extra_params) init_params = dict( num_cpus=self.ray_node_cpu_cores, - _redis_password=self.redis_password, object_store_memory=self.object_store_memory, include_dashboard=self.include_webui, dashboard_host="0.0.0.0", _system_config=self.system_config ) + if self.redis_password: + init_params["_redis_password"] = self.redis_password init_params.update(kwargs) if version.parse(ray.__version__) >= version.parse("1.4.0"): init_params["namespace"] = "az" @@ -613,7 +623,7 @@ def _start_restricted_worker(self, num_cores, node_ip_address, redis_address): command = RayServiceFuncGenerator._get_raylet_command( redis_address=redis_address, ray_exec="ray", - password=self.redis_password, + redis_password=self.redis_password, ray_node_cpu_cores=num_cores, object_store_memory=self.object_store_memory, extra_params=extra_param) @@ -635,9 +645,10 @@ def _start_driver(self, num_cores, redis_address): ray.shutdown() init_params = dict( address=redis_address, - _redis_password=self.ray_service.password, _node_ip_address=node_ip ) + if self.redis_password: + init_params["_redis_password"] = self.redis_password if version.parse(ray.__version__) >= version.parse("1.4.0"): init_params["namespace"] = "az" return ray.init(**init_params)