Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync w/ internal repo; update models #1083

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

16 changes: 14 additions & 2 deletions user_tools/src/spark_rapids_tools/tools/qualx/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
'scaleFactor',
'sparkVersion',
'sqlID',
'xgpu_appDuration',
]
)

Expand Down Expand Up @@ -164,6 +165,8 @@ def predict(
]
if 'split' in cpu_aug_tbl:
select_columns.append('split')
if 'xgpu_appDuration' in cpu_aug_tbl:
select_columns.append('xgpu_appDuration')

# join predictions with select input features
results_df = (
Expand Down Expand Up @@ -230,9 +233,18 @@ def extract_model_features(
)
# train/validation dataset with CPU + GPU runs
gpu_aug_tbl = gpu_aug_tbl[
['appName', 'scaleFactor', 'sqlID', 'Duration', 'description']
[
'appName',
'scaleFactor',
'sqlID',
'Duration',
'description',
'appDuration',
]
]
gpu_aug_tbl = gpu_aug_tbl.rename(columns={'Duration': 'xgpu_Duration'})
gpu_aug_tbl = gpu_aug_tbl.rename(
columns={'Duration': 'xgpu_Duration', 'appDuration': 'xgpu_appDuration'}
)
cpu_aug_tbl = cpu_aug_tbl.merge(
gpu_aug_tbl,
on=['appName', 'scaleFactor', 'sqlID', 'description'],
Expand Down
31 changes: 19 additions & 12 deletions user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ def infer_app_meta(eventlogs: List[str]) -> Mapping[str, Mapping]:
)
toc_list.append(tmp)

if toc_list:
if not toc_list:
raise ValueError(f'No CSV files found for: {ds_name}')
else:
Comment on lines +333 to +335
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's set the standard:

  • raising an error: it means something is really bad. In that case, do we fall back to legacy Speedup?
  • Return a default DF with speedup 1.0: this is the case when QualX says that there is no speedup because there are certain features or files/columns that it cannot read or that are empty. Then the speedup is 1.0.

For the case when it is "not toc_list", do we want to raise an error? or return 1.0 after logging that files were missing?
CC: @eordentlich and @parthosa

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amahussein this should be a very rare case. Basically, the user has requested to load CSV files for a dataset, and we have found none at all for the entire dataset. Previously, we would just skip this dataset (failing silently), so added this code to raise it to the user's attention. Presumably, the legacy Speedup would also fail in this case (or would it just return 1.0?).

toc = pd.concat(toc_list)
raw_features = extract_raw_features(toc, node_level_supp, qualtool_filter)
if raw_features.empty:
Expand Down Expand Up @@ -415,7 +417,7 @@ def combine_tables(table_name: str) -> pd.DataFrame:

# normalize WholeStageCodegen labels
ops_tbl.loc[
ops_tbl['nodeName'].str.startswith('WholeStageCodegen') == True, 'nodeName'
ops_tbl['nodeName'].str.startswith('WholeStageCodegen'), 'nodeName'
] = 'WholeStageCodegen'

# format WholeStageCodegen for merging
Expand Down Expand Up @@ -449,7 +451,7 @@ def combine_tables(table_name: str) -> pd.DataFrame:
]
for op in dynamic_op_labels:
sql_ops_counter.loc[
sql_ops_counter['nodeName'].str.startswith(op) == True, 'nodeName'
sql_ops_counter['nodeName'].str.startswith(op), 'nodeName'
] = op

# count occurrences
Expand Down Expand Up @@ -533,7 +535,7 @@ def combine_tables(table_name: str) -> pd.DataFrame:

# filter rows w/ sqlID
sql_job_agg_tbl['hasSqlID'] = sql_job_agg_tbl['sqlID'] >= 0
full_tbl = sql_job_agg_tbl[sql_job_agg_tbl['hasSqlID'] == True]
full_tbl = sql_job_agg_tbl[sql_job_agg_tbl['hasSqlID']]

# add runType features from toc
app_runtype = toc[['appId', 'runType']].drop_duplicates()
Expand Down Expand Up @@ -929,21 +931,22 @@ def scan_tbl(
sql_to_stage, left_on='ID', right_on='stageId'
)
total_stage_time = (
stage_agg_tbl[['sqlID', 'ID', 'Duration']]
stage_agg_tbl[['sqlID', 'Duration']]
.groupby('sqlID')
.agg('sum')
.reset_index()
)
failed_stage_time = (
stage_agg_tbl[['sqlID', 'ID', 'Duration']]
.merge(failed_stages, left_on='ID', right_on='stageId', how='inner')
.merge(failed_stages, left_on='ID', right_on='stageId', how='inner')[
['sqlID', 'Duration']
]
.groupby('sqlID')
.agg('sum')
.reset_index()
.drop(columns=['sqlID'])
)
stage_times = total_stage_time.merge(
failed_stage_time, on='ID', how='inner'
failed_stage_time, on='sqlID', how='inner'
)
stage_times.info()
sqls_to_drop = set(
Expand Down Expand Up @@ -1074,10 +1077,14 @@ def load_qtool_execs(qtool_execs: List[str]) -> Optional[pd.DataFrame]:
if qtool_execs:
exec_info = pd.concat([pd.read_csv(f) for f in qtool_execs])
node_level_supp = exec_info.copy()
node_level_supp['Exec Is Supported'] = node_level_supp[
'Exec Is Supported'
] | node_level_supp['Exec Name'].apply(
lambda x: any([x.startswith(nm) for nm in unsupported_overrides])
node_level_supp['Exec Is Supported'] = (
node_level_supp['Exec Is Supported']
| node_level_supp['Exec Name'].apply(
lambda x: any([x.startswith(nm) for nm in unsupported_overrides])
)
| node_level_supp['Exec Name'].apply(
lambda x: x.startswith('WholeStageCodegen')
)
Comment on lines +1080 to +1087
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that change was rolled back..Just double checking.

Copy link
Collaborator Author

@leewyang leewyang Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per @eordentlich's commit to internal repo, this is a workaround for #860.

)
node_level_supp = (
node_level_supp[['App ID', 'SQL ID', 'SQL Node Id', 'Exec Is Supported']]
Expand Down
27 changes: 15 additions & 12 deletions user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import pandas as pd
import traceback
import xgboost as xgb
from importlib_resources import files as package_files
from pathlib import Path
from spark_rapids_tools.tools.qualx.preprocess import (
load_datasets,
Expand Down Expand Up @@ -126,12 +125,14 @@ def _compute_summary(results):
'Duration_supported',
# actual
'gpuDuration',
'xgpu_appDuration',
]
cols = [col for col in result_cols if col in results.columns]
# compute per-app stats
summary = (
results[cols].groupby(['appName', 'appId', 'appDuration']).sum().reset_index()
)
group_by_cols = ['appName', 'appId', 'appDuration']
if 'xgpu_appDuration' in results:
group_by_cols.append('xgpu_appDuration')
summary = results[cols].groupby(group_by_cols).sum().reset_index()
if 'gpuDuration' in summary:
summary['gpuDuration'] = summary['gpuDuration'].replace(0.0, np.nan)

Expand All @@ -153,9 +154,8 @@ def _compute_summary(results):
# for datasets w/ labels, reconstruct actual speedup per-app
# TODO: get this from actual CSV file?
if 'y' in results:
summary['appDuration_actual'] = (
summary['appDuration'] - summary['Duration'] + summary['gpuDuration']
)
assert 'xgpu_appDuration' in summary
summary = summary.rename({'xgpu_appDuration': 'appDuration_actual'}, axis=1)
summary['speedup_actual'] = (
summary['appDuration'] / summary['appDuration_actual']
)
Expand Down Expand Up @@ -199,6 +199,7 @@ def _predict(
if 'y' not in results:
results['y'] = np.nan
results['gpuDuration'] = 0.0
results['xgpu_appDuration'] = 0.0

# compute per-app speedups
summary = _compute_summary(results)
Expand All @@ -222,7 +223,7 @@ def _read_dataset_scores(
eval_dir: str
Path to the output directory of a `qualx evaluate` run.
score: str
Type of metric to report: MAPE, wMAPE.
Type of metric to report.
granularity: str
Aggregation level for metric: sql, app.
split: str
Expand Down Expand Up @@ -263,7 +264,7 @@ def _read_platform_scores(
eval_dir: str
Path to the output directory of a `qualx evaluate` run.
score: str
Type of metric to report: MAPE, wMAPE.
Type of metric to report
split: str
Name of data split to report: train, test, all.
"""
Expand Down Expand Up @@ -321,7 +322,9 @@ def models():
"""Show available pre-trained models."""
available_models = [
model.replace('.json', '')
for model in os.listdir(package_files('qualx').joinpath('models'))
for model in os.listdir(
Path(Utils.resource_path('qualx/models/xgboost'))
)
]
for model in sorted(available_models):
print(model)
Expand Down Expand Up @@ -830,7 +833,7 @@ def compare(
previous: str,
current: str,
*,
score: str = 'wMAPE',
score: str = 'dMAPE',
granularity: str = 'sql',
split: str = 'all',
):
Expand All @@ -845,7 +848,7 @@ def compare(
current: str
Path to current evaluation results directory.
score: str
Type of score to compare: 'MAPE' (default) or 'wMAPE'.
Type of score to compare: 'MAPE', 'wMAPE', or 'dMAPE' (default).
granularity: str
Granularity of score to compare: 'sql' (default) or 'app'.
split: str
Expand Down