Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate summary metadata file and fix node recommendation in python #1216

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 14 additions & 21 deletions user_tools/src/spark_rapids_pytools/common/cluster_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,24 @@ def get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict
# If driver instance is not set, use the default value from platform configurations
if pd.isna(driver_instance):
driver_instance = self.platform.configs.get_value('clusterInference', 'defaultCpuInstances', 'driver')
# first try to read the Recommended setting from the scala tool side and if its not available fall back
# to the CPU inference done on the python side
executor_instance = cluster_info_df.get('Recommended Executor Instance')
if pd.notna(executor_instance):
self.logger.debug('GPU infer cluster executor instance rec is %s', executor_instance)
num_executor_nodes = cluster_info_df.get('Recommended Num Executor Nodes')
else:
num_executor_nodes = cluster_info_df.get('Num Executor Nodes')
executor_instance = cluster_info_df.get('Executor Instance')
num_executor_nodes = cluster_info_df.get('Num Executor Nodes')
executor_instance = cluster_info_df.get('Executor Instance')
if pd.isna(executor_instance):
# If executor instance is not set, use the default value based on the number of cores
cores_per_executor = cluster_info_df.get('Cores Per Executor')
execs_per_node = cluster_info_df.get('Num Executors Per Node')
total_cores_per_node = execs_per_node * cores_per_executor
# TODO - need to account for number of GPUs per executor
executor_instance = self.platform.get_matching_executor_instance(total_cores_per_node)
if pd.isna(executor_instance):
# If executor instance is not set, use the default value based on the number of cores
cores_per_executor = cluster_info_df.get('Cores Per Executor')
execs_per_node = cluster_info_df.get('Num Executors Per Node')
total_cores_per_node = execs_per_node * cores_per_executor
# TODO - need to account for number of GPUs per executor
executor_instance = self.platform.get_matching_executor_instance(total_cores_per_node)
if pd.isna(executor_instance):
self.logger.info('Unable to infer CPU cluster. No matching executor instance found for vCPUs = %s',
total_cores_per_node)
return None
self.logger.info('Unable to infer CPU cluster. No matching executor instance found for vCPUs = %s',
total_cores_per_node)
return None
return {
'DRIVER_INSTANCE': f'"{driver_instance}"',
'NUM_DRIVER_NODES': num_driver_nodes,
'NUM_DRIVER_NODES': int(num_driver_nodes),
'EXECUTOR_INSTANCE': f'"{executor_instance}"',
'NUM_EXECUTOR_NODES': num_executor_nodes
'NUM_EXECUTOR_NODES': int(num_executor_nodes)
}

def infer_cpu_cluster(self, cluster_info_df: pd.DataFrame) -> Optional[ClusterBase]:
Expand Down
168 changes: 43 additions & 125 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

"""Implementation class representing wrapper around the RAPIDS acceleration Qualification tool."""

import json
from dataclasses import dataclass, field
from math import ceil
Expand All @@ -31,6 +30,7 @@
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
from spark_rapids_tools.tools.qualx.qualx_main import predict
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.speedup_category import SpeedupCategory
Expand Down Expand Up @@ -127,41 +127,7 @@ def generate_report(self,
if FSUtil.resource_exists(abs_path): # check if the file exists
report_content.append(f' - {output_comment}: {abs_path}')

full_tunings_file = self.df_result['App ID'] + '.conf'
gpu_tunings_file = self.df_result['App ID'] + '.log'

# check to see if the tuning are actually there, assume if one tuning file is there,
# the other will be as well.
tunings_abs_path = FSUtil.get_abs_path(self.auto_tuning_path)
if FSUtil.resource_exists(tunings_abs_path): # check if the file exists
for index, file in gpu_tunings_file.items():
full_tunings_path = self.auto_tuning_path + '/' + file
abs_path = FSUtil.get_abs_path(full_tunings_path)
if not FSUtil.resource_exists(abs_path): # check if the file exists
gpu_tunings_file.at[index] = "Doesn't exist, see log"
full_tunings_file.at[index] = "Doesn't exist, see log"
else:
full_tunings_file = "Doesn't exist, see the stdout for errors"
gpu_tunings_file = "Doesn't exist, see the stdout for errors"

# 'all' is a special indication that all the applications need to use this same node
# recommendation vs the recommendations being per application
if 'all' in self.conversion_items:
print_result = self.df_result
print_result['Qualified Node Recommendation'] = self.conversion_items['all']
elif not self.conversion_items:
print_result = self.df_result
print_result['Qualified Node Recommendation'] = 'Not Available'
else:
# add the per app node conversions
conversion_column_dict = {'App ID': list(self.conversion_items.keys()),
'Qualified Node Recommendation': list(self.conversion_items.values())}
conversion_df = pd.DataFrame.from_dict(conversion_column_dict)
print_result = pd.merge(self.df_result, conversion_df, on=['App ID'], how='left')

print_result['Full Cluster Config Recommendations*'] = full_tunings_file
print_result['GPU Config Recommendation Breakdown*'] = gpu_tunings_file
pretty_df = df_pprinter(print_result)
pretty_df = df_pprinter(self.df_result)
self.filter_apps_count = len(pretty_df)
if pretty_df.empty:
# the results were reduced to no rows because of the filters
Expand All @@ -176,10 +142,9 @@ def generate_report(self,
report_content.append(f'{app_name} tool found no records to show.')

if self.filter_apps_count > 0:
report_content.append(f'* Config Recommendations can be found in {self.auto_tuning_path}')
report_content.append('** Estimated GPU Speedup Category assumes the user is using the node type '
'recommended and config recommendations with the same size cluster as was used '
'with the CPU side.')
self.comments.append('\'Estimated GPU Speedup Category\' assumes the user is using the node type '
'recommended and config recommendations with the same size cluster as was used '
'with the CPU side.')

report_content.append(Utils.gen_report_sec_header('Report Summary', hrule=False))
report_content.append(tabulate(self.__generate_report_summary(), colalign=('left', 'right')))
Expand Down Expand Up @@ -277,7 +242,6 @@ def _process_gpu_cluster_worker_node():
self.ctxt.set_ctxt('gpuClusterProxy', gpu_cluster_obj)

_process_gpu_cluster_worker_node()
self.__generate_cluster_recommendation_report()
if cpu_cluster and cpu_cluster.is_inferred:
# If the CPU cluster is inferred, we skip the auto-tuner as it is called after the Qualification tool.
return gpu_cluster_obj is not None
Expand Down Expand Up @@ -589,7 +553,7 @@ def __remap_cols_for_shape_type(self,

return subset_data

def __generate_mc_types_conversion_report(self):
def __generate_mc_types_conversion_report(self): # pylint: disable=unused-private-member
report_content = []
if bool(self.ctxt.platform.ctxt['notes']):
# get the converted instance types
Expand Down Expand Up @@ -784,58 +748,15 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
self.logger.error('Error computing cost savings. Reason - %s: %s. Skipping!', type(e).__name__, e)
return app_df_set

def __generate_cluster_recommendation_report(self):
"""
Generate the cluster shape recommendation as:
{
"clusterName": "1234-5678-test",
"sourceCluster": {"driverInstance": "m6gd.xlarge", "executorInstance": "m6gd.2xlarge", "numExecutors": 2 },
"targetCluster": {"driverInstance": "m6gd.xlarge", "executorInstance": "g5.2xlarge", "numExecutors": 2 }
}
"""
cpu_cluster = self.ctxt.get_ctxt('cpuClusterProxy')
gpu_cluster = self.ctxt.get_ctxt('gpuClusterProxy')
if cpu_cluster is None or gpu_cluster is None:
self.logger.warning('Cannot generate the cluster recommendation report because the cluster information is '
'not available.')
else:
try:
cpu_cluster_info = cpu_cluster.get_cluster_configuration()
gpu_cluster_info = gpu_cluster.get_cluster_configuration()
cluster_shape_recommendation = [{
'clusterName': cpu_cluster.get_name(),
'sourceCluster': cpu_cluster_info,
'targetCluster': gpu_cluster_info
}]
self.ctxt.set_ctxt('clusterShapeRecommendation', cluster_shape_recommendation)
except Exception as e: # pylint: disable=broad-except
self.logger.error('Error generating the cluster recommendation report. '
'Reason - %s:%s', type(e).__name__, e)

def __write_cluster_recommendation_report(self, output_file: str):
"""
Write the cluster shape recommendation as a JSON file.
"""
cluster_recommendation = self.ctxt.get_ctxt('clusterShapeRecommendation')
if cluster_recommendation and output_file:
try:
with open(output_file, 'w', encoding='UTF-8') as f:
json.dump(cluster_recommendation, f, indent=2)
except Exception as e: # pylint: disable=broad-except
self.logger.error('Error writing the cluster recommendation report to %s. '
'Reason - %s:%s', output_file, type(e).__name__, e)

def __build_global_report_summary(self,
all_apps: pd.DataFrame,
unsupported_ops_df: pd.DataFrame,
output_files_raw: dict) -> QualificationSummary:
if all_apps.empty:
# No need to run saving estimator or process the data frames.
return QualificationSummary(comments=self.__generate_mc_types_conversion_report())
return QualificationSummary()

output_files_info = JSONPropertiesContainer(output_files_raw, file_load=False)
self.__write_cluster_recommendation_report(output_files_info.get_value('intermediateOutput', 'files',
'clusterShapeRecommendation', 'path'))
unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
'unsupportedOperators'))
# Calculate unsupported operators stage duration before grouping
Expand Down Expand Up @@ -897,50 +818,19 @@ def __build_global_report_summary(self,
self.logger.info('Generating GPU Estimated Speedup: as %s', csv_out)
apps_reshaped_df.to_csv(csv_out, float_format='%.2f')
filter_top_candidate_enabled = self.ctxt.get_ctxt('filterApps') == QualFilterApp.TOP_CANDIDATES

conversion_items_summary = {}
cpu_cluster_info = self.ctxt.get_ctxt('cpuClusterProxy')
gpu_cluster_info = self.ctxt.get_ctxt('gpuClusterProxy')
if cpu_cluster_info:
if cpu_cluster_info is not None and gpu_cluster_info is not None:
cpu_instance_type = cpu_cluster_info.get_worker_node().instance_type
gpu_instance_type = gpu_cluster_info.get_worker_node().instance_type
if cpu_instance_type == gpu_instance_type:
conversion_items_summary['all'] = cpu_instance_type
else:
conversion_items_summary['all'] = cpu_instance_type + ' to ' + gpu_instance_type
else:
conversion_items_summary['all'] = cpu_cluster_info.get_worker_node().instance_type

gpu_cluster_info_per_app = self.ctxt.get_ctxt('gpuClusterInfoPerApp')
cpu_cluster_info_per_app = self.ctxt.get_ctxt('cpuClusterInfoPerApp')
if cpu_cluster_info_per_app is not None:
for app_id in cpu_cluster_info_per_app:
cpu_cluster_info = cpu_cluster_info_per_app[app_id]
gpu_cluster_info = gpu_cluster_info_per_app[app_id]
if cpu_cluster_info is not None and gpu_cluster_info is not None:
cpu_instance_type = cpu_cluster_info.get_worker_node().instance_type
gpu_instance_type = gpu_cluster_info.get_worker_node().instance_type
if cpu_instance_type == gpu_instance_type:
conversion_items_summary[app_id] = cpu_instance_type
else:
conversion_items_summary[app_id] = cpu_instance_type + ' to '\
+ gpu_instance_type

rapids_output_dir = self.ctxt.get_rapids_output_folder()
tunings_dir = FSUtil.build_path(rapids_output_dir,
self.ctxt.get_value('toolOutput', 'csv', 'tunings', 'subFolder'))

# Add columns for cluster configuration recommendations and tuning configurations to the processed_apps.
recommender = ClusterConfigRecommender(self.ctxt)
df_final_result = recommender.add_cluster_and_tuning_recommendations(df_final_result)
# Write the summary metadata
self._write_summary_metadata(df_final_result, output_files_info.get_value('summaryMetadata'),
output_files_info.get_value('configRecommendations'))
return QualificationSummary(comments=report_comments,
all_apps=apps_grouped_df,
recommended_apps=recommended_apps,
savings_report_flag=launch_savings_calc,
df_result=df_final_result,
irrelevant_speedups=speedups_irrelevant_flag,
sections_generators=[self.__generate_mc_types_conversion_report],
top_candidates_flag=filter_top_candidate_enabled,
conversion_items=conversion_items_summary,
auto_tuning_path=tunings_dir)
top_candidates_flag=filter_top_candidate_enabled)

def _process_output(self):
def process_df_for_stdout(raw_df):
Expand Down Expand Up @@ -1137,8 +1027,9 @@ def __infer_cluster_for_auto_tuning(self, cluster_info_df: pd.DataFrame):
# TODO - test executor instance picked up if there
# Infer the CPU cluster from the cluster information
cpu_cluster_obj = ClusterInference(platform=self.ctxt.platform).infer_cpu_cluster(single_cluster_df)
# Continue cluster inference for next app
if cpu_cluster_obj is None:
return
continue
cpu_cluster_dict[row['App ID']] = cpu_cluster_obj
# Log the inferred cluster information and set the context
self._log_inferred_cluster_info(cpu_cluster_obj)
Expand Down Expand Up @@ -1227,6 +1118,33 @@ def __update_apps_with_prediction_info(self,
result_df['Estimated GPU Time Saved'] = result_df['App Duration'] - result_df['Estimated GPU Duration']
return result_df.drop(columns=result_info['subsetColumns'])

def _write_summary_metadata(self, tools_processed_apps: pd.DataFrame,
metadata_file_info: dict, config_recommendations_dir_info: dict) -> None:
"""
Write the summary metadata to a JSON file.
:param tools_processed_apps: Processed applications from tools
:param metadata_file_info: Metadata file information
:param config_recommendations_dir_info: Configuration recommendations directory information
"""
summary_metadata_df = tools_processed_apps[metadata_file_info.get('columns')].copy()
if not summary_metadata_df.empty:
try:
# 1. prepend parent dir to the config recommendations columns (only for the JSON file, not stdout)
parent_dir = config_recommendations_dir_info.get('path')
for col in config_recommendations_dir_info.get('columns'):
if col in summary_metadata_df.columns:
summary_metadata_df[col] = summary_metadata_df[col].apply(
lambda conf_file: FSUtil.build_path(parent_dir, conf_file))

# 2. convert column names to camel case for JSON file writing
summary_metadata_df.rename(columns=Utilities.convert_to_camel_case, inplace=True)
summary_metadata_dict = summary_metadata_df.to_dict(orient='records')
with open(metadata_file_info.get('path'), 'w', encoding='UTF-8') as f:
json.dump(summary_metadata_dict, f, indent=2)
except Exception as e: # pylint: disable=broad-except
self.logger.error('Error writing the summary metadata report. Reason - %s:%s',
type(e).__name__, e)


@dataclass
class QualificationAsLocal(Qualification):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,22 @@ local:
heuristics:
name: 'heuristics_info.csv'
outputComment: "Heuristics information"
clusterShapeRecommendation:
name: cluster_shape_recommendation.json
outputComment: "Cluster shape recommendation report"
summaryMetadata:
name: qualification_summary_metadata.json
outputComment: "Metadata for the summary report"
columns:
- 'App ID'
- 'App Name'
- 'Recommended Cluster'
- 'Estimated GPU Speedup Category'
- 'Full Cluster Config Recommendations*'
- 'GPU Config Recommendation Breakdown*'
configRecommendations:
name: 'rapids_4_spark_qualification_output/tuning'
outputComment: "Config Recommendations"
columns:
- 'Full Cluster Config Recommendations*'
- 'GPU Config Recommendation Breakdown*'
costColumns:
- 'Savings Based Recommendation'
- 'Estimated App Cost'
Expand Down
Loading