Add heuristics using stage spill metrics to skip apps #1002

Merged
47 changes: 32 additions & 15 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -24,13 +24,15 @@

from spark_rapids_pytools.cloud_api.sp_types import ClusterReshape, NodeHWInfo, SparkNodeType
from spark_rapids_pytools.common.cluster_inference import ClusterInference
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel
from spark_rapids_tools.tools.model_xgboost import predict
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.speedup_category import SpeedupCategory
from spark_rapids_tools.tools.top_candidates import TopCandidates
from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
@@ -441,9 +443,13 @@ def __group_apps_by_name(self, all_apps) -> (pd.DataFrame, str):
all_apps_count = len(all_apps)

group_info = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'groupColumns')
for group_col in group_info['columns']:
valid_group_cols = Utilities.get_valid_df_columns(group_info['keys'], all_apps)
all_apps[group_col] = all_apps.groupby(valid_group_cols)[group_col].transform('mean')
valid_group_cols = Utilities.get_valid_df_columns(group_info['keys'], all_apps)
for agg_info in group_info['aggregate']:
agg_col = agg_info['column']
if agg_col in all_apps.columns:
# Group by columns can contain NaN values, so we need to include them in the grouping
all_apps[agg_col] = all_apps.groupby(valid_group_cols, dropna=False)[agg_col].transform(
agg_info['function'])

drop_arr = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'dropDuplicates')
valid_drop_cols = Utilities.get_valid_df_columns(drop_arr, all_apps)
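For context on the new grouping logic, here is a minimal pandas sketch (hypothetical data, not taken from the PR) of how dropna=False grouping combines with per-column aggregation functions before duplicates are dropped:

import pandas as pd

# Two attempts of the same app: durations are averaged, the heuristics flag is OR-ed.
apps = pd.DataFrame({
    'App Name': ['etl_job', 'etl_job'],
    'App Duration': [100.0, 140.0],
    'Skip by Heuristics': [False, True],
})
group_cols = ['App Name']
# dropna=False keeps groups whose key columns contain NaN instead of silently dropping them.
apps['App Duration'] = apps.groupby(group_cols, dropna=False)['App Duration'].transform('mean')
apps['Skip by Heuristics'] = apps.groupby(group_cols, dropna=False)['Skip by Heuristics'].transform('any')
print(apps.drop_duplicates(subset=group_cols))
#   App Name  App Duration  Skip by Heuristics
# 0  etl_job         120.0                True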
@@ -686,22 +692,29 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
def __build_global_report_summary(self,
all_apps: pd.DataFrame,
unsupported_ops_df: pd.DataFrame,
output_files_info: dict) -> QualificationSummary:
output_files_raw: dict) -> QualificationSummary:
if all_apps.empty:
# No need to run saving estimator or process the data frames.
return QualificationSummary(comments=self.__generate_mc_types_conversion_report())

output_files_info = JSONPropertiesContainer(output_files_raw, file_load=False)
unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
'unsupportedOperators'))
# Calculate unsupported operators stage duration before grouping
all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
apps_pruned_df = self.__remap_columns_and_prune(all_apps)
# Apply additional heuristics to skip apps not suitable for GPU acceleration
heuristics_ob = AdditionalHeuristics(
props=self.ctxt.get_value('local', 'output', 'additionalHeuristics'),
tools_output_dir=self.ctxt.get_local('outputFolder'),
output_file=output_files_info.get_value('intermediateOutput', 'files', 'heuristics', 'path'))
apps_pruned_df = heuristics_ob.apply_heuristics(apps_pruned_df)
speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories'))
# Calculate the speedup category column
apps_pruned_df = speedup_category_ob.build_category_column(apps_pruned_df)
apps_pruned_df.to_csv(output_files_info['full']['path'], float_format='%.2f')
# Calculate the speedup category column; send a copy of the dataframe to avoid modifying the original
apps_pruned_result = speedup_category_ob.build_category_column(apps_pruned_df.copy())
apps_pruned_result.to_csv(output_files_info.get_value('full', 'path'), float_format='%.2f')
# Group the applications and recalculate metrics
apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df)
# Recalculate the speedup category column after grouping
apps_grouped_df = speedup_category_ob.build_category_column(apps_grouped_df)
recommended_apps = self.__get_recommended_apps(apps_grouped_df)
# if the gpu_reshape_type is set to JOB, then we should ignore recommended apps
@@ -726,7 +739,7 @@ def __build_global_report_summary(self,
'clusterShapeCols', 'columnName')
speed_recommendation_col = self.ctxt.get_value('local', 'output', 'speedupRecommendColumn')
apps_reshaped_df, per_row_flag = self.__apply_gpu_cluster_reshape(apps_grouped_df)
csv_out = output_files_info['summary']['path']
csv_out = output_files_info.get_value('summary', 'path')
if launch_savings_calc:
# Now, the dataframe is ready to calculate the cost and the savings
apps_working_set = self.__calc_apps_cost(apps_reshaped_df,
@@ -950,15 +963,19 @@ def __build_prediction_output_files_info(self) -> dict:
FSUtil.make_dirs(output_dir)
return self.__update_files_info_with_paths(predictions_info['files'], output_dir)

@staticmethod
def __update_files_info_with_paths(files_info: dict, output_dir: str) -> dict:
@classmethod
def __update_files_info_with_paths(cls, files_info: dict, output_dir: str) -> dict:
"""
Update the given files_info dictionary with full file paths.
"""
for entry in files_info:
file_name = files_info[entry]['name']
file_path = FSUtil.build_path(output_dir, file_name)
files_info[entry]['path'] = file_path
for _, entry in files_info.items():
file_name = entry['name']
path = FSUtil.build_path(output_dir, file_name)
# if entry is a directory, create the directory and update the files info recursively
if entry.get('isDirectory'):
FSUtil.make_dirs(path)
entry['files'] = cls.__update_files_info_with_paths(entry['files'], path)
entry['path'] = path
return files_info
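To illustrate the recursion, a hedged sketch of the dict shape this method consumes and produces (the output directory is illustrative; file names mirror the YAML configuration below):

files_info = {
    'summary': {'name': 'qualification_summary.csv'},
    'intermediateOutput': {
        'name': 'intermediate_output',
        'isDirectory': True,
        'files': {
            'heuristics': {'name': 'heuristics_info.csv'},
        },
    },
}
# After __update_files_info_with_paths(files_info, '/tmp/qual_out') one would expect:
#   files_info['summary']['path']                                   -> '/tmp/qual_out/qualification_summary.csv'
#   files_info['intermediateOutput']['path']                        -> '/tmp/qual_out/intermediate_output' (directory created)
#   files_info['intermediateOutput']['files']['heuristics']['path'] -> '/tmp/qual_out/intermediate_output/heuristics_info.csv'
# Wrapping the result in JSONPropertiesContainer(..., file_load=False) is what enables the
# output_files_info.get_value('intermediateOutput', 'files', 'heuristics', 'path') lookup used earlier.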

def __update_apps_with_prediction_info(self, all_apps: pd.DataFrame) -> pd.DataFrame:
@@ -46,10 +46,17 @@ toolOutput:
- Cluster Id
- Cluster Name
- App Name
columns:
- App Duration
- Estimated GPU Duration
- Unsupported Operators Stage Duration
aggregate:
- column: App Duration
function: 'mean'
- column: Estimated GPU Duration
function: 'mean'
- column: Unsupported Operators Stage Duration
function: 'mean'
- column: Skip by Heuristics
function: 'any'
- column: Skip by Heuristics Reason
function: 'concat'
dropDuplicates:
- Vendor
- Driver Host
@@ -113,6 +120,14 @@ local:
full:
name: qualification_summary_full.csv
outputComment: "Full savings and speedups CSV report"
intermediateOutput:
name: 'intermediate_output'
outputComment: "Intermediate output generated by tools"
isDirectory: true
files:
heuristics:
name: 'heuristics_info.csv'
outputComment: "Heuristics information"
costColumns:
- 'Savings Based Recommendation'
- 'Estimated App Cost'
@@ -232,6 +247,7 @@ local:
speedupCategories:
speedupColumnName: 'Estimated GPU Speedup'
categoryColumnName: 'Estimated GPU Speedup Category'
heuristicsColumnName: 'Skip by Heuristics'
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
@@ -253,6 +269,29 @@
lowerBound: 0.0
upperBound: 25.0
defaultCategory: 'Not Recommended'
additionalHeuristics:
appInfo:
fileName: 'application_information.csv'
resultCols:
- 'App ID'
- 'Skip by Heuristics'
- 'Reason'
spillBased:
jobStageAggMetrics:
fileName: 'job_+_stage_level_aggregated_task_metrics.csv'
columns:
- 'ID'
- 'memoryBytesSpilled_sum'
sqlToStageInfo:
fileName: 'sql_to_stage_information.csv'
columns:
- 'stageId'
- 'SQL Nodes(IDs)'
spillThresholdBytes: 10737418240
allowedExecs:
- 'Aggregate'
- 'Join'
- 'Sort'
predictionModel:
outputDirectory: 'xgboost_predictions'
files:
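For reference, the spillThresholdBytes value in the additionalHeuristics block above corresponds to 10 GiB:

# 10 * 1024**3 == 10737418240 bytes; Utilities.bytes_to_human_readable renders it as '10.00 GB'.
spill_threshold_bytes = 10 * 1024 ** 3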
128 changes: 128 additions & 0 deletions user_tools/src/spark_rapids_tools/tools/additional_heuristics.py
@@ -0,0 +1,128 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation class for Additional Heuristics logic."""

import os
import re
from dataclasses import dataclass, field
from logging import Logger

import pandas as pd

from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.utilities import ToolLogging
from spark_rapids_tools.tools.model_xgboost import find_paths, RegexPattern
from spark_rapids_tools.utils import Utilities


@dataclass
class AdditionalHeuristics:
    """
    Encapsulates the logic to apply additional heuristics to skip applications.
    """
    logger: Logger = field(default=None, init=False)
    props: JSONPropertiesContainer = field(default=None, init=False)
    tools_output_dir: str = field(default=None, init=False)
    output_file: str = field(default=None, init=False)

    def __init__(self, props: dict, tools_output_dir: str, output_file: str):
        self.props = JSONPropertiesContainer(props, file_load=False)
        self.tools_output_dir = tools_output_dir
        self.output_file = output_file
        self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.{self.__class__.__name__}')

    def _apply_heuristics(self, app_ids: list) -> pd.DataFrame:
        """
        Apply additional heuristics to applications to determine if they can be accelerated on GPU.
        """
        profile_list = find_paths(
            self.tools_output_dir,
            RegexPattern.rapids_profile.match,
            return_directories=True,
        )
        if len(profile_list) == 0:
            self.logger.warning('No RAPIDS profiles found in output directory: %s', self.tools_output_dir)
            return pd.DataFrame(columns=self.props.get_value('resultCols'))

        profile_path = profile_list[0]
        result_arr = []
        if not os.listdir(profile_path) or len(app_ids) == 0:
            self.logger.warning('Skipping empty profile: %s', profile_list[0])
        else:
            for app_id in app_ids:
                app_id_path = os.path.join(profile_path, app_id)
                try:
                    # Apply heuristics and determine if the application should be skipped.
                    # Note: `should_skip` flag can be a combination of multiple heuristic checks.
                    should_skip, reason = self.heuristics_based_on_spills(app_id_path)
                except Exception as e:  # pylint: disable=broad-except
                    should_skip = False
                    reason = f'Cannot apply heuristics for qualification. Reason - {type(e).__name__}:{e}'
                    self.logger.error(reason)
                result_arr.append([app_id, should_skip, reason])

        return pd.DataFrame(result_arr, columns=self.props.get_value('resultCols'))

    def heuristics_based_on_spills(self, app_id_path: str) -> (bool, str):
        """
        Apply heuristics based on spills to determine if the app can be accelerated on GPU.
        """
        # Load stage aggregation metrics (this contains spill information)
        job_stage_agg_metrics_file = self.props.get_value('spillBased', 'jobStageAggMetrics', 'fileName')
        job_stage_agg_metrics = pd.read_csv(os.path.join(app_id_path, job_stage_agg_metrics_file))
        job_stage_agg_metrics = job_stage_agg_metrics[self.props.get_value('spillBased',
                                                                           'jobStageAggMetrics', 'columns')]

        # Load sql-to-stage information (this contains Exec names)
        sql_to_stage_info_file = self.props.get_value('spillBased', 'sqlToStageInfo', 'fileName')
        sql_to_stage_info = pd.read_csv(os.path.join(app_id_path, sql_to_stage_info_file))
        sql_to_stage_info = sql_to_stage_info[self.props.get_value('spillBased',
                                                                   'sqlToStageInfo', 'columns')]

        # Identify stages with significant spills
        spill_threshold_bytes = self.props.get_value('spillBased', 'spillThresholdBytes')
        stages_with_spills = job_stage_agg_metrics[
            job_stage_agg_metrics['ID'].str.startswith('stage') &
            (job_stage_agg_metrics['memoryBytesSpilled_sum'] > spill_threshold_bytes)
        ].copy()
        stages_with_spills['stageId'] = stages_with_spills['ID'].str.extract(r'(\d+)').astype(int)

        # Merge stages with spills with SQL-to-stage information
        merged_df = pd.merge(stages_with_spills, sql_to_stage_info, on='stageId', how='inner')

        # Identify stages with spills caused by Execs other than the ones allowed (Join, Aggregate or Sort)
        # Note: Column 'SQL Nodes(IDs)' contains the Exec names
        pattern = '|'.join(map(re.escape, self.props.get_value('spillBased', 'allowedExecs')))
        relevant_stages_with_spills = merged_df[~merged_df['SQL Nodes(IDs)'].apply(
            lambda x: isinstance(x, str) and bool(re.search(pattern, x)))]
        # If there are any stages with spills caused by non-allowed Execs, skip the application
        if not relevant_stages_with_spills.empty:
            stages_str = '; '.join(relevant_stages_with_spills['stageId'].astype(str))
            spill_threshold_human_readable = Utilities.bytes_to_human_readable(spill_threshold_bytes)
            reason = f'Skipping due to spills in stages [{stages_str}] exceeding {spill_threshold_human_readable}'
            return True, reason
        return False, ''
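A small illustration (hypothetical stage IDs and SQL-node strings, not from a real profile) of the two string operations the check above relies on — extracting the numeric stage id and testing for allowed Execs:

import re

allowed_execs = ['Aggregate', 'Join', 'Sort']
pattern = '|'.join(map(re.escape, allowed_execs))   # 'Aggregate|Join|Sort'

# 'ID' values in the aggregated metrics start with 'stage'; extract(r'(\d+)') pulls the numeric id.
stage_id = int(re.search(r'(\d+)', 'stage_7').group(1))   # -> 7 (the exact id format is an assumption)

# A spilling stage whose 'SQL Nodes(IDs)' string contains an allowed Exec is tolerated;
# any other spilling stage makes the application a skip candidate.
print(bool(re.search(pattern, 'Exchange(3),SortMergeJoin(4)')))       # True  -> tolerated
print(bool(re.search(pattern, 'Exchange(8),Project(9),Filter(10)')))  # False -> app is skipped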

    def apply_heuristics(self, all_apps: pd.DataFrame) -> pd.DataFrame:
        try:
            heuristics_df = self._apply_heuristics(all_apps['App ID'].unique())
            # Save the heuristics results to a file and drop the reason column
            heuristics_df.to_csv(self.output_file, index=False)
            heuristics_df.drop(columns=['Reason'], inplace=True)
            all_apps = pd.merge(all_apps, heuristics_df, on=['App ID'], how='left')
        except Exception as e:  # pylint: disable=broad-except
            self.logger.error('Error occurred while applying additional heuristics. '
                              'Reason - %s:%s', type(e).__name__, e)
        return all_apps
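A hedged usage sketch of the new class (paths are illustrative; the props mirror the additionalHeuristics block of the YAML configuration above):

import pandas as pd

all_apps = pd.DataFrame({'App ID': ['app-20240101-0001']})  # hypothetical qualification rows
heuristics = AdditionalHeuristics(
    props={
        'resultCols': ['App ID', 'Skip by Heuristics', 'Reason'],
        'spillBased': {
            'jobStageAggMetrics': {
                'fileName': 'job_+_stage_level_aggregated_task_metrics.csv',
                'columns': ['ID', 'memoryBytesSpilled_sum'],
            },
            'sqlToStageInfo': {
                'fileName': 'sql_to_stage_information.csv',
                'columns': ['stageId', 'SQL Nodes(IDs)'],
            },
            'spillThresholdBytes': 10 * 1024 ** 3,
            'allowedExecs': ['Aggregate', 'Join', 'Sort'],
        },
    },
    tools_output_dir='/tmp/qual_out',                                     # illustrative path
    output_file='/tmp/qual_out/intermediate_output/heuristics_info.csv',  # illustrative path
)
# apply_heuristics() writes heuristics_info.csv and left-joins a boolean
# 'Skip by Heuristics' column onto the qualification dataframe by 'App ID'.
all_apps = heuristics.apply_heuristics(all_apps)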
7 changes: 5 additions & 2 deletions user_tools/src/spark_rapids_tools/tools/speedup_category.py
@@ -75,12 +75,15 @@ def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
reason: Category will be set to 'Not Recommended' because the criteriaCol1 is not within the range (18-30)
"""
category_col_name = self.props.get('categoryColumnName')
heuristics_col_name = self.props.get('heuristicsColumnName')

def process_row(single_row: pd.Series) -> str:
for entry in self.props.get('eligibilityConditions'):
col_value = single_row[entry.get('columnName')]
# If the value is not within the range, set the category to default category (Not Recommended)
if not entry.get('lowerBound') <= col_value <= entry.get('upperBound'):
# If the row is marked to be skipped by heuristics or the value is not within the range,
# set the category to default category (Not Recommended)
if (single_row.get(heuristics_col_name) is True or
not entry.get('lowerBound') <= col_value <= entry.get('upperBound')):
return self.props.get('defaultCategory')
return single_row.get(category_col_name)
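A quick hypothetical check of the new condition: a row whose metric values alone would pass every eligibility range, but which the spill heuristic flagged, still falls back to the default category:

import pandas as pd

row = pd.Series({
    'Estimated GPU Speedup': 3.5,               # hypothetical value inside the allowed range
    'Estimated GPU Speedup Category': 'Small',  # hypothetical pre-computed category
    'Skip by Heuristics': True,                 # set upstream by AdditionalHeuristics
})
# With the heuristics flag set, the condition above is True on the first eligibility entry,
# so process_row(row) returns the configured defaultCategory ('Not Recommended').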

12 changes: 12 additions & 0 deletions user_tools/src/spark_rapids_tools/utils/util.py
@@ -309,3 +309,15 @@ def squeeze_df_header(cls, df_row: pd.DataFrame, header_width: int) -> pd.DataFr
            df_row.columns = df_row.columns.str.replace(column,
                                                        new_column_name, regex=False)
        return df_row

    @classmethod
    def bytes_to_human_readable(cls, num_bytes: int) -> str:
        """
        Convert bytes to human-readable format up to PB
        """
        size_units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
        i = 0
        while num_bytes >= 1024 and i < len(size_units) - 1:
            num_bytes /= 1024.0
            i += 1
        return f'{num_bytes:.2f} {size_units[i]}'
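Expected behavior of the helper, with a few worked values (the classmethod lives on the Utilities class, imported elsewhere in the PR as from spark_rapids_tools.utils import Utilities):

from spark_rapids_tools.utils import Utilities

print(Utilities.bytes_to_human_readable(512))              # '512.00 B'
print(Utilities.bytes_to_human_readable(1536))             # '1.50 KB'
print(Utilities.bytes_to_human_readable(10 * 1024 ** 3))   # '10.00 GB' (the configured spill threshold)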