diff --git a/.github/workflows/PR_validation.yml b/.github/workflows/PR_validation.yml index 3dd2ace0ac8..d20dfabc30d 100644 --- a/.github/workflows/PR_validation.yml +++ b/.github/workflows/PR_validation.yml @@ -62,7 +62,6 @@ jobs: - name: Run test uses: ./.github/actions/dllib-scala-ut-action - Dllib-Scala-UT: needs: changes if: ${{ needs.changes.outputs.dllib == 'true' }} @@ -77,7 +76,6 @@ jobs: - name: Run test uses: ./.github/actions/dllib-scala-ut-action - Orca-Ray-Ctx-Example: needs: changes if: ${{ needs.changes.outputs.orca == 'true' }} diff --git a/python/orca/src/bigdl/orca/common.py b/python/orca/src/bigdl/orca/common.py index 435a99df04b..b98362bd788 100644 --- a/python/orca/src/bigdl/orca/common.py +++ b/python/orca/src/bigdl/orca/common.py @@ -341,7 +341,7 @@ def init_orca_context(cluster_mode=None, runtime="spark", cores=None, memory="2g "spark-submit or bigdl-submit, " "but got: %s".format(cluster_mode)) ray_args = {} - for key in ["redis_port", "password", "object_store_memory", "verbose", "env", + for key in ["redis_port", "redis_password", "object_store_memory", "verbose", "env", "extra_params", "num_ray_nodes", "ray_node_cpu_cores", "include_webui", "system_config"]: if key in kwargs: diff --git a/python/orca/src/bigdl/orca/data/ray_xshards.py b/python/orca/src/bigdl/orca/data/ray_xshards.py index 6f45e4b2058..2a7b6552bfd 100644 --- a/python/orca/src/bigdl/orca/data/ray_xshards.py +++ b/python/orca/src/bigdl/orca/data/ray_xshards.py @@ -85,9 +85,10 @@ def init_ray_if_not(redis_address, redis_password): if not ray.is_initialized(): init_params = dict( address=redis_address, - _redis_password=redis_password, ignore_reinit_error=True ) + if redis_password: + init_params["_redis_password"] = redis_password if version.parse(ray.__version__) >= version.parse("1.4.0"): init_params["namespace"] = "az" ray.init(**init_params) diff --git a/python/orca/src/bigdl/orca/ray/ray_on_spark_context.py b/python/orca/src/bigdl/orca/ray/ray_on_spark_context.py 
index 4e9200a5a77..a37e5b49468 100644 --- a/python/orca/src/bigdl/orca/ray/ray_on_spark_context.py +++ b/python/orca/src/bigdl/orca/ray/ray_on_spark_context.py @@ -107,14 +107,14 @@ def _prepare_env(self): print("The $PATH is: {}".format(modified_env["PATH"])) return modified_env - def __init__(self, python_loc, redis_port, ray_node_cpu_cores, - password, object_store_memory, verbose=False, env=None, + def __init__(self, python_loc, redis_port, redis_password, ray_node_cpu_cores, + object_store_memory, verbose=False, env=None, include_webui=False, extra_params=None, system_config=None): """object_store_memory: integer in bytes""" self.env = env self.python_loc = python_loc self.redis_port = redis_port - self.password = password + self.redis_password = redis_password self.ray_node_cpu_cores = ray_node_cpu_cores self.ray_exec = self._get_ray_exec() self.object_store_memory = object_store_memory @@ -150,17 +150,19 @@ def _enrich_command(command, object_store_memory, extra_params): if object_store_memory: command = command + " --object-store-memory {}".format(str(object_store_memory)) if extra_params: - for pair in extra_params.items(): - command = command + " --{} {}".format(pair[0], pair[1]) + for k, v in extra_params.items(): + kw = k.replace("_", "-") + command = command + " --{} {}".format(kw, v) return command def _gen_master_command(self): webui = "true" if self.include_webui else "false" command = "{} start --head " \ "--include-dashboard {} --dashboard-host 0.0.0.0 --port {} " \ - "--redis-password {} --num-cpus {}". \ - format(self.ray_exec, webui, self.redis_port, self.password, - self.ray_node_cpu_cores) + "--num-cpus {}". 
\ + format(self.ray_exec, webui, self.redis_port, self.ray_node_cpu_cores) + if self.redis_password: + command = command + " --redis-password {}".format(self.redis_password) if self.labels: command = command + " " + self.labels if self.system_config: @@ -173,13 +175,15 @@ def _gen_master_command(self): @staticmethod def _get_raylet_command(redis_address, ray_exec, - password, + redis_password, ray_node_cpu_cores, labels="", object_store_memory=None, extra_params=None): - command = "{} start --address {} --redis-password {} --num-cpus {}".format( - ray_exec, redis_address, password, ray_node_cpu_cores) + command = "{} start --address {} --num-cpus {}".format( + ray_exec, redis_address, ray_node_cpu_cores) + if redis_password: + command = command + " --redis-password {}".format(redis_password) if labels: command = command + " " + labels return RayServiceFuncGenerator._enrich_command(command=command, @@ -262,7 +266,7 @@ def _start_raylets(iter): command=RayServiceFuncGenerator._get_raylet_command( redis_address=redis_address, ray_exec=self.ray_exec, - password=self.password, + redis_password=self.redis_password, ray_node_cpu_cores=self.ray_node_cpu_cores, labels=self.labels, object_store_memory=self.object_store_memory, @@ -312,7 +316,7 @@ def _start_ray_services(iter): command=RayServiceFuncGenerator._get_raylet_command( redis_address=redis_address, ray_exec=self.ray_exec, - password=self.password, + redis_password=self.redis_password, ray_node_cpu_cores=self.ray_node_cpu_cores, labels=self.labels, object_store_memory=self.object_store_memory, @@ -329,7 +333,7 @@ def _start_ray_services(iter): class RayOnSparkContext(object): _active_ray_context = None - def __init__(self, sc, redis_port=None, password="123456", object_store_memory=None, + def __init__(self, sc, redis_port=None, redis_password=None, object_store_memory=None, verbose=False, env=None, extra_params=None, include_webui=True, num_ray_nodes=None, ray_node_cpu_cores=None, system_config=None): """ @@ -346,7 
+350,7 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N :param sc: An instance of SparkContext. :param redis_port: The redis port for the ray head node. Default is None. The value would be randomly picked if not specified. - :param password: The password for redis. Default to be "123456" if not specified. + :param redis_password: The password for redis. Default to be None if not specified. :param object_store_memory: The memory size for ray object_store in string. This can be specified in bytes(b), kilobytes(k), megabytes(m) or gigabytes(g). For example, "50b", "100k", "250m", "30g". @@ -379,7 +383,7 @@ def __init__(self, sc, redis_port=None, password="123456", object_store_memory=N self.initialized = False self.is_local = is_local(sc) self.verbose = verbose - self.redis_password = password + self.redis_password = redis_password self.object_store_memory = resource_to_bytes(object_store_memory) self.ray_processesMonitor = None self.env = env @@ -462,8 +466,8 @@ def setup(self): self.ray_service = RayServiceFuncGenerator( python_loc=self.python_loc, redis_port=self.redis_port, + redis_password=self.redis_password, ray_node_cpu_cores=self.ray_node_cpu_cores, - password=self.redis_password, object_store_memory=self.object_store_memory, verbose=self.verbose, env=self.env, @@ -529,6 +533,14 @@ def _get_spark_local_cores(self): else: return int(local_symbol) + def _update_extra_params(self, extra_params): + kwargs = {} + if extra_params is not None: + for k, v in extra_params.items(): + kw = k.replace("-", "_") + kwargs[kw] = v + return kwargs + def init(self, driver_cores=0): """ Initiate the ray cluster. 
@@ -546,19 +558,16 @@ def init(self, driver_cores=0): if self.env: os.environ.update(self.env) import ray - kwargs = {} - if self.extra_params is not None: - for k, v in self.extra_params.items(): - kw = k.replace("-", "_") - kwargs[kw] = v + kwargs = self._update_extra_params(self.extra_params) init_params = dict( num_cpus=self.ray_node_cpu_cores, - _redis_password=self.redis_password, object_store_memory=self.object_store_memory, include_dashboard=self.include_webui, dashboard_host="0.0.0.0", _system_config=self.system_config ) + if self.redis_password: + init_params["_redis_password"] = self.redis_password init_params.update(kwargs) if version.parse(ray.__version__) >= version.parse("1.4.0"): init_params["namespace"] = "az" @@ -624,7 +633,7 @@ def _start_restricted_worker(self, num_cores, node_ip_address, redis_address): command = RayServiceFuncGenerator._get_raylet_command( redis_address=redis_address, ray_exec="ray", - password=self.redis_password, + redis_password=self.redis_password, ray_node_cpu_cores=num_cores, object_store_memory=self.object_store_memory, extra_params=extra_param) @@ -637,7 +646,7 @@ def _start_restricted_worker(self, num_cores, node_ip_address, redis_address): pgid_to_kill=process_info.pgid) def _start_driver(self, num_cores, redis_address): - print("Start to launch ray driver on local") + print("Start to launch ray driver") import ray._private.services node_ip = ray._private.services.get_node_ip_address(redis_address) self._start_restricted_worker(num_cores=num_cores, @@ -646,9 +655,10 @@ def _start_driver(self, num_cores, redis_address): ray.shutdown() init_params = dict( address=redis_address, - _redis_password=self.ray_service.password, _node_ip_address=node_ip ) + if self.redis_password: + init_params["_redis_password"] = self.redis_password if version.parse(ray.__version__) >= version.parse("1.4.0"): init_params["namespace"] = "az" return ray.init(**init_params)