From 5c01aa48bc8a2748b00a81fc5a63130e44c91cc2 Mon Sep 17 00:00:00 2001
From: Cindy Jiang <47068112+cindyyuanjiang@users.noreply.github.com>
Date: Wed, 7 Aug 2024 16:23:58 -0700
Subject: [PATCH] cleaned up qual tool code after removing cli dependency (#1256)

Signed-off-by: cindyyuanjiang
---
 .../cloud_api/databricks_aws.py               | 17 ------
 .../cloud_api/databricks_azure.py             | 51 ------------------
 .../cloud_api/dataproc.py                     | 29 ----------
 .../src/spark_rapids_pytools/cloud_api/emr.py | 35 +----------
 .../spark_rapids_pytools/cloud_api/onprem.py  |  4 +-
 .../cloud_api/sp_types.py                     | 53 +++++--------------
 .../rapids/qualification.py                   |  4 +-
 user_tools/tests/test_diagnostic.py           | 18 -------
 8 files changed, 17 insertions(+), 194 deletions(-)

diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
index 467ec05b0..82bd40133 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
@@ -169,26 +169,9 @@ def _build_cmd_scp_from_node(self, node: ClusterNode, src: str, dest: str) -> st
                        dest]
         return Utils.gen_joined_str(' ', prefix_args)
 
-    # TODO: to be deprecated
-    def _build_platform_describe_node_instance(self, node: ClusterNode) -> list:
-        cmd_params = ['aws ec2 describe-instance-types',
-                      '--region', f'{self.get_region()}',
-                      '--instance-types', f'{node.instance_type}']
-        return cmd_params
-
-    def _get_instance_description_cache_key(self, node: ClusterNode) -> tuple:
-        return node.instance_type, self.get_region()
-
     def get_submit_spark_job_cmd_for_cluster(self, cluster_name: str, submit_args: dict) -> List[str]:
         raise NotImplementedError
 
-    # TODO: to be deprecated
-    def _exec_platform_describe_node_instance(self, node: ClusterNode) -> str:
-        raw_instance_descriptions = super()._exec_platform_describe_node_instance(node)
-        instance_descriptions = JSONPropertiesContainer(raw_instance_descriptions, file_load=False)
-        # Return the instance description of node type. Convert to valid JSON string for type matching.
-        return json.dumps(instance_descriptions.get_value('InstanceTypes')[0])
-
     def init_instance_descriptions(self) -> None:
         instance_description_file_path = Utils.resource_path('emr-instance-catalog.json')
         self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
index 7b0d27f0d..c746e26b3 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
@@ -14,9 +14,7 @@
 
 """Implementation specific to DATABRICKS_AZURE"""
 
-import datetime
 import json
-import os
 from dataclasses import dataclass, field
 from typing import Any, List
 
@@ -26,7 +24,6 @@
 from spark_rapids_pytools.cloud_api.sp_types import CMDDriverBase, ClusterBase, ClusterNode, \
     PlatformBase, SysInfo, GpuHWInfo, ClusterState, SparkNodeType, ClusterGetAccessor, NodeHWInfo, GpuDevice
 from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
-from spark_rapids_pytools.common.sys_storage import FSUtil
 from spark_rapids_pytools.common.utilities import Utils
 from spark_rapids_pytools.pricing.databricks_azure_pricing import DatabricksAzurePriceProvider
 from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
@@ -111,8 +108,6 @@ class DBAzureCMDDriver(CMDDriverBase):
     """Represents the command interface that will be used by DATABRICKS_AZURE"""
 
     configs: JSONPropertiesContainer = None
-    cache_expiration_secs: int = field(default=604800, init=False)  # update the file once a week
-    # logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.databricks.azure'), init=False)
 
     def _list_inconsistent_configurations(self) -> list:
         incorrect_envs = super()._list_inconsistent_configurations()
@@ -198,36 +193,6 @@ def _process_instance_description(self, instance_descriptions: str) -> dict:
     def get_instance_description_cli_params(self):
         return ['az vm list-skus', '--location', f'{self.get_region()}']
 
-    # TODO: to be deprecated
-    def _build_platform_describe_node_instance(self, node: ClusterNode) -> list:
-        pass
-
-    def _caches_expired(self, cache_file) -> bool:
-        if not os.path.exists(cache_file):
-            return True
-        modified_time = os.path.getmtime(cache_file)
-        diff_time = int(datetime.datetime.now().timestamp() - modified_time)
-        if diff_time > self.cache_expiration_secs:
-            return True
-        return False
-
-    # TODO: to be deprecated
-    def init_instances_description(self) -> str:
-        cache_dir = Utils.get_rapids_tools_env('CACHE_FOLDER')
-        fpath = FSUtil.build_path(cache_dir, 'azure-instances-catalog.json')
-        if self._caches_expired(fpath):
-            self.logger.info('Downloading the Azure instance type descriptions catalog')
-            self.generate_instance_description(fpath)
-        else:
-            self.logger.info('The Azure instance type descriptions catalog is loaded from the cache')
-        return fpath
-
-    # TODO: to be deprecated
-    def _exec_platform_describe_node_instance(self, node: ClusterNode) -> str:
-        instance_descriptions = JSONPropertiesContainer(self.init_instances_description())
-        # Return the instance description of node type. Convert to valid JSON string for type matching.
-        return json.dumps(instance_descriptions.get_value_silent(node.instance_type))
-
     def get_submit_spark_job_cmd_for_cluster(self, cluster_name: str, submit_args: dict) -> List[str]:
         raise NotImplementedError
 
@@ -236,12 +201,6 @@ def get_region(self) -> str:
             return self.env_vars.get('location')
         return self.env_vars.get('region')
 
-    def init_instance_descriptions(self) -> None:
-        platform = CspEnv.pretty_print(self.cloud_ctxt['platformType'])
-        instance_description_file_path = Utils.resource_path(f'{platform}-instance-catalog.json')
-        self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
-        self.instance_descriptions = JSONPropertiesContainer(instance_description_file_path)
-
 
 @dataclass
 class DatabricksAzureNode(ClusterNode):
@@ -252,12 +211,6 @@ class DatabricksAzureNode(ClusterNode):
     def _set_fields_from_props(self):
         self.name = self.props.get_value_silent('public_dns')
 
-    def _pull_sys_info(self, cli=None) -> SysInfo:
-        cpu_mem = self.mc_props.get_value('MemoryInMB')
-        num_cpus = self.mc_props.get_value('VCpuCount')
-
-        return SysInfo(num_cpus=num_cpus, cpu_mem=cpu_mem)
-
     def _pull_gpu_hw_info(self, cli=None) -> GpuHWInfo or None:
         gpu_info = cli.configs.get_value('gpuConfigs', 'user-tools', 'supportedGpuInstances')
         if gpu_info is None:
@@ -270,10 +223,6 @@ def _pull_gpu_hw_info(self, cli=None) -> GpuHWInfo or None:
                         gpu_device=gpu_device,
                         gpu_mem=gpu_instance['MemoryInfo']['SizeInMiB'])
 
-    def _pull_and_set_mc_props(self, cli=None):
-        instances_description = cli.describe_node_instance(self.instance_type) if cli else None
-        self.mc_props = JSONPropertiesContainer(prop_arg=instances_description, file_load=False)
-
 
 @dataclass
 class DatabricksAzureCluster(ClusterBase):
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
index a916469f7..5c32466fa 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -192,20 +192,6 @@ def _list_inconsistent_configurations(self) -> list:
             incorrect_envs.append(f'Property {prop_entry} is not set.')
         return incorrect_envs
 
-    # TODO: to be deprecated
-    def _build_platform_describe_node_instance(self, node: ClusterNode) -> list:
-        cmd_params = ['gcloud',
-                      'compute',
-                      'machine-types',
-                      'describe',
-                      f'{node.instance_type}',
-                      '--zone',
-                      f'{node.zone}']
-        return cmd_params
-
-    def _get_instance_description_cache_key(self, node: ClusterNode) -> tuple:
-        return node.instance_type, node.zone
-
     def _build_platform_list_cluster(self,
                                      cluster,
                                      query_args: dict = None) -> list:
@@ -368,12 +354,6 @@ def extract_gpu_name(gpu_description: str) -> str:
     def get_instance_description_cli_params(self) -> list:
         return ['gcloud compute machine-types list', '--zones', f'{self.get_zone()}']
 
-    def init_instance_descriptions(self) -> None:
-        platform = CspEnv.pretty_print(self.cloud_ctxt['platformType'])
-        instance_description_file_path = Utils.resource_path(f'{platform}-instance-catalog.json')
-        self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
-        self.instance_descriptions = JSONPropertiesContainer(instance_description_file_path)
-
 
 @dataclass
 class DataprocNode(ClusterNode):
@@ -381,10 +361,6 @@ class DataprocNode(ClusterNode):
 
     zone: str = field(default=None, init=False)
 
-    def _pull_and_set_mc_props(self, cli=None):
-        instances_description = cli.describe_node_instance(self.instance_type) if cli else None
-        self.mc_props = JSONPropertiesContainer(prop_arg=instances_description, file_load=False)
-
     @staticmethod
     def __extract_info_from_value(conf_val: str):
         if '/' in conf_val:
@@ -440,11 +416,6 @@ def get_gpu_device(accelerator_name: str) -> GpuDevice:
                             gpu_mem=gpu_mem)
         return None
 
-    def _pull_sys_info(self, cli=None) -> SysInfo:
-        cpu_mem = self.mc_props.get_value('MemoryInMB')
-        num_cpus = self.mc_props.get_value('VCpuCount')
-        return SysInfo(num_cpus=num_cpus, cpu_mem=cpu_mem)
-
     def _set_fields_from_props(self):
         # set the machine type
         if not self.props:
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
index 1343fddaf..e9a4c8524 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -23,8 +23,7 @@
 from spark_rapids_pytools.cloud_api.emr_job import EmrLocalRapidsJob
 from spark_rapids_pytools.cloud_api.s3storage import S3StorageDriver
 from spark_rapids_pytools.cloud_api.sp_types import PlatformBase, ClusterBase, CMDDriverBase, \
-    ClusterState, SparkNodeType, ClusterNode, GpuHWInfo, SysInfo, GpuDevice, \
-    ClusterGetAccessor
+    ClusterState, SparkNodeType, ClusterNode, GpuHWInfo, GpuDevice, ClusterGetAccessor
 from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer, \
     AbstractPropertiesContainer
 from spark_rapids_pytools.common.utilities import Utils
@@ -199,16 +198,6 @@ def _build_cmd_scp_from_node(self, node: ClusterNode, src: str, dest: str) -> st
                        dest]
         return Utils.gen_joined_str(' ', prefix_args)
 
-    # TODO: to be deprecated
-    def _build_platform_describe_node_instance(self, node: ClusterNode) -> list:
-        cmd_params = ['aws ec2 describe-instance-types',
-                      '--region', f'{self.get_region()}',
-                      '--instance-types', f'{node.instance_type}']
-        return cmd_params
-
-    def _get_instance_description_cache_key(self, node: ClusterNode) -> tuple:
-        return node.instance_type, self.get_region()
-
     def get_zone(self) -> str:
         describe_cmd = ['aws ec2 describe-availability-zones',
                         '--region', f'{self.get_region()}']
@@ -243,13 +232,6 @@ def exec_platform_describe_cluster_by_id(self,
         describe_cmd = f'aws emr describe-cluster --cluster-id {cluster_id}'
         return self.run_sys_cmd(describe_cmd)
 
-    # TODO: to be deprecated
-    def _exec_platform_describe_node_instance(self, node: ClusterNode) -> str:
-        raw_instance_descriptions = super()._exec_platform_describe_node_instance(node)
-        instance_descriptions = JSONPropertiesContainer(raw_instance_descriptions, file_load=False)
-        # Return the instance description of node type. Convert to valid JSON string for type matching.
-        return json.dumps(instance_descriptions.get_value('InstanceTypes')[0])
-
     def get_submit_spark_job_cmd_for_cluster(self, cluster_name: str, submit_args: dict) -> List[str]:
         raise NotImplementedError
 
@@ -270,12 +252,6 @@ def _process_instance_description(self, instance_descriptions: str) -> dict:
     def get_instance_description_cli_params(self):
         return ['aws ec2 describe-instance-types', '--region', f'{self.get_region()}']
 
-    def init_instance_descriptions(self) -> None:
-        platform = CspEnv.pretty_print(self.cloud_ctxt['platformType'])
-        instance_description_file_path = Utils.resource_path(f'{platform}-instance-catalog.json')
-        self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
-        self.instance_descriptions = JSONPropertiesContainer(instance_description_file_path)
-
 
 @dataclass
 class InstanceGroup:
@@ -314,19 +290,10 @@ class EMRNode(ClusterNode):
     """
     ec2_instance: Ec2Instance = field(default=None, init=False)
 
-    def _pull_and_set_mc_props(self, cli=None):
-        instances_description = cli.describe_node_instance(self.instance_type) if cli else None
-        self.mc_props = JSONPropertiesContainer(prop_arg=instances_description, file_load=False)
-
     def _set_fields_from_props(self):
         self.name = self.ec2_instance.dns_name
         self.instance_type = self.ec2_instance.group.instance_type
 
-    def _pull_sys_info(self, cli=None) -> SysInfo:
-        cpu_mem = self.mc_props.get_value('MemoryInMB')
-        num_cpus = self.mc_props.get_value('VCpuCount')
-        return SysInfo(num_cpus=num_cpus, cpu_mem=cpu_mem)
-
     def _pull_gpu_hw_info(self, cli=None) -> GpuHWInfo or None:
         raw_gpus = self.mc_props.get_value_silent('GpuInfo')
         if raw_gpus is None or len(raw_gpus) == 0:
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
index 3e2e5d8d8..7f972b804 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -164,10 +164,10 @@ class OnPremNode(ClusterNode):
     """Implementation of Onprem cluster node."""
 
     def fetch_and_set_hw_info(self, cli=None):
-        sys_info = self._pull_sys_info(cli)
+        sys_info = self._pull_sys_info()
         self.construct_hw_info(cli=cli, sys_info=sys_info)
 
-    def _pull_sys_info(self, cli=None) -> SysInfo:
+    def _pull_sys_info(self) -> SysInfo:
         cpu_mem = self.props.get_value('memory')
         cpu_mem = cpu_mem.replace('MiB', '')
         num_cpus = self.props.get_value('numCores')
diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
index 9eff532fe..2d8d639ce 100644
--- a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
+++ b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -159,14 +159,16 @@ def _set_fields_from_props(self):
         pass
 
     def _pull_and_set_mc_props(self, cli=None):
-        instances_description = cli.exec_platform_describe_node_instance(self) if cli else None
+        instances_description = cli.describe_node_instance(self.instance_type) if cli else None
         self.mc_props = JSONPropertiesContainer(prop_arg=instances_description, file_load=False)
 
     def _pull_gpu_hw_info(self, cli=None) -> GpuHWInfo:
         raise NotImplementedError
 
-    def _pull_sys_info(self, cli=None) -> SysInfo:
-        raise NotImplementedError
+    def _pull_sys_info(self) -> SysInfo:
+        cpu_mem = self.mc_props.get_value('MemoryInMB')
+        num_cpus = self.mc_props.get_value('VCpuCount')
+        return SysInfo(num_cpus=num_cpus, cpu_mem=cpu_mem)
 
     def get_name(self) -> str:
         return self.name
@@ -181,7 +183,7 @@ def construct_hw_info(self,
 
     def fetch_and_set_hw_info(self, cli=None):
         self._pull_and_set_mc_props(cli)
-        sys_info = self._pull_sys_info(cli)
+        sys_info = self._pull_sys_info()
         try:
             # if a node has no gpu, then it is expected that setting the gpu info fails
             gpu_info = self._pull_gpu_hw_info(cli)
@@ -293,8 +295,6 @@ class CMDDriverBase:
     timeout: int = 0
     env_vars: dict = field(default_factory=dict, init=False)
     logger: Logger = None
-    # TODO: to be deprecated
-    instance_descriptions_cache: dict = field(default_factory=dict, init=False)
     instance_descriptions: JSONPropertiesContainer = field(default=None, init=False)
 
     def get_env_var(self, key: str):
@@ -508,43 +508,14 @@ def pull_cluster_props_by_args(self, args: dict) -> str or None:
         del args  # Unused by super method.
         return ''
 
-    # TODO: to be deprecated
-    def _build_platform_describe_node_instance(self, node: ClusterNode) -> list:
-        del node  # Unused by super method.
-        return []
-
-    def _get_instance_description_cache_key(self, node: ClusterNode) -> tuple:
-        """
-        Generates a cache key from the node's instance type for accessing the instance description cache.
-        This default implementation should be overridden by subclasses that require additional fields.
-        """
-        return (node.instance_type,)
-
-    # TODO: to be deprecated
-    def _exec_platform_describe_node_instance(self, node: ClusterNode) -> str:
-        """
-        Given a node, execute platform CLI to pull the properties of the instance type running on
-        that node
-        :param node: object representing cluster component
-        :return: string containing the properties of the machine. The string could be in json or yaml format.
-        """
-        cmd_params = self._build_platform_describe_node_instance(node=node)
-        return self.run_sys_cmd(cmd_params)
-
-    # TODO: to be deprecated
-    def exec_platform_describe_node_instance(self, node: ClusterNode):
+    def init_instance_descriptions(self) -> None:
         """
-        Returns the instance type description of the cluster node. If the description
-        is not cached, it executes a platform specific command to fetch and cache it.
+        Load instance description file from resources based on platform type.
""" - key = self._get_instance_description_cache_key(node) - if key not in self.instance_descriptions_cache: - # Cache the instance description - self.instance_descriptions_cache[key] = self._exec_platform_describe_node_instance(node) - return self.instance_descriptions_cache[key] - - def init_instance_descriptions(self) -> None: - pass + platform = CspEnv.pretty_print(self.cloud_ctxt['platformType']) + instance_description_file_path = Utils.resource_path(f'{platform}-instance-catalog.json') + self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path) + self.instance_descriptions = JSONPropertiesContainer(instance_description_file_path) def describe_node_instance(self, instance_type: str) -> str: instance_info = self.instance_descriptions.get_value(instance_type) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index e7a27b4bd..c48bd1729 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -222,8 +222,8 @@ def _process_gpu_cluster_worker_node(): if gpu_cluster_obj: worker_node = gpu_cluster_obj.get_worker_node() worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli) # pylint: disable=protected-access - sys_info = worker_node._pull_sys_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access - gpu_info = worker_node._pull_gpu_hw_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access + sys_info = worker_node._pull_sys_info() # pylint: disable=protected-access + gpu_info = worker_node._pull_gpu_hw_info() # pylint: disable=protected-access worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info) except Exception as e: # pylint: disable=broad-except diff --git a/user_tools/tests/test_diagnostic.py b/user_tools/tests/test_diagnostic.py index 0689657b2..ea3f8cefa 100644 --- a/user_tools/tests/test_diagnostic.py +++ b/user_tools/tests/test_diagnostic.py @@ -22,7 +22,6 @@ from cli_test_helpers import ArgvContext, EnvironContext # pylint: disable=import-error from spark_rapids_pytools import wrapper -from .spark_rapids_tools_ut import conftest from .mock_cluster import mock_live_cluster @@ -240,20 +239,3 @@ def test_cancel_confirm(self, build_mock, cloud, user_input, capsys): assert 'User canceled the operation' in stderr assert 'Raised an error in phase [Process-Arguments]' in stderr - - @pytest.fixture(scope='function', autouse=True) - def mock_databricks_azure_instance_description(self, cloud, monkeypatch): - """ - Mock `init_instances_description()` for Databricks Azure tests to return a test JSON file path. - This is needed to avoid creating an actual azure-instance-catalog.json file in the `CACHE_FOLDER`. - """ - def mock_init_instances_description(_): - resource_dir = conftest.get_test_resources_path() - test_azure_catalog_file = 'cluster/databricks/test-azure-instances-catalog.json' - return os.path.join(resource_dir, test_azure_catalog_file) - - if cloud == 'databricks-azure': - monkeypatch.setattr( - 'spark_rapids_pytools.cloud_api.databricks_azure.DBAzureCMDDriver.init_instances_description', - mock_init_instances_description - )