diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
index d1a672ebe..282510b69 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -24,6 +24,7 @@
 from spark_rapids_pytools.cloud_api.sp_types import ClusterReshape, NodeHWInfo, SparkNodeType
 from spark_rapids_pytools.common.cluster_inference import ClusterInference
+from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
 from spark_rapids_pytools.common.sys_storage import FSUtil
 from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
 from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
@@ -31,6 +32,7 @@
 from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
 from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel
 from spark_rapids_tools.tools.model_xgboost import predict
+from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
 from spark_rapids_tools.tools.speedup_category import SpeedupCategory
 from spark_rapids_tools.tools.top_candidates import TopCandidates
 from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
@@ -441,9 +443,13 @@ def __group_apps_by_name(self, all_apps) -> (pd.DataFrame, str):
         all_apps_count = len(all_apps)
         group_info = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'groupColumns')
-        for group_col in group_info['columns']:
-            valid_group_cols = Utilities.get_valid_df_columns(group_info['keys'], all_apps)
-            all_apps[group_col] = all_apps.groupby(valid_group_cols)[group_col].transform('mean')
+        valid_group_cols = Utilities.get_valid_df_columns(group_info['keys'], all_apps)
+        for agg_info in group_info['aggregate']:
+            agg_col = agg_info['column']
+            if agg_col in all_apps.columns:
+                # Group by columns can contain NaN values, so we need to include them in the grouping
+                all_apps[agg_col] = all_apps.groupby(valid_group_cols, dropna=False)[agg_col].transform(
+                    agg_info['function'])
 
         drop_arr = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'dropDuplicates')
         valid_drop_cols = Utilities.get_valid_df_columns(drop_arr, all_apps)
@@ -686,22 +692,29 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series:
     def __build_global_report_summary(self,
                                       all_apps: pd.DataFrame,
                                       unsupported_ops_df: pd.DataFrame,
-                                      output_files_info: dict) -> QualificationSummary:
+                                      output_files_raw: dict) -> QualificationSummary:
         if all_apps.empty:
             # No need to run saving estimator or process the data frames.
             return QualificationSummary(comments=self.__generate_mc_types_conversion_report())
 
+        output_files_info = JSONPropertiesContainer(output_files_raw, file_load=False)
         unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
                                                                               'unsupportedOperators'))
         # Calculate unsupported operators stage duration before grouping
         all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
         apps_pruned_df = self.__remap_columns_and_prune(all_apps)
+        # Apply additional heuristics to skip apps not suitable for GPU acceleration
+        heuristics_ob = AdditionalHeuristics(
+            props=self.ctxt.get_value('local', 'output', 'additionalHeuristics'),
+            tools_output_dir=self.ctxt.get_local('outputFolder'),
+            output_file=output_files_info.get_value('intermediateOutput', 'files', 'heuristics', 'path'))
+        apps_pruned_df = heuristics_ob.apply_heuristics(apps_pruned_df)
         speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories'))
-        # Calculate the speedup category column
-        apps_pruned_df = speedup_category_ob.build_category_column(apps_pruned_df)
-        apps_pruned_df.to_csv(output_files_info['full']['path'], float_format='%.2f')
+        # Calculate the speedup category column, send a copy of the dataframe to avoid modifying the original
+        apps_pruned_result = speedup_category_ob.build_category_column(apps_pruned_df.copy())
+        apps_pruned_result.to_csv(output_files_info.get_value('full', 'path'), float_format='%.2f')
+        # Group the applications and recalculate metrics
         apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df)
-        # Recalculate the speedup category column after grouping
         apps_grouped_df = speedup_category_ob.build_category_column(apps_grouped_df)
         recommended_apps = self.__get_recommended_apps(apps_grouped_df)
         # if the gpu_reshape_type is set to JOB then, then we should ignore recommended apps
@@ -726,7 +739,7 @@
                                                 'clusterShapeCols', 'columnName')
         speed_recommendation_col = self.ctxt.get_value('local', 'output', 'speedupRecommendColumn')
         apps_reshaped_df, per_row_flag = self.__apply_gpu_cluster_reshape(apps_grouped_df)
-        csv_out = output_files_info['summary']['path']
+        csv_out = output_files_info.get_value('summary', 'path')
         if launch_savings_calc:
             # Now, the dataframe is ready to calculate the cost and the savings
             apps_working_set = self.__calc_apps_cost(apps_reshaped_df,
@@ -950,15 +963,19 @@ def __build_prediction_output_files_info(self) -> dict:
         FSUtil.make_dirs(output_dir)
         return self.__update_files_info_with_paths(predictions_info['files'], output_dir)
 
-    @staticmethod
-    def __update_files_info_with_paths(files_info: dict, output_dir: str) -> dict:
+    @classmethod
+    def __update_files_info_with_paths(cls, files_info: dict, output_dir: str) -> dict:
         """
         Update the given files_info dictionary with full file paths.
         """
-        for entry in files_info:
-            file_name = files_info[entry]['name']
-            file_path = FSUtil.build_path(output_dir, file_name)
-            files_info[entry]['path'] = file_path
+        for _, entry in files_info.items():
+            file_name = entry['name']
+            path = FSUtil.build_path(output_dir, file_name)
+            # if entry is a directory, create the directory and update the files info recursively
+            if entry.get('isDirectory'):
+                FSUtil.make_dirs(path)
+                entry['files'] = cls.__update_files_info_with_paths(entry['files'], path)
+            entry['path'] = path
         return files_info
 
     def __update_apps_with_prediction_info(self, all_apps: pd.DataFrame) -> pd.DataFrame:
diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
index eea8f63aa..e566149df 100644
--- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
+++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
@@ -46,10 +46,17 @@ toolOutput:
         - Cluster Id
         - Cluster Name
         - App Name
-      columns:
-        - App Duration
-        - Estimated GPU Duration
-        - Unsupported Operators Stage Duration
+      aggregate:
+        - column: App Duration
+          function: 'mean'
+        - column: Estimated GPU Duration
+          function: 'mean'
+        - column: Unsupported Operators Stage Duration
+          function: 'mean'
+        - column: Skip by Heuristics
+          function: 'any'
+        - column: Skip by Heuristics Reason
+          function: 'concat'
       dropDuplicates:
         - Vendor
         - Driver Host
@@ -113,6 +120,14 @@ local:
       full:
         name: qualification_summary_full.csv
        outputComment: "Full savings and speedups CSV report"
+      intermediateOutput:
+        name: 'intermediate_output'
+        outputComment: "Intermediate output generated by tools"
+        isDirectory: true
+        files:
+          heuristics:
+            name: 'heuristics_info.csv'
+            outputComment: "Heuristics information"
     costColumns:
       - 'Savings Based Recommendation'
       - 'Estimated App Cost'
@@ -232,6 +247,7 @@ local:
     speedupCategories:
       speedupColumnName: 'Estimated GPU Speedup'
       categoryColumnName: 'Estimated GPU Speedup Category'
+      heuristicsColumnName: 'Skip by Heuristics'
      categories:
        - title: 'Not Applicable'
          lowerBound: -1000000.0
@@ -253,6 +269,29 @@ local:
          lowerBound: 0.0
          upperBound: 25.0
      defaultCategory: 'Not Recommended'
+    additionalHeuristics:
+      appInfo:
+        fileName: 'application_information.csv'
+      resultCols:
+        - 'App ID'
+        - 'Skip by Heuristics'
+        - 'Reason'
+      spillBased:
+        jobStageAggMetrics:
+          fileName: 'job_+_stage_level_aggregated_task_metrics.csv'
+          columns:
+            - 'ID'
+            - 'memoryBytesSpilled_sum'
+        sqlToStageInfo:
+          fileName: 'sql_to_stage_information.csv'
+          columns:
+            - 'stageId'
+            - 'SQL Nodes(IDs)'
+        spillThresholdBytes: 10737418240
+        allowedExecs:
+          - 'Aggregate'
+          - 'Join'
+          - 'Sort'
     predictionModel:
       outputDirectory: 'xgboost_predictions'
       files:
diff --git a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py
new file mode 100644
index 000000000..b2dff8417
--- /dev/null
+++ b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py
@@ -0,0 +1,128 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Implementation class for Additional Heuristics logic."""
+
+import os
+import re
+from dataclasses import dataclass, field
+from logging import Logger
+
+import pandas as pd
+
+from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
+from spark_rapids_pytools.common.utilities import ToolLogging
+from spark_rapids_tools.tools.model_xgboost import find_paths, RegexPattern
+from spark_rapids_tools.utils import Utilities
+
+
+@dataclass
+class AdditionalHeuristics:
+    """
+    Encapsulates the logic to apply additional heuristics to skip applications.
+    """
+    logger: Logger = field(default=None, init=False)
+    props: JSONPropertiesContainer = field(default=None, init=False)
+    tools_output_dir: str = field(default=None, init=False)
+    output_file: str = field(default=None, init=False)
+
+    def __init__(self, props: dict, tools_output_dir: str, output_file: str):
+        self.props = JSONPropertiesContainer(props, file_load=False)
+        self.tools_output_dir = tools_output_dir
+        self.output_file = output_file
+        self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.{self.__class__.__name__}')
+
+    def _apply_heuristics(self, app_ids: list) -> pd.DataFrame:
+        """
+        Apply additional heuristics to applications to determine if they can be accelerated on GPU.
+        """
+        profile_list = find_paths(
+            self.tools_output_dir,
+            RegexPattern.rapids_profile.match,
+            return_directories=True,
+        )
+        if len(profile_list) == 0:
+            self.logger.warning('No RAPIDS profiles found in output directory: %s', self.tools_output_dir)
+            return pd.DataFrame(columns=self.props.get_value('resultCols'))
+
+        profile_path = profile_list[0]
+        result_arr = []
+        if not os.listdir(profile_path) or len(app_ids) == 0:
+            self.logger.warning('Skipping empty profile: %s', profile_list[0])
+        else:
+            for app_id in app_ids:
+                app_id_path = os.path.join(profile_path, app_id)
+                try:
+                    # Apply heuristics and determine if the application should be skipped.
+                    # Note: `should_skip` flag can be a combination of multiple heuristic checks.
+                    should_skip, reason = self.heuristics_based_on_spills(app_id_path)
+                except Exception as e:  # pylint: disable=broad-except
+                    should_skip = False
+                    reason = f'Cannot apply heuristics for qualification. Reason - {type(e).__name__}:{e}'
+                    self.logger.error(reason)
+                result_arr.append([app_id, should_skip, reason])
+
+        return pd.DataFrame(result_arr, columns=self.props.get_value('resultCols'))
+
+    def heuristics_based_on_spills(self, app_id_path: str) -> (bool, str):
+        """
+        Apply heuristics based on spills to determine if the app can be accelerated on GPU.
+        """
+        # Load stage aggregation metrics (this contains spill information)
+        job_stage_agg_metrics_file = self.props.get_value('spillBased', 'jobStageAggMetrics', 'fileName')
+        job_stage_agg_metrics = pd.read_csv(os.path.join(app_id_path, job_stage_agg_metrics_file))
+        job_stage_agg_metrics = job_stage_agg_metrics[self.props.get_value('spillBased',
+                                                                           'jobStageAggMetrics', 'columns')]
+
+        # Load sql-to-stage information (this contains Exec names)
+        sql_to_stage_info_file = self.props.get_value('spillBased', 'sqlToStageInfo', 'fileName')
+        sql_to_stage_info = pd.read_csv(os.path.join(app_id_path, sql_to_stage_info_file))
+        sql_to_stage_info = sql_to_stage_info[self.props.get_value('spillBased',
+                                                                   'sqlToStageInfo', 'columns')]
+
+        # Identify stages with significant spills
+        spill_threshold_bytes = self.props.get_value('spillBased', 'spillThresholdBytes')
+        stages_with_spills = job_stage_agg_metrics[
+            job_stage_agg_metrics['ID'].str.startswith('stage') &
+            (job_stage_agg_metrics['memoryBytesSpilled_sum'] > spill_threshold_bytes)
+        ].copy()
+        stages_with_spills['stageId'] = stages_with_spills['ID'].str.extract(r'(\d+)').astype(int)
+
+        # Merge stages with spills with SQL-to-stage information
+        merged_df = pd.merge(stages_with_spills, sql_to_stage_info, on='stageId', how='inner')
+
+        # Identify stages with spills caused by Execs other than the ones allowed (Join, Aggregate or Sort)
+        # Note: Column 'SQL Nodes(IDs)' contains the Exec names
+        pattern = '|'.join(map(re.escape, self.props.get_value('spillBased', 'allowedExecs')))
+        relevant_stages_with_spills = merged_df[~merged_df['SQL Nodes(IDs)'].apply(
+            lambda x: isinstance(x, str) and bool(re.search(pattern, x)))]
+        # If there are any stages with spills caused by non-allowed Execs, skip the application
+        if not relevant_stages_with_spills.empty:
+            stages_str = '; '.join(relevant_stages_with_spills['stageId'].astype(str))
+            spill_threshold_human_readable = Utilities.bytes_to_human_readable(spill_threshold_bytes)
+            reason = f'Skipping due to spills in stages [{stages_str}] exceeding {spill_threshold_human_readable}'
+            return True, reason
+        return False, ''
+
+    def apply_heuristics(self, all_apps: pd.DataFrame) -> pd.DataFrame:
+        try:
+            heuristics_df = self._apply_heuristics(all_apps['App ID'].unique())
+            # Save the heuristics results to a file and drop the reason column
+            heuristics_df.to_csv(self.output_file, index=False)
+            heuristics_df.drop(columns=['Reason'], inplace=True)
+            all_apps = pd.merge(all_apps, heuristics_df, on=['App ID'], how='left')
+        except Exception as e:  # pylint: disable=broad-except
+            self.logger.error('Error occurred while applying additional heuristics. '
+                              'Reason - %s:%s', type(e).__name__, e)
+        return all_apps
diff --git a/user_tools/src/spark_rapids_tools/tools/speedup_category.py b/user_tools/src/spark_rapids_tools/tools/speedup_category.py
index 40dc6928d..15d9b8790 100644
--- a/user_tools/src/spark_rapids_tools/tools/speedup_category.py
+++ b/user_tools/src/spark_rapids_tools/tools/speedup_category.py
@@ -75,12 +75,15 @@ def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
             reason: Category will be set to 'Not Recommended' because the criteriaCol1 is not within the range (18-30)
         """
         category_col_name = self.props.get('categoryColumnName')
+        heuristics_col_name = self.props.get('heuristicsColumnName')
 
         def process_row(single_row: pd.Series) -> str:
             for entry in self.props.get('eligibilityConditions'):
                 col_value = single_row[entry.get('columnName')]
-                # If the value is not within the range, set the category to default category (Not Recommended)
-                if not entry.get('lowerBound') <= col_value <= entry.get('upperBound'):
+                # If the row is marked to be skipped by heuristics or the value is not within the range,
+                # set the category to default category (Not Recommended)
+                if (single_row.get(heuristics_col_name) is True or
+                        not entry.get('lowerBound') <= col_value <= entry.get('upperBound')):
                     return self.props.get('defaultCategory')
             return single_row.get(category_col_name)
diff --git a/user_tools/src/spark_rapids_tools/utils/util.py b/user_tools/src/spark_rapids_tools/utils/util.py
index 8361aa764..1ff849e9b 100644
--- a/user_tools/src/spark_rapids_tools/utils/util.py
+++ b/user_tools/src/spark_rapids_tools/utils/util.py
@@ -309,3 +309,15 @@ def squeeze_df_header(cls, df_row: pd.DataFrame, header_width: int) -> pd.DataFr
             df_row.columns = df_row.columns.str.replace(column, new_column_name, regex=False)
         return df_row
+
+    @classmethod
+    def bytes_to_human_readable(cls, num_bytes: int) -> str:
+        """
+        Convert bytes to human-readable format up to PB
+        """
+        size_units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
+        i = 0
+        while num_bytes >= 1024 and i < len(size_units) - 1:
+            num_bytes /= 1024.0
+            i += 1
+        return f'{num_bytes:.2f} {size_units[i]}'
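
For reference, a minimal usage sketch (not part of the patch) of the new Utilities.bytes_to_human_readable helper, applied to the spillThresholdBytes value configured above; the expected strings follow from the 1024-based loop in the method:

    from spark_rapids_tools.utils import Utilities

    # spillThresholdBytes from qualification-conf.yaml (10 GiB)
    print(Utilities.bytes_to_human_readable(10737418240))  # '10.00 GB'
    print(Utilities.bytes_to_human_readable(536870912))    # '512.00 MB'
    print(Utilities.bytes_to_human_readable(512))           # '512.00 B'

This is the same formatting that ends up in the skip reason emitted by heuristics_based_on_spills, e.g. "Skipping due to spills in stages [...] exceeding 10.00 GB".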