Commit
cleaned up qual tool code after removing cli dependency (#1256)
Signed-off-by: cindyyuanjiang <[email protected]>
cindyyuanjiang authored Aug 7, 2024
1 parent 431d77b commit 5c01aa4
Showing 8 changed files with 17 additions and 194 deletions.
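In short: the per-platform CLI probes (`aws ec2 describe-instance-types`, `gcloud compute machine-types describe`, `az vm list-skus`) and the cache plumbing around them are removed, and `CMDDriverBase` now loads a packaged `{platform}-instance-catalog.json` once and serves instance lookups from it. Below is a minimal, self-contained sketch of the new lookup path; the catalog layout (instance types as top-level keys carrying `MemoryInMB` and `VCpuCount`) is inferred from the diff, and the sample entry is hypothetical.

```python
import json

# Hypothetical excerpt of a packaged {platform}-instance-catalog.json; the
# real files are resolved via Utils.resource_path() in CMDDriverBase.
CATALOG = {
    'm5.xlarge': {'VCpuCount': 4, 'MemoryInMB': 16384},
}

def describe_node_instance(instance_type: str) -> str:
    # Counterpart of CMDDriverBase.describe_node_instance(): return the
    # catalog entry as a JSON string, which JSONPropertiesContainer accepts
    # when constructed with file_load=False.
    return json.dumps(CATALOG[instance_type])

# Counterpart of ClusterNode._pull_sys_info(): system info now comes from the
# cached catalog entry instead of a live cloud CLI call.
entry = json.loads(describe_node_instance('m5.xlarge'))
print(entry['VCpuCount'], entry['MemoryInMB'])  # 4 16384
```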
17 changes: 0 additions & 17 deletions user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
@@ -169,26 +169,9 @@ def _build_cmd_scp_from_node(self, node: ClusterNode, src: str, dest: str) -> str:
dest]
return Utils.gen_joined_str(' ', prefix_args)

# TODO: to be deprecated
def _build_platform_describe_node_instance(self, node: ClusterNode) -> list:
cmd_params = ['aws ec2 describe-instance-types',
'--region', f'{self.get_region()}',
'--instance-types', f'{node.instance_type}']
return cmd_params

def _get_instance_description_cache_key(self, node: ClusterNode) -> tuple:
return node.instance_type, self.get_region()

def get_submit_spark_job_cmd_for_cluster(self, cluster_name: str, submit_args: dict) -> List[str]:
raise NotImplementedError

# TODO: to be deprecated
def _exec_platform_describe_node_instance(self, node: ClusterNode) -> str:
raw_instance_descriptions = super()._exec_platform_describe_node_instance(node)
instance_descriptions = JSONPropertiesContainer(raw_instance_descriptions, file_load=False)
# Return the instance description of node type. Convert to valid JSON string for type matching.
return json.dumps(instance_descriptions.get_value('InstanceTypes')[0])

def init_instance_descriptions(self) -> None:
instance_description_file_path = Utils.resource_path('emr-instance-catalog.json')
self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
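Note that DATABRICKS_AWS keeps its own `init_instance_descriptions()` (retained above) rather than inheriting the platform-derived default added to `CMDDriverBase` in this commit: Databricks-on-AWS nodes run EC2 instance types, so it reuses `emr-instance-catalog.json`. A sketch of the resulting file-name resolution follows; the exact platform strings produced by `CspEnv.pretty_print()` are an assumption.

```python
# Sketch of catalog-file resolution after this change. The DATABRICKS_AWS
# override comes from the diff above; the platform strings are assumptions.
def catalog_file(platform: str) -> str:
    overrides = {'databricks_aws': 'emr-instance-catalog.json'}
    return overrides.get(platform, f'{platform}-instance-catalog.json')

assert catalog_file('databricks_aws') == 'emr-instance-catalog.json'
assert catalog_file('dataproc') == 'dataproc-instance-catalog.json'
```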
51 changes: 0 additions & 51 deletions user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
@@ -14,9 +14,7 @@

"""Implementation specific to DATABRICKS_AZURE"""

import datetime
import json
import os
from dataclasses import dataclass, field
from typing import Any, List

@@ -26,7 +24,6 @@
from spark_rapids_pytools.cloud_api.sp_types import CMDDriverBase, ClusterBase, ClusterNode, \
PlatformBase, SysInfo, GpuHWInfo, ClusterState, SparkNodeType, ClusterGetAccessor, NodeHWInfo, GpuDevice
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils
from spark_rapids_pytools.pricing.databricks_azure_pricing import DatabricksAzurePriceProvider
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
@@ -111,8 +108,6 @@ class DBAzureCMDDriver(CMDDriverBase):
"""Represents the command interface that will be used by DATABRICKS_AZURE"""

configs: JSONPropertiesContainer = None
cache_expiration_secs: int = field(default=604800, init=False) # update the file once a week
# logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.databricks.azure'), init=False)

def _list_inconsistent_configurations(self) -> list:
incorrect_envs = super()._list_inconsistent_configurations()
@@ -198,36 +193,6 @@ def _process_instance_description(self, instance_descriptions: str) -> dict:
def get_instance_description_cli_params(self):
return ['az vm list-skus', '--location', f'{self.get_region()}']

# TODO: to be deprecated
def _build_platform_describe_node_instance(self, node: ClusterNode) -> list:
pass

def _caches_expired(self, cache_file) -> bool:
if not os.path.exists(cache_file):
return True
modified_time = os.path.getmtime(cache_file)
diff_time = int(datetime.datetime.now().timestamp() - modified_time)
if diff_time > self.cache_expiration_secs:
return True
return False

# TODO: to be deprecated
def init_instances_description(self) -> str:
cache_dir = Utils.get_rapids_tools_env('CACHE_FOLDER')
fpath = FSUtil.build_path(cache_dir, 'azure-instances-catalog.json')
if self._caches_expired(fpath):
self.logger.info('Downloading the Azure instance type descriptions catalog')
self.generate_instance_description(fpath)
else:
self.logger.info('The Azure instance type descriptions catalog is loaded from the cache')
return fpath

# TODO: to be deprecated
def _exec_platform_describe_node_instance(self, node: ClusterNode) -> str:
instance_descriptions = JSONPropertiesContainer(self.init_instances_description())
# Return the instance description of node type. Convert to valid JSON string for type matching.
return json.dumps(instance_descriptions.get_value_silent(node.instance_type))

def get_submit_spark_job_cmd_for_cluster(self, cluster_name: str, submit_args: dict) -> List[str]:
raise NotImplementedError

@@ -236,12 +201,6 @@ def get_region(self) -> str:
return self.env_vars.get('location')
return self.env_vars.get('region')

def init_instance_descriptions(self) -> None:
platform = CspEnv.pretty_print(self.cloud_ctxt['platformType'])
instance_description_file_path = Utils.resource_path(f'{platform}-instance-catalog.json')
self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
self.instance_descriptions = JSONPropertiesContainer(instance_description_file_path)


@dataclass
class DatabricksAzureNode(ClusterNode):
@@ -252,12 +211,6 @@ class DatabricksAzureNode(ClusterNode):
def _set_fields_from_props(self):
self.name = self.props.get_value_silent('public_dns')

def _pull_sys_info(self, cli=None) -> SysInfo:
cpu_mem = self.mc_props.get_value('MemoryInMB')
num_cpus = self.mc_props.get_value('VCpuCount')

return SysInfo(num_cpus=num_cpus, cpu_mem=cpu_mem)

def _pull_gpu_hw_info(self, cli=None) -> GpuHWInfo or None:
gpu_info = cli.configs.get_value('gpuConfigs', 'user-tools', 'supportedGpuInstances')
if gpu_info is None:
@@ -270,10 +223,6 @@ def _pull_gpu_hw_info(self, cli=None) -> GpuHWInfo or None:
gpu_device=gpu_device,
gpu_mem=gpu_instance['MemoryInfo']['SizeInMiB'])

def _pull_and_set_mc_props(self, cli=None):
instances_description = cli.describe_node_instance(self.instance_type) if cli else None
self.mc_props = JSONPropertiesContainer(prop_arg=instances_description, file_load=False)


@dataclass
class DatabricksAzureCluster(ClusterBase):
29 changes: 0 additions & 29 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -192,20 +192,6 @@ def _list_inconsistent_configurations(self) -> list:
incorrect_envs.append(f'Property {prop_entry} is not set.')
return incorrect_envs

# TODO: to be deprecated
def _build_platform_describe_node_instance(self, node: ClusterNode) -> list:
cmd_params = ['gcloud',
'compute',
'machine-types',
'describe',
f'{node.instance_type}',
'--zone',
f'{node.zone}']
return cmd_params

def _get_instance_description_cache_key(self, node: ClusterNode) -> tuple:
return node.instance_type, node.zone

def _build_platform_list_cluster(self,
cluster,
query_args: dict = None) -> list:
@@ -368,23 +354,13 @@ def extract_gpu_name(gpu_description: str) -> str:
def get_instance_description_cli_params(self) -> list:
return ['gcloud compute machine-types list', '--zones', f'{self.get_zone()}']

def init_instance_descriptions(self) -> None:
platform = CspEnv.pretty_print(self.cloud_ctxt['platformType'])
instance_description_file_path = Utils.resource_path(f'{platform}-instance-catalog.json')
self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
self.instance_descriptions = JSONPropertiesContainer(instance_description_file_path)


@dataclass
class DataprocNode(ClusterNode):
"""Implementation of Dataproc cluster node."""

zone: str = field(default=None, init=False)

def _pull_and_set_mc_props(self, cli=None):
instances_description = cli.describe_node_instance(self.instance_type) if cli else None
self.mc_props = JSONPropertiesContainer(prop_arg=instances_description, file_load=False)

@staticmethod
def __extract_info_from_value(conf_val: str):
if '/' in conf_val:
@@ -440,11 +416,6 @@ def get_gpu_device(accelerator_name: str) -> GpuDevice:
gpu_mem=gpu_mem)
return None

def _pull_sys_info(self, cli=None) -> SysInfo:
cpu_mem = self.mc_props.get_value('MemoryInMB')
num_cpus = self.mc_props.get_value('VCpuCount')
return SysInfo(num_cpus=num_cpus, cpu_mem=cpu_mem)

def _set_fields_from_props(self):
# set the machine type
if not self.props:
35 changes: 1 addition & 34 deletions user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -23,8 +23,7 @@
from spark_rapids_pytools.cloud_api.emr_job import EmrLocalRapidsJob
from spark_rapids_pytools.cloud_api.s3storage import S3StorageDriver
from spark_rapids_pytools.cloud_api.sp_types import PlatformBase, ClusterBase, CMDDriverBase, \
ClusterState, SparkNodeType, ClusterNode, GpuHWInfo, SysInfo, GpuDevice, \
ClusterGetAccessor
ClusterState, SparkNodeType, ClusterNode, GpuHWInfo, GpuDevice, ClusterGetAccessor
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer, \
AbstractPropertiesContainer
from spark_rapids_pytools.common.utilities import Utils
@@ -199,16 +198,6 @@ def _build_cmd_scp_from_node(self, node: ClusterNode, src: str, dest: str) -> str:
dest]
return Utils.gen_joined_str(' ', prefix_args)

# TODO: to be deprecated
def _build_platform_describe_node_instance(self, node: ClusterNode) -> list:
cmd_params = ['aws ec2 describe-instance-types',
'--region', f'{self.get_region()}',
'--instance-types', f'{node.instance_type}']
return cmd_params

def _get_instance_description_cache_key(self, node: ClusterNode) -> tuple:
return node.instance_type, self.get_region()

def get_zone(self) -> str:
describe_cmd = ['aws ec2 describe-availability-zones',
'--region', f'{self.get_region()}']
@@ -243,13 +232,6 @@ def exec_platform_describe_cluster_by_id(self,
describe_cmd = f'aws emr describe-cluster --cluster-id {cluster_id}'
return self.run_sys_cmd(describe_cmd)

# TODO: to be deprecated
def _exec_platform_describe_node_instance(self, node: ClusterNode) -> str:
raw_instance_descriptions = super()._exec_platform_describe_node_instance(node)
instance_descriptions = JSONPropertiesContainer(raw_instance_descriptions, file_load=False)
# Return the instance description of node type. Convert to valid JSON string for type matching.
return json.dumps(instance_descriptions.get_value('InstanceTypes')[0])

def get_submit_spark_job_cmd_for_cluster(self, cluster_name: str, submit_args: dict) -> List[str]:
raise NotImplementedError

@@ -270,12 +252,6 @@ def _process_instance_description(self, instance_descriptions: str) -> dict:
def get_instance_description_cli_params(self):
return ['aws ec2 describe-instance-types', '--region', f'{self.get_region()}']

def init_instance_descriptions(self) -> None:
platform = CspEnv.pretty_print(self.cloud_ctxt['platformType'])
instance_description_file_path = Utils.resource_path(f'{platform}-instance-catalog.json')
self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
self.instance_descriptions = JSONPropertiesContainer(instance_description_file_path)


@dataclass
class InstanceGroup:
@@ -314,19 +290,10 @@ class EMRNode(ClusterNode):
"""
ec2_instance: Ec2Instance = field(default=None, init=False)

def _pull_and_set_mc_props(self, cli=None):
instances_description = cli.describe_node_instance(self.instance_type) if cli else None
self.mc_props = JSONPropertiesContainer(prop_arg=instances_description, file_load=False)

def _set_fields_from_props(self):
self.name = self.ec2_instance.dns_name
self.instance_type = self.ec2_instance.group.instance_type

def _pull_sys_info(self, cli=None) -> SysInfo:
cpu_mem = self.mc_props.get_value('MemoryInMB')
num_cpus = self.mc_props.get_value('VCpuCount')
return SysInfo(num_cpus=num_cpus, cpu_mem=cpu_mem)

def _pull_gpu_hw_info(self, cli=None) -> GpuHWInfo or None:
raw_gpus = self.mc_props.get_value_silent('GpuInfo')
if raw_gpus is None or len(raw_gpus) == 0:
4 changes: 2 additions & 2 deletions user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -164,10 +164,10 @@ class OnPremNode(ClusterNode):
"""Implementation of Onprem cluster node."""

def fetch_and_set_hw_info(self, cli=None):
sys_info = self._pull_sys_info(cli)
sys_info = self._pull_sys_info()
self.construct_hw_info(cli=cli, sys_info=sys_info)

def _pull_sys_info(self, cli=None) -> SysInfo:
def _pull_sys_info(self) -> SysInfo:
cpu_mem = self.props.get_value('memory')
cpu_mem = cpu_mem.replace('MiB', '')
num_cpus = self.props.get_value('numCores')
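OnPremNode is the one node type that still overrides `_pull_sys_info()`: with no cloud catalog to consult, CPU and memory come from the user-supplied cluster properties, and the override simply drops the now-unused `cli` parameter to match the new base-class signature. A sketch of that parse, with made-up property values:

```python
# Equivalent of OnPremNode._pull_sys_info() (sketch). The property names
# 'memory' and 'numCores' come from the diff; the values are hypothetical.
props = {'memory': '65536MiB', 'numCores': 16}

num_cpus = int(props['numCores'])
cpu_mem = int(props['memory'].replace('MiB', ''))
print(num_cpus, cpu_mem)  # 16 65536
```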
53 changes: 12 additions & 41 deletions user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -159,14 +159,16 @@ def _set_fields_from_props(self):
pass

def _pull_and_set_mc_props(self, cli=None):
instances_description = cli.exec_platform_describe_node_instance(self) if cli else None
instances_description = cli.describe_node_instance(self.instance_type) if cli else None
self.mc_props = JSONPropertiesContainer(prop_arg=instances_description, file_load=False)

def _pull_gpu_hw_info(self, cli=None) -> GpuHWInfo:
raise NotImplementedError

def _pull_sys_info(self, cli=None) -> SysInfo:
raise NotImplementedError
def _pull_sys_info(self) -> SysInfo:
cpu_mem = self.mc_props.get_value('MemoryInMB')
num_cpus = self.mc_props.get_value('VCpuCount')
return SysInfo(num_cpus=num_cpus, cpu_mem=cpu_mem)

def get_name(self) -> str:
return self.name
@@ -181,7 +183,7 @@ def construct_hw_info(self,

def fetch_and_set_hw_info(self, cli=None):
self._pull_and_set_mc_props(cli)
sys_info = self._pull_sys_info(cli)
sys_info = self._pull_sys_info()
try:
# if a node has no gpu, then it is expected that setting the gpu info fails
gpu_info = self._pull_gpu_hw_info(cli)
@@ -293,8 +295,6 @@ class CMDDriverBase:
timeout: int = 0
env_vars: dict = field(default_factory=dict, init=False)
logger: Logger = None
# TODO: to be deprecated
instance_descriptions_cache: dict = field(default_factory=dict, init=False)
instance_descriptions: JSONPropertiesContainer = field(default=None, init=False)

def get_env_var(self, key: str):
@@ -508,43 +508,14 @@ def pull_cluster_props_by_args(self, args: dict) -> str or None:
del args # Unused by super method.
return ''

# TODO: to be deprecated
def _build_platform_describe_node_instance(self, node: ClusterNode) -> list:
del node # Unused by super method.
return []

def _get_instance_description_cache_key(self, node: ClusterNode) -> tuple:
"""
Generates a cache key from the node's instance type for accessing the instance description cache.
This default implementation should be overridden by subclasses that require additional fields.
"""
return (node.instance_type,)

# TODO: to be deprecated
def _exec_platform_describe_node_instance(self, node: ClusterNode) -> str:
"""
Given a node, execute platform CLI to pull the properties of the instance type running on
that node
:param node: object representing cluster component
:return: string containing the properties of the machine. The string could be in json or yaml format.
"""
cmd_params = self._build_platform_describe_node_instance(node=node)
return self.run_sys_cmd(cmd_params)

# TODO: to be deprecated
def exec_platform_describe_node_instance(self, node: ClusterNode):
def init_instance_descriptions(self) -> None:
"""
Returns the instance type description of the cluster node. If the description
is not cached, it executes a platform specific command to fetch and cache it.
Load instance description file from resources based on platform type.
"""
key = self._get_instance_description_cache_key(node)
if key not in self.instance_descriptions_cache:
# Cache the instance description
self.instance_descriptions_cache[key] = self._exec_platform_describe_node_instance(node)
return self.instance_descriptions_cache[key]

def init_instance_descriptions(self) -> None:
pass
platform = CspEnv.pretty_print(self.cloud_ctxt['platformType'])
instance_description_file_path = Utils.resource_path(f'{platform}-instance-catalog.json')
self.logger.info('Loading instance descriptions from file: %s', instance_description_file_path)
self.instance_descriptions = JSONPropertiesContainer(instance_description_file_path)

def describe_node_instance(self, instance_type: str) -> str:
instance_info = self.instance_descriptions.get_value(instance_type)
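Each platform driver still keeps `get_instance_description_cli_params()` and `_process_instance_description()`, which suggests the bundled catalogs are generated by a separate offline step rather than at tool runtime. A hypothetical generation helper under that assumption (`generate_catalog` and its wiring are not part of this commit):

```python
import json
import subprocess

def generate_catalog(cli_params: list, process_fn, out_path: str) -> None:
    # Run the platform CLI once, e.g. ['aws ec2 describe-instance-types',
    # '--region', 'us-west-2'], then persist the processed result as the
    # catalog JSON that init_instance_descriptions() later loads.
    raw = subprocess.run(' '.join(cli_params), shell=True, check=True,
                         capture_output=True, text=True).stdout
    with open(out_path, 'w', encoding='utf-8') as fp:
        json.dump(process_fn(raw), fp, indent=2)
```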
4 changes: 2 additions & 2 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -222,8 +222,8 @@ def _process_gpu_cluster_worker_node():
if gpu_cluster_obj:
worker_node = gpu_cluster_obj.get_worker_node()
worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli) # pylint: disable=protected-access
sys_info = worker_node._pull_sys_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access
gpu_info = worker_node._pull_gpu_hw_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access
sys_info = worker_node._pull_sys_info() # pylint: disable=protected-access
gpu_info = worker_node._pull_gpu_hw_info() # pylint: disable=protected-access
worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info)

except Exception as e: # pylint: disable=broad-except
18 changes: 0 additions & 18 deletions user_tools/tests/test_diagnostic.py
@@ -22,7 +22,6 @@
from cli_test_helpers import ArgvContext, EnvironContext # pylint: disable=import-error

from spark_rapids_pytools import wrapper
from .spark_rapids_tools_ut import conftest
from .mock_cluster import mock_live_cluster


@@ -240,20 +239,3 @@ def test_cancel_confirm(self, build_mock, cloud, user_input, capsys):

assert 'User canceled the operation' in stderr
assert 'Raised an error in phase [Process-Arguments]' in stderr

@pytest.fixture(scope='function', autouse=True)
def mock_databricks_azure_instance_description(self, cloud, monkeypatch):
"""
Mock `init_instances_description()` for Databricks Azure tests to return a test JSON file path.
This is needed to avoid creating an actual azure-instance-catalog.json file in the `CACHE_FOLDER`.
"""
def mock_init_instances_description(_):
resource_dir = conftest.get_test_resources_path()
test_azure_catalog_file = 'cluster/databricks/test-azure-instances-catalog.json'
return os.path.join(resource_dir, test_azure_catalog_file)

if cloud == 'databricks-azure':
monkeypatch.setattr(
'spark_rapids_pytools.cloud_api.databricks_azure.DBAzureCMDDriver.init_instances_description',
mock_init_instances_description
)
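The Databricks-Azure fixture above is removed because there is no longer a runtime-generated `azure-instances-catalog.json` to intercept. If a test still needed to pin the catalog, one option (a sketch, not part of this commit; the resource path is hypothetical) would be to patch the new hook on the base driver instead:

```python
import pytest
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer

@pytest.fixture
def fixed_instance_catalog(monkeypatch):
    """Point every CMD driver at a checked-in test catalog."""
    def _mock_init(self):
        self.instance_descriptions = JSONPropertiesContainer(
            'tests/resources/test-instance-catalog.json')  # hypothetical path
    monkeypatch.setattr(
        'spark_rapids_pytools.cloud_api.sp_types.CMDDriverBase.init_instance_descriptions',
        _mock_init)
```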
