Skip to content

Commit

Permalink
Estimate cluster instances and generate cost savings (#803)
Browse files Browse the repository at this point in the history
* Parse cluster informtion and create CPU cluster object in user tools
* Rename dict fields to camel case
* Refactor code to use mustache templating
* Refactor instance type detection in dataproc
* Update template files with number of driver nodes


---------

Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa authored Feb 27, 2024
1 parent fe1e59c commit 6aa7cd8
Show file tree
Hide file tree
Showing 19 changed files with 369 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,7 @@ class DBAWSPlatform(EMRPlatform):

def __post_init__(self):
self.type_id = CspEnv.DATABRICKS_AWS
self.cluster_inference_supported = True
super(EMRPlatform, self).__post_init__()

def _construct_cli_object(self) -> CMDDriverBase:
Expand All @@ -51,8 +52,8 @@ def _construct_cli_object(self) -> CMDDriverBase:
def _install_storage_driver(self):
self.storage = S3StorageDriver(self.cli)

def _construct_cluster_from_props(self, cluster: str, props: str = None):
return DatabricksCluster(self).set_connection(cluster_id=cluster, props=props)
def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False):
return DatabricksCluster(self, is_inferred=is_inferred).set_connection(cluster_id=cluster, props=props)

def set_offline_cluster(self, cluster_args: dict = None):
pass
Expand Down
15 changes: 12 additions & 3 deletions user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,7 @@ class DBAzurePlatform(PlatformBase):
"""
def __post_init__(self):
self.type_id = CspEnv.DATABRICKS_AZURE
self.cluster_inference_supported = True
super().__post_init__()

def _construct_cli_object(self) -> CMDDriverBase:
Expand All @@ -51,8 +52,8 @@ def _construct_cli_object(self) -> CMDDriverBase:
def _install_storage_driver(self):
self.storage = AzureStorageDriver(self.cli)

def _construct_cluster_from_props(self, cluster: str, props: str = None):
return DatabricksAzureCluster(self).set_connection(cluster_id=cluster, props=props)
def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False):
return DatabricksAzureCluster(self, is_inferred=is_inferred).set_connection(cluster_id=cluster, props=props)

def set_offline_cluster(self, cluster_args: dict = None):
pass
Expand Down Expand Up @@ -103,6 +104,14 @@ def get_supported_gpus(self) -> dict:
gpu_scopes[mc_prof] = NodeHWInfo(sys_info=hw_info_ob, gpu_info=gpu_info_obj)
return gpu_scopes

def generate_cluster_configuration(self, render_args: dict):
executor_names = ','.join([
f'{{"node_id": "12345678900{i}"}}'
for i in range(render_args['NUM_EXECUTOR_NODES'])
])
render_args['EXECUTOR_NAMES'] = f'[{executor_names}]'
return super().generate_cluster_configuration(render_args)


@dataclass
class DBAzureCMDDriver(CMDDriverBase):
Expand Down
20 changes: 17 additions & 3 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,7 @@ class DataprocPlatform(PlatformBase):

def __post_init__(self):
self.type_id = CspEnv.DATAPROC
self.cluster_inference_supported = True
super().__post_init__()

def _set_remaining_configuration_list(self) -> None:
Expand Down Expand Up @@ -81,8 +82,8 @@ def _construct_cli_object(self) -> CMDDriverBase:
def _install_storage_driver(self):
self.storage = GStorageDriver(self.cli)

def _construct_cluster_from_props(self, cluster: str, props: str = None):
return DataprocCluster(self).set_connection(cluster_id=cluster, props=props)
def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False):
return DataprocCluster(self, is_inferred=is_inferred).set_connection(cluster_id=cluster, props=props)

def set_offline_cluster(self, cluster_args: dict = None):
pass
Expand Down Expand Up @@ -153,6 +154,19 @@ def calc_num_gpus(gpus_criteria_conf: List[dict], num_cores: int) -> int:
gpu_scopes[prof_name] = NodeHWInfo(sys_info=sys_info_obj, gpu_info=gpu_info_obj)
return gpu_scopes

def get_matching_executor_instance(self, cores_per_executor):
executors_from_config = self.configs.get_value('clusterInference', 'defaultCpuInstances', 'executor')
# TODO: Currently only single series is supported. Change this to a loop when using multiple series.
series_name, unit_info = list(executors_from_config.items())[0]
if cores_per_executor in unit_info['vCPUs']:
return f'{series_name}-{cores_per_executor}'
return None

def generate_cluster_configuration(self, render_args: dict):
executor_names = ','.join([f'"test-node-e{i}"' for i in range(render_args['NUM_EXECUTOR_NODES'])])
render_args['EXECUTOR_NAMES'] = f'[{executor_names}]'
return super().generate_cluster_configuration(render_args)


@dataclass
class DataprocCMDDriver(CMDDriverBase): # pylint: disable=abstract-method
Expand Down
7 changes: 4 additions & 3 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,7 @@ class DataprocGkePlatform(DataprocPlatform):
def __post_init__(self):
super().__post_init__()
self.type_id = CspEnv.DATAPROC_GKE
self.cluster_inference_supported = False

@classmethod
def get_spark_node_type_fromstring(cls, value: str):
Expand All @@ -54,8 +55,8 @@ def get_spark_node_type_fromstring(cls, value: str):
def _construct_cli_object(self) -> CMDDriverBase:
return DataprocGkeCMDDriver(timeout=0, cloud_ctxt=self.ctxt)

def _construct_cluster_from_props(self, cluster: str, props: str = None):
return DataprocGkeCluster(self).set_connection(cluster_id=cluster, props=props)
def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False):
return DataprocGkeCluster(self, is_inferred=is_inferred).set_connection(cluster_id=cluster, props=props)

def migrate_cluster_to_gpu(self, orig_cluster):
"""
Expand Down
24 changes: 16 additions & 8 deletions user_tools/src/spark_rapids_pytools/cloud_api/emr.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,6 +61,7 @@ def process_raw_cluster_prop(cls, prop_container: AbstractPropertiesContainer) -

def __post_init__(self):
self.type_id = CspEnv.EMR
self.cluster_inference_supported = True
super().__post_init__()

def _construct_cli_object(self) -> CMDDriverBase:
Expand All @@ -69,10 +70,8 @@ def _construct_cli_object(self) -> CMDDriverBase:
def _install_storage_driver(self):
self.storage = S3StorageDriver(self.cli)

def _construct_cluster_from_props(self,
cluster: str,
props: str = None):
return EMRCluster(self).set_connection(cluster_id=cluster, props=props)
def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False):
return EMRCluster(self, is_inferred=is_inferred).set_connection(cluster_id=cluster, props=props)

def migrate_cluster_to_gpu(self, orig_cluster):
"""
Expand Down Expand Up @@ -316,9 +315,18 @@ def __create_ec2_list_by_group(self, group_arg):
else:
group_id = group_arg
group_obj = None
query_args = {'instance-group-id': group_id}
raw_instance_list = self.cli.exec_platform_list_cluster_instances(self, query_args=query_args)
instances_list = json.loads(raw_instance_list).get('Instances')
if self.is_inferred:
# If cluster settings are inferred, create a list of instances with default configuration
render_args = {
'INSTANCE_GROUP_ID': f'"{group_id}"',
'INSTANCE_TYPE': f'"{group_obj.instance_type}"'
}
node_config = json.loads(self.generate_node_configuration(render_args))
instances_list = [node_config for _ in range(group_obj.count)]
else:
query_args = {'instance-group-id': group_id}
raw_instance_list = self.cli.exec_platform_list_cluster_instances(self, query_args=query_args)
instances_list = json.loads(raw_instance_list).get('Instances')
ec2_instances = []
for raw_inst in instances_list:
parsed_state = raw_inst['Status']['State']
Expand Down
11 changes: 8 additions & 3 deletions user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@ class OnPremPlatform(PlatformBase):
def __post_init__(self):
self.type_id = CspEnv.ONPREM
self.platform = self.ctxt_args.get('targetPlatform')
self.cluster_inference_supported = False
super().__post_init__()

def _construct_cli_object(self):
Expand All @@ -49,12 +50,16 @@ def _install_storage_driver(self):
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return OnPremLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

def _construct_cluster_from_props(self, cluster: str, props: str = None):
def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False):
if self.platform is not None:
onprem_cluster = OnPremCluster(self).set_connection(cluster_id=cluster, props=props)
onprem_cluster = OnPremCluster(self, is_inferred=is_inferred).set_connection(
cluster_id=cluster, props=props)
return onprem_cluster
return None

def _construct_cluster_config(self, cluster_info: dict, default_config: dict):
raise NotImplementedError

def migrate_cluster_to_gpu(self, orig_cluster):
"""
given a cluster, convert it to run NVIDIA Gpu based on mapping instance types
Expand Down
31 changes: 27 additions & 4 deletions user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ class PlatformBase:
ctxt: dict = field(default_factory=dict, init=False)
configs: JSONPropertiesContainer = field(default=None, init=False)
logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.csp'), init=False)
cluster_inference_supported: bool = field(default=False, init=False)

@classmethod
def list_supported_gpus(cls):
Expand Down Expand Up @@ -780,17 +781,22 @@ def setup_and_validate_env(self):

def _construct_cluster_from_props(self,
cluster: str,
props: str = None):
props: str = None,
is_inferred: bool = False):
raise NotImplementedError

def set_offline_cluster(self, cluster_args: dict = None):
raise NotImplementedError

def load_cluster_by_prop(self, cluster_prop: JSONPropertiesContainer, is_inferred=False):
cluster = cluster_prop.get_value_silent('cluster_id')
return self._construct_cluster_from_props(cluster=cluster,
props=json.dumps(cluster_prop.props),
is_inferred=is_inferred)

def load_cluster_by_prop_file(self, cluster_prop_path: str):
prop_container = JSONPropertiesContainer(prop_arg=cluster_prop_path)
cluster = prop_container.get_value_silent('cluster_id')
return self._construct_cluster_from_props(cluster=cluster,
props=json.dumps(prop_container.props))
return self.load_cluster_by_prop(prop_container)

def connect_cluster_by_name(self, cluster: str):
"""
Expand Down Expand Up @@ -850,6 +856,17 @@ def get_platform_name(self) -> str:
def get_footer_message(self) -> str:
return 'To support acceleration with T4 GPUs, switch the worker node instance types'

def get_matching_executor_instance(self, cores_per_executor):
default_instances = self.configs.get_value('clusterInference', 'defaultCpuInstances', 'executor')
return next((instance['name'] for instance in default_instances if instance['vCPUs'] == cores_per_executor),
None)

def generate_cluster_configuration(self, render_args: dict):
if not self.cluster_inference_supported:
return None
template_path = Utils.resource_path(f'templates/cluster_template/{self.type_id}.ms')
return TemplateGenerator.render_template_file(template_path, render_args)


@dataclass
class ClusterBase(ClusterGetAccessor):
Expand All @@ -867,6 +884,7 @@ class ClusterBase(ClusterGetAccessor):
nodes: dict = field(default_factory=dict, init=False)
props: AbstractPropertiesContainer = field(default=None, init=False)
logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.cluster'), init=False)
is_inferred: bool = field(default=False, init=True)

@staticmethod
def _verify_workers_exist(has_no_workers_cb: Callable[[], bool]):
Expand Down Expand Up @@ -1115,6 +1133,11 @@ def generate_bootstrap_script(self, overridden_args: dict = None) -> str:
render_args = self._set_render_args_bootstrap_template(overridden_args)
return TemplateGenerator.render_template_file(template_path, render_args)

def generate_node_configuration(self, render_args: dict) -> str:
platform_name = CspEnv.pretty_print(self.platform.type_id)
template_path = Utils.resource_path(f'templates/cluster_template/{platform_name}_node.ms')
return TemplateGenerator.render_template_file(template_path, render_args)


@dataclass
class ClusterReshape(ClusterGetAccessor):
Expand Down
77 changes: 77 additions & 0 deletions user_tools/src/spark_rapids_pytools/common/cluster_inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module provides functionality for cluster inference"""

from dataclasses import dataclass, field

from typing import Optional
from logging import Logger

from spark_rapids_pytools.cloud_api.sp_types import PlatformBase, ClusterBase
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.utilities import ToolLogging


@dataclass
class ClusterInference:
"""
Class for inferring cluster information and constructing CPU clusters.
:param platform: The platform on which the cluster inference is performed.
"""
platform: PlatformBase = field(default=None, init=True)
logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.cluster_inference'), init=False)

def get_cluster_template_args(self, cluster_info_json: JSONPropertiesContainer) -> Optional[dict]:
"""
Extract information about drivers and executors from input json
"""
# Currently we support only single driver node for all CSPs
num_driver_nodes = 1
driver_instance = cluster_info_json.get_value_silent('driverInstance')
# If driver instance is not set, use the default value from platform configurations
if driver_instance is None:
driver_instance = self.platform.configs.get_value('clusterInference', 'defaultCpuInstances', 'driver')
num_executor_nodes = cluster_info_json.get_value_silent('numExecutorNodes')
executor_instance = cluster_info_json.get_value_silent('executorInstance')
if executor_instance is None:
# If executor instance is not set, use the default value based on the number of cores
cores_per_executor = cluster_info_json.get_value_silent('coresPerExecutor')
executor_instance = self.platform.get_matching_executor_instance(cores_per_executor)
if executor_instance is None:
self.logger.info('Unable to infer CPU cluster. No matching executor instance found for vCPUs = %s',
cores_per_executor)
return None
return {
'DRIVER_INSTANCE': f'"{driver_instance}"',
'NUM_DRIVER_NODES': num_driver_nodes,
'EXECUTOR_INSTANCE': f'"{executor_instance}"',
'NUM_EXECUTOR_NODES': num_executor_nodes
}

def infer_cpu_cluster(self, cluster_info: JSONPropertiesContainer) -> Optional[ClusterBase]:
"""
Infer CPU cluster configuration based on json input and return the constructed cluster object.
"""
# Extract cluster information from parsed logs
cluster_template_args = self.get_cluster_template_args(cluster_info)
if cluster_template_args is None:
return None
# Construct cluster configuration using platform-specific logic
cluster_conf = self.platform.generate_cluster_configuration(cluster_template_args)
if cluster_conf is None:
return None
cluster_props_new = JSONPropertiesContainer(cluster_conf, file_load=False)
return self.platform.load_cluster_by_prop(cluster_props_new, is_inferred=True)
Loading

0 comments on commit 6aa7cd8

Please sign in to comment.