Follow-up 1318: Fix QualX fallback with default speedup and duration columns #1330

Merged
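In brief: when the QualX prediction model fails or returns no rows, the qualification tool now keeps every application row and fills the prediction columns with neutral defaults instead of dropping the apps. A minimal pandas sketch of that fallback (column names come from the diff below; the sample data is hypothetical):

import pandas as pd

# Hypothetical qualification output: two apps with their CPU durations.
all_apps = pd.DataFrame({
    'App ID': ['app-001', 'app-002'],
    'App Duration': [120000, 45000],
})

# Fallback path: the prediction model failed or produced an empty DataFrame.
result_df = all_apps.copy()
result_df['Estimated GPU Speedup'] = 1.0                         # neutral speedup
result_df['Estimated GPU Duration'] = result_df['App Duration']  # duration unchanged
print(result_df)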
user_tools/src/spark_rapids_pytools/rapids/qualification.py (61 changes: 37 additions & 24 deletions)
@@ -370,8 +370,12 @@ def create_stdout_table_pprinter(total_apps: pd.DataFrame,
             df = self.__update_apps_with_prediction_info(df,
                                                          self.ctxt.get_ctxt('estimationModelArgs'))
         except Exception as e:  # pylint: disable=broad-except
-            self.logger.error('Unable to use XGBoost estimation model for speed ups. '
-                              'Falling-back to default model. Reason - %s:%s', type(e).__name__, e)
+            # If an error occurs while updating the apps with prediction info (speedups and durations),
+            # raise an error and stop the execution as the tool cannot continue without this information.
+            raise RuntimeError(
+                'Failed to use XGBoost estimation model for speedups. Qualification tool cannot continue. '
+                f'Reason - {type(e).__name__}: {e}'
+            ) from e
 
         # 2. Operations related to cluster information
         try:
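The raise ... from e form chains the original exception, so the root cause stays visible in the traceback alongside the RuntimeError. A self-contained sketch of the same pattern (the failing function is a hypothetical stand-in):

def update_apps_with_prediction_info():
    raise KeyError('speedup')  # stand-in for a failure inside the estimation model

try:
    update_apps_with_prediction_info()
except Exception as e:  # broad catch, mirroring the tool's handler
    raise RuntimeError(
        'Failed to use XGBoost estimation model for speedups. Qualification tool cannot continue. '
        f'Reason - {type(e).__name__}: {e}'
    ) from e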
@@ -512,32 +516,41 @@ def __update_apps_with_prediction_info(self,
         model_name = self.ctxt.platform.get_prediction_model_name()
         qual_output_dir = self.ctxt.get_local('outputFolder')
         output_info = self.__build_prediction_output_files_info()
-        predictions_df = predict(platform=model_name, qual=qual_output_dir,
-                                 output_info=output_info,
-                                 model=estimation_model_args['customModelFile'])
+        try:
+            predictions_df = predict(platform=model_name, qual=qual_output_dir,
+                                     output_info=output_info,
+                                     model=estimation_model_args['customModelFile'])
+        except Exception as e:  # pylint: disable=broad-except
+            predictions_df = pd.DataFrame()
+            self.logger.error(
+                'Failed to execute the prediction model. Using default speed up of 1.0 for all apps. '
+                'Reason - %s:%s', type(e).__name__, e)
 
         if predictions_df.empty:
-            return all_apps
-
-        result_info = self.ctxt.get_value('local', 'output', 'predictionModel', 'updateResult')
-        # Merge with a left join to include all rows from all apps and relevant rows from model predictions
-        result_df = pd.merge(all_apps, predictions_df[result_info['subsetColumns']],
-                             how='left', left_on='App ID', right_on='appId')
-        # Update columns in all apps with values from corresponding XGBoost columns.
-        for remap_column in result_info['remapColumns']:
-            src_col, dst_col = remap_column['srcCol'], remap_column['dstCol']
-            if src_col in result_df and dst_col in result_df:
-                result_df[dst_col] = result_df[src_col]
-        result_df.rename(columns={'speedup': 'Estimated GPU Speedup'},
-                         inplace=True)
-        # if the qualx does not have a speedup value, default to 1.0
-        result_df['Estimated GPU Speedup'].fillna(1.0, inplace=True)
-        # if the qualx does not have a speedup value, default to App Duration
-        result_df['Estimated GPU Duration'].fillna(result_df['App Duration'], inplace=True)
+            result_df = all_apps.copy()
+            # If the prediction model fails, set the estimated GPU speedup to 1.0 and the estimated GPU duration to
+            # the app duration.
+            result_df['Estimated GPU Speedup'] = 1.0
+            result_df['Estimated GPU Duration'] = result_df['App Duration']
+        else:
+            result_info = self.ctxt.get_value('local', 'output', 'predictionModel', 'updateResult')
+            # Merge with a left join to include all rows from all apps and relevant rows from model predictions
+            result_df = pd.merge(all_apps, predictions_df[result_info['subsetColumns']],
+                                 how='left', left_on='App ID', right_on='appId')
+            # Replace columns in all apps with values from corresponding XGBoost columns.
+            for remap_column in result_info['remapColumns']:
+                src_col, dst_col = remap_column['srcCol'], remap_column['dstCol']
+                # Drop the dest column if it exists
+                result_df.drop(columns=dst_col, errors='ignore', inplace=True)
+                # Rename the source column to the destination column
+                result_df.rename(columns={src_col: dst_col}, errors='ignore', inplace=True)
+            # if the qualx does not have a speedup value, default to 1.0
+            result_df['Estimated GPU Speedup'].fillna(1.0, inplace=True)
+            # if the qualx does not have a duration value, default to App Duration
+            result_df['Estimated GPU Duration'].fillna(result_df['App Duration'], inplace=True)
+            # We need to be careful about other columns that depend on remapped columns
+            result_df['Estimated GPU Time Saved'] = result_df['App Duration'] - result_df['Estimated GPU Duration']
+            # drop the subset_cols and ignore the errors in case some columns within the subset got renamed.
+            return result_df.drop(columns=result_info['subsetColumns'], errors='ignore')
+        return result_df
 
     def _write_app_metadata(self, tools_processed_apps: pd.DataFrame,
                             metadata_file_info: dict, config_recommendations_dir_info: dict) -> None:
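The drop-then-rename remap is the heart of the fix: it guarantees the prediction columns replace, rather than sit alongside, any same-named defaults in all_apps, and the fillna calls cover apps the model could not score. A standalone sketch of the same flow (sample data is hypothetical; the column pairs mirror the conf change below):

import pandas as pd

all_apps = pd.DataFrame({'App ID': ['a', 'b'], 'App Duration': [100.0, 200.0]})
# Predictions cover only app 'a'.
predictions_df = pd.DataFrame({'appId': ['a'], 'speedup': [2.5], 'appDuration_pred': [40.0]})

result_df = pd.merge(all_apps, predictions_df, how='left', left_on='App ID', right_on='appId')
for src_col, dst_col in [('speedup', 'Estimated GPU Speedup'),
                         ('appDuration_pred', 'Estimated GPU Duration')]:
    result_df.drop(columns=dst_col, errors='ignore', inplace=True)  # drop any stale destination column
    result_df.rename(columns={src_col: dst_col}, inplace=True)      # promote the prediction column
result_df['Estimated GPU Speedup'].fillna(1.0, inplace=True)        # app 'b' falls back to 1.0
result_df['Estimated GPU Duration'].fillna(result_df['App Duration'], inplace=True)
result_df['Estimated GPU Time Saved'] = result_df['App Duration'] - result_df['Estimated GPU Duration']
print(result_df.drop(columns=['appId'], errors='ignore'))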
(qualification configuration YAML; file path not shown in this diff view)
@@ -310,6 +310,8 @@ local:
       - 'speedup'
       - 'appDuration_pred'
     remapColumns:
+      - srcCol: 'speedup'
+        dstCol: 'Estimated GPU Speedup'
       - srcCol: 'appDuration_pred'
         dstCol: 'Estimated GPU Duration'
     clusterInference:
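Each remapColumns entry drives one iteration of the remap loop in qualification.py. A hedged sketch of how such a mapping could be consumed (this loader is illustrative, not the tool's actual config reader; requires PyYAML):

import yaml

conf = yaml.safe_load('''
remapColumns:
  - srcCol: 'speedup'
    dstCol: 'Estimated GPU Speedup'
  - srcCol: 'appDuration_pred'
    dstCol: 'Estimated GPU Duration'
''')
for remap_column in conf['remapColumns']:
    print(remap_column['srcCol'], '->', remap_column['dstCol'])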
user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py (30 changes: 11 additions & 19 deletions)
@@ -33,8 +33,7 @@
     load_profiles,
     load_qtool_execs,
     load_qual_csv,
-    PREPROCESSED_FILE,
-    ScanTblError
+    PREPROCESSED_FILE
 )
 from spark_rapids_tools.tools.qualx.model import (
     extract_model_features,
@@ -573,24 +572,17 @@ def predict(
         'platform': platform,
     }
 
-    profile_df = pd.DataFrame()
-    try:
-        logger.info('Loading dataset: %s', dataset_name)
-        profile_df = load_profiles(
-            datasets=datasets,
-            node_level_supp=node_level_supp,
-            qual_tool_filter=qual_tool_filter,
-            qual_tool_output=qual_tool_output
-        )
-        # reset appName to original
-        profile_df['appName'] = profile_df['appId'].map(app_id_name_map)
-    except ScanTblError:
-        # ignore
-        logger.error('Skipping invalid dataset: %s', dataset_name)
-
+    logger.info('Loading dataset: %s', dataset_name)
+    profile_df = load_profiles(
+        datasets=datasets,
+        node_level_supp=node_level_supp,
+        qual_tool_filter=qual_tool_filter,
+        qual_tool_output=qual_tool_output
+    )
     if profile_df.empty:
         # this is an error condition, and we should not fall back to the default predictions.
-        raise ValueError('Data preprocessing resulted in an empty dataset. Speedup predictions will be skipped.')
+        raise ValueError('Data preprocessing resulted in an empty dataset. Speedup predictions will default to 1.0.')
+    # reset appName to original
+    profile_df['appName'] = profile_df['appId'].map(app_id_name_map)
 
     filter_str = (
         f'with {qual_tool_filter} filtering'
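With ScanTblError no longer swallowed, an empty preprocessed dataset surfaces as a ValueError that the qualification layer catches and converts into the 1.0 defaults, so a single caller owns the fallback instead of each layer inventing its own. A small sketch of the guard (load_profiles_stub is a hypothetical stand-in):

import pandas as pd

def load_profiles_stub() -> pd.DataFrame:
    return pd.DataFrame()  # simulate preprocessing that yields no rows

profile_df = load_profiles_stub()
if profile_df.empty:
    raise ValueError('Data preprocessing resulted in an empty dataset. '
                     'Speedup predictions will default to 1.0.')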