Add plugin mechanism for dataset-specific preprocessing in qualx #1148

Merged 5 commits on Jun 28, 2024
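In short (see the diffs below): a dataset definition can now carry a load_profiles_hook entry pointing at a Python file. load_profiles collects these paths, loads each one through the new util.load_plugin helper, and passes the combined profile DataFrame through the plugin's load_profiles_hook function. A dataset that opts in might look roughly like the following sketch; the dataset name and paths are hypothetical, the eventlogs key is assumed from the surrounding code, and the other keys mirror what load_profiles reads:

# Hypothetical dataset entry as consumed by load_profiles in preprocess.py (illustrative only).
datasets = {
    'my_dataset': {
        'eventlogs': ['/data/my_dataset/eventlogs'],             # assumed key and location
        'platform': 'onprem',                                    # default used by load_profiles
        'load_profiles_hook': '/path/to/my_dataset_plugin.py',   # new in this PR
    },
}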
12 changes: 6 additions & 6 deletions user_tools/src/spark_rapids_tools/tools/qualx/model.py
@@ -77,7 +77,7 @@ def train(
drop=True
)
if cpu_aug_tbl.shape[0] < original_num_rows:
logger.warn(
logger.warning(
f'Removed {original_num_rows - cpu_aug_tbl.shape[0]} rows with NaN label values'
)

@@ -142,7 +142,7 @@ def predict(
if missing:
raise ValueError(f'Input is missing model features: {missing}')
if extra:
logger.warn(f'Input had extra features not present in model: {extra}')
logger.warning(f'Input had extra features not present in model: {extra}')

X = cpu_aug_tbl[model_features]
y = cpu_aug_tbl[label_col] if label_col else None
@@ -212,7 +212,7 @@ def extract_model_features(
"""Extract model features from raw features."""
missing = expected_raw_features - set(df.columns)
if missing:
logger.warn(f'Input dataframe is missing expected raw features: {missing}')
logger.warning(f'Input dataframe is missing expected raw features: {missing}')

if FILTER_SPILLS:
df = df[
@@ -234,7 +234,7 @@
gpu_aug_tbl = df[df['runType'] == 'GPU']
if gpu_aug_tbl.shape[0] > 0:
if gpu_aug_tbl.shape[0] != cpu_aug_tbl.shape[0]:
logger.warn(
logger.warning(
'Number of GPU rows ({}) does not match number of CPU rows ({})'.format(
gpu_aug_tbl.shape[0], cpu_aug_tbl.shape[0]
)
@@ -262,7 +262,7 @@ def extract_model_features(
if (
num_na / num_rows > 0.05
): # arbitrary threshold, misaligned sqlIDs still may 'match' most of the time
logger.warn(
logger.warning(
f'Percentage of NaN GPU durations is high: {num_na} / {num_rows} Per-sql actual speedups may be inaccurate.'
)

@@ -299,7 +299,7 @@ def extract_model_features(
raise ValueError(f'Input data is missing model features: {missing}')
if extra:
# remove extra columns
logger.warn(f'Input data has extra features (removed): {extra}')
logger.warning(f'Input data has extra features (removed): {extra}')
feature_cols = [c for c in feature_cols if c not in extra]

# add train/val/test split column, if split function provided
58 changes: 38 additions & 20 deletions user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py
@@ -27,6 +27,7 @@
get_cache_dir,
get_logger,
get_dataset_platforms,
load_plugin,
run_profiler_tool, log_fallback,
)

@@ -224,9 +224,12 @@ def load_datasets(
)
profile_df = pd.read_parquet(f'{platform_cache}/{PREPROCESSED_FILE}')
if ignore_test:
# remove any 'test' datasets from preprocessed data
# remove any 'test' datasets from cached data by filtering
# only appNames found in datasets structure
dataset_keys = list(datasets.keys())
profile_df = profile_df.loc[profile_df['appName'].isin(dataset_keys)]
profile_df['appName_base'] = profile_df['appName'].str.split(':').str[0]
profile_df = profile_df.loc[profile_df['appName_base'].isin(dataset_keys)]
profile_df.drop(columns='appName_base', inplace=True)
else:
# otherwise, check for cached profiler output
profile_dir = f'{platform_cache}/profile'
@@ -253,7 +257,7 @@

# sanity check
if ds_count != len(all_datasets):
logger.warn(
logger.warning(
f'Duplicate dataset key detected, got {len(all_datasets)} datasets, but read {ds_count} datasets.'
)

@@ -270,28 +274,32 @@ def load_profiles(
"""Load dataset profiler CSV files as a pd.DataFrame."""

def infer_app_meta(eventlogs: List[str]) -> Mapping[str, Mapping]:
"""Given a list of paths to eventlogs, infer the app_meta from the path for each appId."""
eventlog_list = [find_eventlogs(os.path.expandvars(e)) for e in eventlogs]
eventlog_list = list(chain(*eventlog_list))
app_meta = {}
for e in eventlog_list:
parts = Path(e).parts
appId = parts[-1]
runType = parts[-2].upper()
description = parts[-4]
jobName = parts[-4]
app_meta[appId] = {
'jobName': jobName,
'runType': runType,
'description': description,
'scaleFactor': 1,
}
return app_meta
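# Illustrative note (not in the original source): the indexing above implies an eventlog
# layout of .../<jobName>/<anything>/<runType>/<appId>, for example
#   /data/my_job/eventlogs/CPU/application_1718000000000_0001
# so parts[-4] supplies both jobName and description, parts[-2] the runType, and parts[-1] the appId.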

plugins = []
all_raw_features = []
# get list of csv files from each profile
for ds_name, ds_meta in datasets.items():
toc_list = []
app_meta = ds_meta.get('app_meta', None)
platform = ds_meta.get('platform', 'onprem')
scalefactor_meta = ds_meta.get('scaleFactorFromSqlIDRank', None)
if 'load_profiles_hook' in ds_meta:
plugins.append(ds_meta['load_profiles_hook'])

if not app_meta:
# if no 'app_meta' key provided, infer app_meta from directory structure of eventlogs
@@ -358,29 +366,39 @@ def infer_app_meta(eventlogs: List[str]) -> Mapping[str, Mapping]:
app_scales = toc[['appId', 'scaleFactor']].drop_duplicates()
raw_features = raw_features.merge(app_scales, on='appId')

# override description from app_meta (if available)
if 'description' in app_meta[list(app_meta.keys())[0]]:
app_desc = {
appId: meta['description']
# add jobName to appName from app_meta (if available)
if 'jobName' in app_meta[list(app_meta.keys())[0]]:
app_job = {
appId: meta['jobName']
for appId, meta in app_meta.items()
if 'description' in meta
if 'jobName' in meta
}
raw_features['description'] = raw_features['appId'].map(app_desc)
# append also to appName to allow joining cpu and gpu logs at the app level
raw_features['jobName'] = raw_features['appId'].map(app_job)
# append jobName to appName to allow joining cpu and gpu logs at the app level
raw_features['appName'] = (
raw_features['appName'] + '_' + raw_features['description']
raw_features['appName'] + ':' + raw_features['jobName']
)
raw_features.drop(columns=['jobName'], inplace=True)

# add platform from app_meta
raw_features[f'platform_{platform}'] = 1
raw_features = impute(raw_features)
all_raw_features.append(raw_features)
return (

profile_df = (
pd.concat(all_raw_features).reset_index(drop=True)
if all_raw_features
else pd.DataFrame()
)

# run any plugin hooks on profile_df
for p in plugins:
plugin = load_plugin(p)
if plugin:
profile_df = plugin.load_profiles_hook(profile_df)

return profile_df


def extract_raw_features(
toc: pd.DataFrame,
@@ -715,15 +733,15 @@ def impute(full_tbl: pd.DataFrame) -> pd.DataFrame:
missing = sorted(expected_raw_features - actual_features)
extra = sorted(actual_features - expected_raw_features)
if missing:
logger.warn(f'Imputing missing features: {missing}')
logger.warning(f'Imputing missing features: {missing}')
for col in missing:
if col != 'fraction_supported':
full_tbl[col] = 0
else:
full_tbl[col] = 1.0

if extra:
logger.warn(f'Removing extra features: {extra}')
logger.warning(f'Removing extra features: {extra}')
full_tbl = full_tbl.drop(columns=extra)

# one last check after modifications (update expected_raw_features if needed)
@@ -757,7 +775,7 @@ def scan_tbl(
)
except Exception:
if warn_on_error or abort_on_error:
logger.warn(f'Failed to load {tb_name} for {app_id}.')
logger.warning(f'Failed to load {tb_name} for {app_id}.')
if abort_on_error:
raise ScanTblError()
out = pd.DataFrame()
@@ -993,7 +1011,7 @@ def scan_tbl(
)

if sqls_to_drop:
logger.warn(
logger.warning(
f'Ignoring sqlIDs {sqls_to_drop} due to excessive failed/cancelled stage duration.'
)

@@ -1074,12 +1092,12 @@ def scan_tbl(
aborted_sql_ids = set()

if aborted_sql_ids:
logger.warn(f'Ignoring sqlIDs {aborted_sql_ids} due to aborted jobs.')
logger.warning(f'Ignoring sqlIDs {aborted_sql_ids} due to aborted jobs.')

sqls_to_drop = sqls_to_drop.union(aborted_sql_ids)

if sqls_to_drop:
logger.warn(
logger.warning(
f'Ignoring a total of {len(sqls_to_drop)} sqlIDs due to stage/job failures.'
)
app_info_mg = app_info_mg.loc[~app_info_mg.sqlID.isin(sqls_to_drop)]
14 changes: 7 additions & 7 deletions user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py
@@ -248,7 +248,7 @@ def _read_dataset_scores(
nan_df['model'] + '/' + nan_df['platform'] + '/' + nan_df['dataset']
)
keys = list(nan_df['key'].unique())
logger.warn(f'Dropped rows w/ NaN values from: {eval_dir}: {keys}')
logger.warning(f'Dropped rows w/ NaN values from: {eval_dir}: {keys}')

return df

@@ -298,7 +298,7 @@ def _read_platform_scores(
nan_df['model'] + '/' + nan_df['platform'] + '/' + nan_df['dataset']
)
keys = list(nan_df['key'].unique())
logger.warn(f'Dropped rows w/ NaN values from: {eval_dir}: {keys}')
logger.warning(f'Dropped rows w/ NaN values from: {eval_dir}: {keys}')

# compute accuracy by platform
scores = {}
@@ -395,8 +395,8 @@ def train(

# sanity check
if set(dataset_list) != set(profile_datasets):
logger.error(
'Training data contained datasets: {profile_datasets}, expected: {dataset_list}.'
logger.warning(
f'Training data contained datasets: {profile_datasets}, expected: {dataset_list}.'
)

features, feature_cols, label_col = extract_model_features(profile_df, split_nds)
@@ -448,7 +448,7 @@ def predict(
)
appIds = [Path(p).name for p in appIds]
if len(appIds) == 0:
logger.warn(f'Skipping empty metrics directory: {metrics_dir}')
logger.warning(f'Skipping empty metrics directory: {metrics_dir}')
else:
try:
for appId in appIds:
@@ -522,7 +522,7 @@ def predict(
logger.error(e)
traceback.print_exc(e)
else:
logger.warn(f'Predicted speedup will be 1.0 for dataset: {dataset}. Check logs for details.')
logger.warning(f'Predicted speedup will be 1.0 for dataset: {dataset}. Check logs for details.')
# TODO: Writing CSV reports for all datasets to the same location. We should write to separate directories.
write_csv_reports(per_sql_summary, per_app_summary, output_info)
dataset_summaries.append(per_app_summary)
@@ -932,7 +932,7 @@ def compare(
# warn user of any new datasets
added = curr_datasets - prev_datasets
if added:
logger.warn(f'New datasets added, comparisons may be skewed: added={added}')
logger.warning(f'New datasets added, comparisons may be skewed: added={added}')


def entrypoint():
26 changes: 25 additions & 1 deletion user_tools/src/spark_rapids_tools/tools/qualx/util.py
@@ -14,6 +14,7 @@

from concurrent.futures import ThreadPoolExecutor, as_completed
import glob
import importlib
import logging
import os
import re
@@ -23,6 +24,7 @@
import subprocess
import numpy as np
import pandas as pd
import types
from datetime import datetime, timezone
from pathlib import Path
from tabulate import tabulate
@@ -185,6 +187,28 @@ def compute_accuracy(
return scores


def load_plugin(plugin_path: str) -> types.ModuleType:
"""Dynamically load plugin modules with helper functions for dataset-specific code.

Supported APIs:

def load_profiles_hook(df: pd.DataFrame) -> pd.DataFrame:
# add dataset-specific modifications
return df
"""
plugin_path = os.path.expandvars(plugin_path)
plugin_name = Path(plugin_path).name.split('.')[0]
if os.path.exists(plugin_path):
spec = importlib.util.spec_from_file_location(plugin_name, plugin_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
logger.info(f'Successfully loaded plugin: {plugin_path}')
return module
else:
logger.warning(f'Failed to load plugin: {plugin_path}')
return None


def random_string(length: int) -> str:
"""Return a random hexadecimal string of a specified length."""
return ''.join(secrets.choice(string.hexdigits) for _ in range(length))
@@ -210,7 +234,7 @@ def run_profiler_tool(platform: str, eventlog: str, output_dir: str):
# f'spark_rapids_user_tools {platform} profiling --csv --eventlogs {log} --local_folder {output}'
'java -Xmx64g -cp $SPARK_RAPIDS_TOOLS_JAR:$SPARK_HOME/jars/*:$SPARK_HOME/assembly/target/scala-2.12/jars/* '
'com.nvidia.spark.rapids.tool.profiling.ProfileMain '
f'--platform {platform} --csv -o {output} {log}'
f'--platform {platform} --csv --output-sql-ids-aligned -o {output} {log}'
)
cmds.append(cmd)
run_commands(cmds)
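A minimal plugin module satisfying the load_profiles_hook API documented in load_plugin above might look like this sketch; the file name and the appDuration filter are illustrative assumptions, not part of this PR:

# my_dataset_plugin.py -- hypothetical example
import pandas as pd

def load_profiles_hook(df: pd.DataFrame) -> pd.DataFrame:
    # Dataset-specific preprocessing; this particular filter is purely illustrative.
    return df[df['appDuration'] > 0].reset_index(drop=True)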