Add heuristics using stage spill metrics to skip apps (#1002)
* Add heuristics using stage spill metrics to skip apps

Signed-off-by: Partho Sarthi <[email protected]>

* Remove disk spill metrics

Signed-off-by: Partho Sarthi <[email protected]>

* Add skip reason

Signed-off-by: Partho Sarthi <[email protected]>

* Revert "Add skip reason"

This reverts commit b774958.

* Generate skip reason to intermediate output directory

Signed-off-by: Partho Sarthi <[email protected]>

* Change delimiter to semicolon and update reason column name

Signed-off-by: Partho Sarthi <[email protected]>

* Add function to convert size to human-readable format

Signed-off-by: Partho Sarthi <[email protected]>

---------

Signed-off-by: Partho Sarthi <[email protected]>
parthosa authored May 16, 2024
1 parent 0cc500f commit 4f592ce
Showing 5 changed files with 220 additions and 21 deletions.
47 changes: 32 additions & 15 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -24,13 +24,15 @@

from spark_rapids_pytools.cloud_api.sp_types import ClusterReshape, NodeHWInfo, SparkNodeType
from spark_rapids_pytools.common.cluster_inference import ClusterInference
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel
from spark_rapids_tools.tools.model_xgboost import predict
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.speedup_category import SpeedupCategory
from spark_rapids_tools.tools.top_candidates import TopCandidates
from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
@@ -441,9 +443,13 @@ def __group_apps_by_name(self, all_apps) -> (pd.DataFrame, str):
all_apps_count = len(all_apps)

group_info = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'groupColumns')
for group_col in group_info['columns']:
valid_group_cols = Utilities.get_valid_df_columns(group_info['keys'], all_apps)
all_apps[group_col] = all_apps.groupby(valid_group_cols)[group_col].transform('mean')
valid_group_cols = Utilities.get_valid_df_columns(group_info['keys'], all_apps)
for agg_info in group_info['aggregate']:
agg_col = agg_info['column']
if agg_col in all_apps.columns:
# Group by columns can contain NaN values, so we need to include them in the grouping
all_apps[agg_col] = all_apps.groupby(valid_group_cols, dropna=False)[agg_col].transform(
agg_info['function'])

drop_arr = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'dropDuplicates')
valid_drop_cols = Utilities.get_valid_df_columns(drop_arr, all_apps)
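
As a standalone aside, a minimal sketch of why dropna=False matters in the grouping above when a group key such as 'Cluster Id' is missing (toy values, not from the tool):

import numpy as np
import pandas as pd

apps = pd.DataFrame({
    'App Name': ['etl_job', 'etl_job'],
    'Cluster Id': [np.nan, np.nan],      # group key missing for both runs
    'App Duration': [100.0, 300.0],
})
# With the default dropna=True, NaN-keyed rows are excluded from the groups and
# transform('mean') leaves NaN for them; dropna=False keeps them in their own group.
apps['App Duration'] = apps.groupby(['App Name', 'Cluster Id'],
                                    dropna=False)['App Duration'].transform('mean')
print(apps['App Duration'].tolist())     # [200.0, 200.0]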
@@ -686,22 +692,29 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
def __build_global_report_summary(self,
all_apps: pd.DataFrame,
unsupported_ops_df: pd.DataFrame,
output_files_info: dict) -> QualificationSummary:
output_files_raw: dict) -> QualificationSummary:
if all_apps.empty:
# No need to run saving estimator or process the data frames.
return QualificationSummary(comments=self.__generate_mc_types_conversion_report())

output_files_info = JSONPropertiesContainer(output_files_raw, file_load=False)
unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
'unsupportedOperators'))
# Calculate unsupported operators stage duration before grouping
all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
apps_pruned_df = self.__remap_columns_and_prune(all_apps)
# Apply additional heuristics to skip apps not suitable for GPU acceleration
heuristics_ob = AdditionalHeuristics(
props=self.ctxt.get_value('local', 'output', 'additionalHeuristics'),
tools_output_dir=self.ctxt.get_local('outputFolder'),
output_file=output_files_info.get_value('intermediateOutput', 'files', 'heuristics', 'path'))
apps_pruned_df = heuristics_ob.apply_heuristics(apps_pruned_df)
speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories'))
# Calculate the speedup category column
apps_pruned_df = speedup_category_ob.build_category_column(apps_pruned_df)
apps_pruned_df.to_csv(output_files_info['full']['path'], float_format='%.2f')
# Calculate the speedup category column, send a copy of the dataframe to avoid modifying the original
apps_pruned_result = speedup_category_ob.build_category_column(apps_pruned_df.copy())
apps_pruned_result.to_csv(output_files_info.get_value('full', 'path'), float_format='%.2f')
# Group the applications and recalculate metrics
apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df)
# Recalculate the speedup category column after grouping
apps_grouped_df = speedup_category_ob.build_category_column(apps_grouped_df)
recommended_apps = self.__get_recommended_apps(apps_grouped_df)
# if the gpu_reshape_type is set to JOB, then we should ignore recommended apps
@@ -726,7 +739,7 @@ def __build_global_report_summary(self,
'clusterShapeCols', 'columnName')
speed_recommendation_col = self.ctxt.get_value('local', 'output', 'speedupRecommendColumn')
apps_reshaped_df, per_row_flag = self.__apply_gpu_cluster_reshape(apps_grouped_df)
csv_out = output_files_info['summary']['path']
csv_out = output_files_info.get_value('summary', 'path')
if launch_savings_calc:
# Now, the dataframe is ready to calculate the cost and the savings
apps_working_set = self.__calc_apps_cost(apps_reshaped_df,
@@ -950,15 +963,19 @@ def __build_prediction_output_files_info(self) -> dict:
FSUtil.make_dirs(output_dir)
return self.__update_files_info_with_paths(predictions_info['files'], output_dir)

@staticmethod
def __update_files_info_with_paths(files_info: dict, output_dir: str) -> dict:
@classmethod
def __update_files_info_with_paths(cls, files_info: dict, output_dir: str) -> dict:
"""
Update the given files_info dictionary with full file paths.
"""
for entry in files_info:
file_name = files_info[entry]['name']
file_path = FSUtil.build_path(output_dir, file_name)
files_info[entry]['path'] = file_path
for _, entry in files_info.items():
file_name = entry['name']
path = FSUtil.build_path(output_dir, file_name)
# if entry is a directory, create the directory and update the files info recursively
if entry.get('isDirectory'):
FSUtil.make_dirs(path)
entry['files'] = cls.__update_files_info_with_paths(entry['files'], path)
entry['path'] = path
return files_info

def __update_apps_with_prediction_info(self, all_apps: pd.DataFrame) -> pd.DataFrame:
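As a rough illustration (the output directory path here is hypothetical), given a files_info dict shaped like the intermediateOutput entry added to the resource config below, the recursion fills in paths like:

files_info = {
    'full': {'name': 'qualification_summary_full.csv'},
    'intermediateOutput': {
        'name': 'intermediate_output',
        'isDirectory': True,
        'files': {'heuristics': {'name': 'heuristics_info.csv'}},
    },
}
# __update_files_info_with_paths(files_info, '/tmp/qual_out') creates the
# intermediate_output directory and adds a 'path' key to every entry:
#   files_info['full']['path']
#       -> '/tmp/qual_out/qualification_summary_full.csv'
#   files_info['intermediateOutput']['files']['heuristics']['path']
#       -> '/tmp/qual_out/intermediate_output/heuristics_info.csv'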
@@ -46,10 +46,17 @@ toolOutput:
- Cluster Id
- Cluster Name
- App Name
columns:
- App Duration
- Estimated GPU Duration
- Unsupported Operators Stage Duration
aggregate:
- column: App Duration
function: 'mean'
- column: Estimated GPU Duration
function: 'mean'
- column: Unsupported Operators Stage Duration
function: 'mean'
- column: Skip by Heuristics
function: 'any'
- column: Skip by Heuristics Reason
function: 'concat'
dropDuplicates:
- Vendor
- Driver Host
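
For illustration, a hedged sketch of how the 'mean' and 'any' aggregation functions above behave when applied per group with pandas transform (toy values; how the tool applies the 'concat' function to the reason strings is not shown in this diff):

import pandas as pd

apps = pd.DataFrame({
    'App Name': ['etl_job', 'etl_job'],
    'App Duration': [100.0, 300.0],
    'Skip by Heuristics': [False, True],
})
grouped = apps.groupby(['App Name'], dropna=False)
apps['App Duration'] = grouped['App Duration'].transform('mean')              # 200.0 for both rows
apps['Skip by Heuristics'] = grouped['Skip by Heuristics'].transform('any')   # True for both rows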
@@ -113,6 +120,14 @@ local:
full:
name: qualification_summary_full.csv
outputComment: "Full savings and speedups CSV report"
intermediateOutput:
name: 'intermediate_output'
outputComment: "Intermediate output generated by tools"
isDirectory: true
files:
heuristics:
name: 'heuristics_info.csv'
outputComment: "Heuristics information"
costColumns:
- 'Savings Based Recommendation'
- 'Estimated App Cost'
@@ -232,6 +247,7 @@ local:
speedupCategories:
speedupColumnName: 'Estimated GPU Speedup'
categoryColumnName: 'Estimated GPU Speedup Category'
heuristicsColumnName: 'Skip by Heuristics'
categories:
- title: 'Not Applicable'
lowerBound: -1000000.0
@@ -253,6 +269,29 @@ local:
lowerBound: 0.0
upperBound: 25.0
defaultCategory: 'Not Recommended'
additionalHeuristics:
appInfo:
fileName: 'application_information.csv'
resultCols:
- 'App ID'
- 'Skip by Heuristics'
- 'Reason'
spillBased:
jobStageAggMetrics:
fileName: 'job_+_stage_level_aggregated_task_metrics.csv'
columns:
- 'ID'
- 'memoryBytesSpilled_sum'
sqlToStageInfo:
fileName: 'sql_to_stage_information.csv'
columns:
- 'stageId'
- 'SQL Nodes(IDs)'
spillThresholdBytes: 10737418240
allowedExecs:
- 'Aggregate'
- 'Join'
- 'Sort'
predictionModel:
outputDirectory: 'xgboost_predictions'
files:
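For reference, the spillThresholdBytes value above (10737418240 bytes) renders as '10.00 GB', so a hypothetical row in heuristics_info.csv for a skipped application (columns taken from resultCols; the App ID and stage numbers are made up) might read:

App ID,Skip by Heuristics,Reason
app-20240516120000-0001,True,Skipping due to spills in stages [4; 7] exceeding 10.00 GB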
128 changes: 128 additions & 0 deletions user_tools/src/spark_rapids_tools/tools/additional_heuristics.py
@@ -0,0 +1,128 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation class for Additional Heuristics logic."""

import os
import re
from dataclasses import dataclass, field
from logging import Logger

import pandas as pd

from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.utilities import ToolLogging
from spark_rapids_tools.tools.model_xgboost import find_paths, RegexPattern
from spark_rapids_tools.utils import Utilities


@dataclass
class AdditionalHeuristics:
"""
Encapsulates the logic to apply additional heuristics to skip applications.
"""
logger: Logger = field(default=None, init=False)
props: JSONPropertiesContainer = field(default=None, init=False)
tools_output_dir: str = field(default=None, init=False)
output_file: str = field(default=None, init=False)

def __init__(self, props: dict, tools_output_dir: str, output_file: str):
self.props = JSONPropertiesContainer(props, file_load=False)
self.tools_output_dir = tools_output_dir
self.output_file = output_file
self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.{self.__class__.__name__}')

def _apply_heuristics(self, app_ids: list) -> pd.DataFrame:
"""
Apply additional heuristics to applications to determine if they can be accelerated on GPU.
"""
profile_list = find_paths(
self.tools_output_dir,
RegexPattern.rapids_profile.match,
return_directories=True,
)
if len(profile_list) == 0:
self.logger.warning('No RAPIDS profiles found in output directory: %s', self.tools_output_dir)
return pd.DataFrame(columns=self.props.get_value('resultCols'))

profile_path = profile_list[0]
result_arr = []
if not os.listdir(profile_path) or len(app_ids) == 0:
self.logger.warning('Skipping empty profile: %s', profile_list[0])
else:
for app_id in app_ids:
app_id_path = os.path.join(profile_path, app_id)
try:
# Apply heuristics and determine if the application should be skipped.
# Note: `should_skip` flag can be a combination of multiple heuristic checks.
should_skip, reason = self.heuristics_based_on_spills(app_id_path)
except Exception as e: # pylint: disable=broad-except
should_skip = False
reason = f'Cannot apply heuristics for qualification. Reason - {type(e).__name__}:{e}'
self.logger.error(reason)
result_arr.append([app_id, should_skip, reason])

return pd.DataFrame(result_arr, columns=self.props.get_value('resultCols'))

def heuristics_based_on_spills(self, app_id_path: str) -> (bool, str):
"""
Apply heuristics based on spills to determine if the app can be accelerated on GPU.
"""
# Load stage aggregation metrics (this contains spill information)
job_stage_agg_metrics_file = self.props.get_value('spillBased', 'jobStageAggMetrics', 'fileName')
job_stage_agg_metrics = pd.read_csv(os.path.join(app_id_path, job_stage_agg_metrics_file))
job_stage_agg_metrics = job_stage_agg_metrics[self.props.get_value('spillBased',
'jobStageAggMetrics', 'columns')]

# Load sql-to-stage information (this contains Exec names)
sql_to_stage_info_file = self.props.get_value('spillBased', 'sqlToStageInfo', 'fileName')
sql_to_stage_info = pd.read_csv(os.path.join(app_id_path, sql_to_stage_info_file))
sql_to_stage_info = sql_to_stage_info[self.props.get_value('spillBased',
'sqlToStageInfo', 'columns')]

# Identify stages with significant spills
spill_threshold_bytes = self.props.get_value('spillBased', 'spillThresholdBytes')
stages_with_spills = job_stage_agg_metrics[
job_stage_agg_metrics['ID'].str.startswith('stage') &
(job_stage_agg_metrics['memoryBytesSpilled_sum'] > spill_threshold_bytes)
].copy()
stages_with_spills['stageId'] = stages_with_spills['ID'].str.extract(r'(\d+)').astype(int)

# Merge stages with spills with SQL-to-stage information
merged_df = pd.merge(stages_with_spills, sql_to_stage_info, on='stageId', how='inner')

# Identify stages with spills caused by Execs other than the ones allowed (Join, Aggregate or Sort)
# Note: Column 'SQL Nodes(IDs)' contains the Exec names
pattern = '|'.join(map(re.escape, self.props.get_value('spillBased', 'allowedExecs')))
relevant_stages_with_spills = merged_df[~merged_df['SQL Nodes(IDs)'].apply(
lambda x: isinstance(x, str) and bool(re.search(pattern, x)))]
# If there are any stages with spills caused by non-allowed Execs, skip the application
if not relevant_stages_with_spills.empty:
stages_str = '; '.join(relevant_stages_with_spills['stageId'].astype(str))
spill_threshold_human_readable = Utilities.bytes_to_human_readable(spill_threshold_bytes)
reason = f'Skipping due to spills in stages [{stages_str}] exceeding {spill_threshold_human_readable}'
return True, reason
return False, ''

def apply_heuristics(self, all_apps: pd.DataFrame) -> pd.DataFrame:
try:
heuristics_df = self._apply_heuristics(all_apps['App ID'].unique())
# Save the heuristics results to a file and drop the reason column
heuristics_df.to_csv(self.output_file, index=False)
heuristics_df.drop(columns=['Reason'], inplace=True)
all_apps = pd.merge(all_apps, heuristics_df, on=['App ID'], how='left')
except Exception as e: # pylint: disable=broad-except
self.logger.error('Error occurred while applying additional heuristics. '
'Reason - %s:%s', type(e).__name__, e)
return all_apps
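
Outside the diff, a self-contained sketch of the core spill filter on toy data, under the assumption that the profiler CSVs look roughly like this (column names follow the config above; the values and Exec names are made up):

import re
import pandas as pd

SPILL_THRESHOLD_BYTES = 10737418240               # spillThresholdBytes from the config
ALLOWED_EXECS = ['Aggregate', 'Join', 'Sort']     # allowedExecs from the config

job_stage_agg_metrics = pd.DataFrame({
    'ID': ['job_0', 'stage_3', 'stage_5'],
    'memoryBytesSpilled_sum': [0, 20 * 1024 ** 3, 15 * 1024 ** 3],
})
sql_to_stage_info = pd.DataFrame({
    'stageId': [3, 5],
    'SQL Nodes(IDs)': ['SortMergeJoin(12)', 'Project(7),Filter(8)'],
})

# Stage-level rows whose spills exceed the threshold
stages_with_spills = job_stage_agg_metrics[
    job_stage_agg_metrics['ID'].str.startswith('stage') &
    (job_stage_agg_metrics['memoryBytesSpilled_sum'] > SPILL_THRESHOLD_BYTES)
].copy()
stages_with_spills['stageId'] = stages_with_spills['ID'].str.extract(r'(\d+)').astype(int)

# Keep only spilling stages whose Execs are NOT in the allowed list
merged_df = pd.merge(stages_with_spills, sql_to_stage_info, on='stageId', how='inner')
pattern = '|'.join(map(re.escape, ALLOWED_EXECS))
offending = merged_df[~merged_df['SQL Nodes(IDs)'].apply(
    lambda x: isinstance(x, str) and bool(re.search(pattern, x)))]
print(offending['stageId'].tolist())              # [5] -> the app would be skipped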
7 changes: 5 additions & 2 deletions user_tools/src/spark_rapids_tools/tools/speedup_category.py
@@ -75,12 +75,15 @@ def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
reason: Category will be set to 'Not Recommended' because the criteriaCol1 is not within the range (18-30)
"""
category_col_name = self.props.get('categoryColumnName')
heuristics_col_name = self.props.get('heuristicsColumnName')

def process_row(single_row: pd.Series) -> str:
for entry in self.props.get('eligibilityConditions'):
col_value = single_row[entry.get('columnName')]
# If the value is not within the range, set the category to default category (Not Recommended)
if not entry.get('lowerBound') <= col_value <= entry.get('upperBound'):
# If the row is marked to be skipped by heuristics or the value is not within the range,
# set the category to default category (Not Recommended)
if (single_row.get(heuristics_col_name) is True or
not entry.get('lowerBound') <= col_value <= entry.get('upperBound')):
return self.props.get('defaultCategory')
return single_row.get(category_col_name)

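Below the diff, a minimal sketch of the intended effect (the default category comes from the config; other category values here are hypothetical):

import pandas as pd

DEFAULT_CATEGORY = 'Not Recommended'   # defaultCategory from the config

def categorize(single_row: pd.Series) -> str:
    # Simplified stand-in for process_row(): a row flagged by the heuristics
    # falls back to the default category, regardless of the bound checks.
    if single_row.get('Skip by Heuristics') is True:
        return DEFAULT_CATEGORY
    return single_row.get('Estimated GPU Speedup Category')

row = pd.Series({'Estimated GPU Speedup Category': 'Large',   # hypothetical value
                 'Skip by Heuristics': True})
print(categorize(row))                 # Not Recommended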
12 changes: 12 additions & 0 deletions user_tools/src/spark_rapids_tools/utils/util.py
@@ -309,3 +309,15 @@ def squeeze_df_header(cls, df_row: pd.DataFrame, header_width: int) -> pd.DataFrame:
df_row.columns = df_row.columns.str.replace(column,
new_column_name, regex=False)
return df_row

@classmethod
def bytes_to_human_readable(cls, num_bytes: int) -> str:
"""
Convert bytes to human-readable format up to PB
"""
size_units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
i = 0
while num_bytes >= 1024 and i < len(size_units) - 1:
num_bytes /= 1024.0
i += 1
return f'{num_bytes:.2f} {size_units[i]}'
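
A usage sketch, for instance converting the 10 GiB spill threshold used by the heuristic:

from spark_rapids_tools.utils import Utilities

print(Utilities.bytes_to_human_readable(10737418240))  # 10.00 GB
print(Utilities.bytes_to_human_readable(512))          # 512.00 B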
