From d8b3a646513567dbf87e7728b880a6688d11c66c Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Wed, 8 May 2024 15:28:07 -0700 Subject: [PATCH 1/7] Add heuristics using stage spill metrics to skip apps Signed-off-by: Partho Sarthi --- .../rapids/qualification.py | 22 +++- .../resources/qualification-conf.yaml | 38 +++++- .../tools/additional_heuristics.py | 120 ++++++++++++++++++ .../tools/speedup_category.py | 7 +- 4 files changed, 174 insertions(+), 13 deletions(-) create mode 100644 user_tools/src/spark_rapids_tools/tools/additional_heuristics.py diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 99cd6a41e..8a7f3f150 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -31,6 +31,7 @@ from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel from spark_rapids_tools.tools.model_xgboost import predict +from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics from spark_rapids_tools.tools.speedup_category import SpeedupCategory from spark_rapids_tools.tools.top_candidates import TopCandidates from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration @@ -439,9 +440,12 @@ def __group_apps_by_name(self, all_apps) -> (pd.DataFrame, str): all_apps_count = len(all_apps) group_info = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'groupColumns') - for group_col in group_info['columns']: - valid_group_cols = Utilities.get_valid_df_columns(group_info['keys'], all_apps) - all_apps[group_col] = all_apps.groupby(valid_group_cols)[group_col].transform('mean') + valid_group_cols = Utilities.get_valid_df_columns(group_info['keys'], all_apps) + for agg_info in group_info['aggregate']: + agg_col = agg_info['column'] + if agg_col in all_apps.columns: + all_apps[agg_col] = all_apps.groupby(valid_group_cols)[agg_col].transform( + agg_info['function']) drop_arr = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'dropDuplicates') valid_drop_cols = Utilities.get_valid_df_columns(drop_arr, all_apps) @@ -694,12 +698,16 @@ def __build_global_report_summary(self, # Calculate unsupported operators stage duration before grouping all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df) apps_pruned_df = self.__remap_columns_and_prune(all_apps) + additional_heuristics = AdditionalHeuristics( + props=self.ctxt.get_value('local', 'output', 'additionalHeuristics'), + output_dir=self.ctxt.get_local('outputFolder')) + apps_pruned_df = additional_heuristics.apply_heuristics(apps_pruned_df) speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories')) - # Calculate the speedup category column - apps_pruned_df = speedup_category_ob.build_category_column(apps_pruned_df) - apps_pruned_df.to_csv(output_files_info['full']['path'], float_format='%.2f') + # Calculate the speedup category column, send a copy of the dataframe to avoid modifying the original + apps_pruned_result = speedup_category_ob.build_category_column(apps_pruned_df.copy()) + apps_pruned_result.to_csv(output_files_info['full']['path'], float_format='%.2f') + # Group the applications and recalculate metrics apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df) - # Recalculate the speedup category column after 
grouping apps_grouped_df = speedup_category_ob.build_category_column(apps_grouped_df) recommended_apps = self.__get_recommended_apps(apps_grouped_df) # if the gpu_reshape_type is set to JOB then, then we should ignore recommended apps diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index eea8f63aa..f4977f34c 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -46,10 +46,15 @@ toolOutput: - Cluster Id - Cluster Name - App Name - columns: - - App Duration - - Estimated GPU Duration - - Unsupported Operators Stage Duration + aggregate: + - column: App Duration + function: 'mean' + - column: Estimated GPU Duration + function: 'mean' + - column: Unsupported Operators Stage Duration + function: 'mean' + - column: Skip by Heuristics + function: 'any' dropDuplicates: - Vendor - Driver Host @@ -232,6 +237,7 @@ local: speedupCategories: speedupColumnName: 'Estimated GPU Speedup' categoryColumnName: 'Estimated GPU Speedup Category' + heuristicsColumnName: 'Skip by Heuristics' categories: - title: 'Not Applicable' lowerBound: -1000000.0 @@ -253,6 +259,30 @@ local: lowerBound: 0.0 upperBound: 25.0 defaultCategory: 'Not Recommended' + additionalHeuristics: + appInfo: + fileName: 'application_information.csv' + resultCols: + - 'App Name' + - 'App ID' + - 'Skip by Heuristics' + spillBased: + jobStageAggMetrics: + fileName: 'job_+_stage_level_aggregated_task_metrics.csv' + columns: + - 'ID' + - 'diskBytesSpilled_sum' + - 'memoryBytesSpilled_sum' + sqlToStageInfo: + fileName: 'sql_to_stage_information.csv' + columns: + - 'stageId' + - 'SQL Nodes(IDs)' + spillThresholdBytes: 1000000000 + allowedExecs: + - 'Aggregate' + - 'Join' + - 'Sort' predictionModel: outputDirectory: 'xgboost_predictions' files: diff --git a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py new file mode 100644 index 000000000..66ea0bd0b --- /dev/null +++ b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py @@ -0,0 +1,120 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Implementation class for Additional Heuristics logic.""" + +import os +import re +from dataclasses import dataclass, field +from logging import Logger +from pathlib import Path + +import pandas as pd + +from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer +from spark_rapids_pytools.common.utilities import ToolLogging +from spark_rapids_tools.tools.model_xgboost import find_paths, RegexPattern + + +@dataclass +class AdditionalHeuristics: + """ + Encapsulates the logic to apply additional heuristics to skip applications. 
+ """ + logger: Logger = field(default=None, init=False) + props: JSONPropertiesContainer = field(default=None, init=False) + output_dir: str = field(default=None, init=False) + + def __init__(self, props: dict, output_dir: str): + self.props = JSONPropertiesContainer(props, file_load=False) + self.output_dir = output_dir + self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.{self.__class__.__name__}') + + def _apply_heuristics(self) -> pd.DataFrame: + """ + Apply additional heuristics to applications to determine if they can be accelerated on GPU. + """ + profile_list = find_paths( + self.output_dir, + RegexPattern.rapids_profile.match, + return_directories=True, + ) + if not profile_list: + self.logger.warning('No RAPIDS profiles found in output directory: %s', self.output_dir) + return pd.DataFrame() + + app_id_paths = find_paths(profile_list[0], RegexPattern.app_id.match, return_directories=True) + result_arr = [] + if not app_id_paths: + self.logger.warning('Skipping empty profile: %s', profile_list[0]) + else: + for app_id_path in app_id_paths: + try: + app_info_path = os.path.join(app_id_path, self.props.get_value('appInfo', 'fileName')) + app_info = pd.read_csv(app_info_path) + app_name = app_info['appName'].values[0] + app_id = Path(app_id_path).name + # Apply heuristics and determine if the application should be skipped. + # Note: `should_skip` flag can be a combination of multiple heuristic checks. + should_skip = self.heuristics_based_on_spills(app_id_path) + result_arr.append([app_name, app_id, should_skip]) + except Exception as e: # pylint: disable=broad-except + self.logger.error('Error occurred while applying additional heuristics. ' + 'Reason - %s:%s', type(e).__name__, e) + return pd.DataFrame(result_arr, columns=self.props.get_value('resultCols')) + + def heuristics_based_on_spills(self, app_id_path: str) -> bool: + """ + Apply heuristics based on spills to determine if the app can be accelerated on GPU. 
+ """ + # Load stage aggregation metrics (this contains spill information) + job_stage_agg_metrics_file = self.props.get_value('spillBased', 'jobStageAggMetrics', 'fileName') + job_stage_agg_metrics = pd.read_csv(os.path.join(app_id_path, job_stage_agg_metrics_file)) + job_stage_agg_metrics = job_stage_agg_metrics[self.props.get_value('spillBased', + 'jobStageAggMetrics', 'columns')] + + # Load sql-to-stage information (this contains Exec names) + sql_to_stage_info_file = self.props.get_value('spillBased', 'sqlToStageInfo', 'fileName') + sql_to_stage_info = pd.read_csv(os.path.join(app_id_path, sql_to_stage_info_file)) + sql_to_stage_info = sql_to_stage_info[self.props.get_value('spillBased', + 'sqlToStageInfo', 'columns')] + + # Identify stages with significant spills + spill_threshold_bytes = self.props.get_value('spillBased', 'spillThresholdBytes') + stages_with_spills = job_stage_agg_metrics[ + job_stage_agg_metrics['ID'].str.startswith('stage') & + (job_stage_agg_metrics['diskBytesSpilled_sum'] + + job_stage_agg_metrics['memoryBytesSpilled_sum'] > spill_threshold_bytes) + ].copy() + stages_with_spills['stageId'] = stages_with_spills['ID'].str.extract(r'(\d+)').astype(int) + + # Merge stages with spills with SQL-to-stage information + merged_df = pd.merge(stages_with_spills, sql_to_stage_info, on='stageId', how='inner') + + # Identify stages with spills caused by Execs other than the ones allowed (Join, Aggregate or Sort) + # Note: Column 'SQL Nodes(IDs)' contains the Exec names + pattern = '|'.join(map(re.escape, self.props.get_value('spillBased', 'allowedExecs'))) + relevant_stages_with_spills = merged_df[~merged_df['SQL Nodes(IDs)'].apply( + lambda x: isinstance(x, str) and bool(re.search(pattern, x)))] + # If there are any stages with spills caused by non-allowed Execs, skip the application + return len(relevant_stages_with_spills) > 0 + + def apply_heuristics(self, all_apps: pd.DataFrame) -> pd.DataFrame: + try: + additional_heuristics_df = self._apply_heuristics() + all_apps = pd.merge(all_apps, additional_heuristics_df, on=['App Name', 'App ID'], how='left') + except Exception as e: # pylint: disable=broad-except + self.logger.error('Error occurred while applying additional heuristics. 
' + 'Reason - %s:%s', type(e).__name__, e) + return all_apps diff --git a/user_tools/src/spark_rapids_tools/tools/speedup_category.py b/user_tools/src/spark_rapids_tools/tools/speedup_category.py index 40dc6928d..15d9b8790 100644 --- a/user_tools/src/spark_rapids_tools/tools/speedup_category.py +++ b/user_tools/src/spark_rapids_tools/tools/speedup_category.py @@ -75,12 +75,15 @@ def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame: reason: Category will be set to 'Not Recommended' because the criteriaCol1 is not within the range (18-30) """ category_col_name = self.props.get('categoryColumnName') + heuristics_col_name = self.props.get('heuristicsColumnName') def process_row(single_row: pd.Series) -> str: for entry in self.props.get('eligibilityConditions'): col_value = single_row[entry.get('columnName')] - # If the value is not within the range, set the category to default category (Not Recommended) - if not entry.get('lowerBound') <= col_value <= entry.get('upperBound'): + # If the row is marked to be skipped by heuristics or the value is not within the range, + # set the category to default category (Not Recommended) + if (single_row.get(heuristics_col_name) is True or + not entry.get('lowerBound') <= col_value <= entry.get('upperBound')): return self.props.get('defaultCategory') return single_row.get(category_col_name) From ca21dd2c3af31c5fb0c08cfc1d5f289ed2558773 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Thu, 9 May 2024 16:31:12 -0700 Subject: [PATCH 2/7] Remove disk spill metrics Signed-off-by: Partho Sarthi --- .../src/spark_rapids_pytools/resources/qualification-conf.yaml | 1 - .../src/spark_rapids_tools/tools/additional_heuristics.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index f4977f34c..c591c63c9 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -271,7 +271,6 @@ local: fileName: 'job_+_stage_level_aggregated_task_metrics.csv' columns: - 'ID' - - 'diskBytesSpilled_sum' - 'memoryBytesSpilled_sum' sqlToStageInfo: fileName: 'sql_to_stage_information.csv' diff --git a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py index 66ea0bd0b..bc11a991d 100644 --- a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py +++ b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py @@ -94,8 +94,7 @@ def heuristics_based_on_spills(self, app_id_path: str) -> bool: spill_threshold_bytes = self.props.get_value('spillBased', 'spillThresholdBytes') stages_with_spills = job_stage_agg_metrics[ job_stage_agg_metrics['ID'].str.startswith('stage') & - (job_stage_agg_metrics['diskBytesSpilled_sum'] + - job_stage_agg_metrics['memoryBytesSpilled_sum'] > spill_threshold_bytes) + (job_stage_agg_metrics['memoryBytesSpilled_sum'] > spill_threshold_bytes) ].copy() stages_with_spills['stageId'] = stages_with_spills['ID'].str.extract(r'(\d+)').astype(int) From b7749584104c180893d84512a663715c1eb2de5a Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 13 May 2024 17:17:49 -0700 Subject: [PATCH 3/7] Add skip reason Signed-off-by: Partho Sarthi --- .../rapids/qualification.py | 20 ++++++++-- .../resources/qualification-conf.yaml | 4 +- .../tools/additional_heuristics.py | 40 ++++++++++--------- 3 files changed, 
40 insertions(+), 24 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 8a7f3f150..3fe68e420 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -16,7 +16,7 @@ from dataclasses import dataclass, field from math import ceil -from typing import Any, List, Callable +from typing import Any, List, Callable, Union import numpy as np import pandas as pd @@ -439,13 +439,25 @@ def __group_apps_by_name(self, all_apps) -> (pd.DataFrame, str): """ all_apps_count = len(all_apps) + def resolve_agg_function(agg_func_str: str) -> Union[str, Callable]: + """ + Return a custom aggregation function or the input string. + """ + agg_func_map = { + 'concat': lambda x: '; '.join(map(str, x)), + } + return agg_func_map.get(agg_func_str, agg_func_str) + group_info = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'groupColumns') valid_group_cols = Utilities.get_valid_df_columns(group_info['keys'], all_apps) for agg_info in group_info['aggregate']: - agg_col = agg_info['column'] + agg_col = agg_info.get('column') + # resolve the aggregation function if it is a custom function + agg_func = resolve_agg_function(agg_info.get('function')) if agg_col in all_apps.columns: - all_apps[agg_col] = all_apps.groupby(valid_group_cols)[agg_col].transform( - agg_info['function']) + all_apps[agg_col] = all_apps[agg_col].dropna() + # Group by columns can contain NaN values, so we need to include them in the grouping + all_apps[agg_col] = all_apps.groupby(valid_group_cols, dropna=False)[agg_col].transform(agg_func) drop_arr = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'dropDuplicates') valid_drop_cols = Utilities.get_valid_df_columns(drop_arr, all_apps) diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index c591c63c9..bec74099c 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -55,6 +55,8 @@ toolOutput: function: 'mean' - column: Skip by Heuristics function: 'any' + - column: Skip by Heuristics Reason + function: 'concat' dropDuplicates: - Vendor - Driver Host @@ -263,9 +265,9 @@ local: appInfo: fileName: 'application_information.csv' resultCols: - - 'App Name' - 'App ID' - 'Skip by Heuristics' + - 'Skip by Heuristics Reason' spillBased: jobStageAggMetrics: fileName: 'job_+_stage_level_aggregated_task_metrics.csv' diff --git a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py index bc11a991d..c35a56da1 100644 --- a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py +++ b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py @@ -18,7 +18,6 @@ import re from dataclasses import dataclass, field from logging import Logger -from pathlib import Path import pandas as pd @@ -41,7 +40,7 @@ def __init__(self, props: dict, output_dir: str): self.output_dir = output_dir self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.{self.__class__.__name__}') - def _apply_heuristics(self) -> pd.DataFrame: + def _apply_heuristics(self, app_ids: list) -> pd.DataFrame: """ Apply additional heuristics to applications to determine if they can be accelerated on GPU. 
""" @@ -50,31 +49,30 @@ def _apply_heuristics(self) -> pd.DataFrame: RegexPattern.rapids_profile.match, return_directories=True, ) - if not profile_list: + if len(profile_list) == 0: self.logger.warning('No RAPIDS profiles found in output directory: %s', self.output_dir) - return pd.DataFrame() + return pd.DataFrame(columns=self.props.get_value('resultCols')) - app_id_paths = find_paths(profile_list[0], RegexPattern.app_id.match, return_directories=True) + profile_path = profile_list[0] result_arr = [] - if not app_id_paths: + if not os.listdir(profile_path) or len(app_ids) == 0: self.logger.warning('Skipping empty profile: %s', profile_list[0]) else: - for app_id_path in app_id_paths: + for app_id in app_ids: + app_id_path = os.path.join(profile_path, app_id) try: - app_info_path = os.path.join(app_id_path, self.props.get_value('appInfo', 'fileName')) - app_info = pd.read_csv(app_info_path) - app_name = app_info['appName'].values[0] - app_id = Path(app_id_path).name # Apply heuristics and determine if the application should be skipped. # Note: `should_skip` flag can be a combination of multiple heuristic checks. - should_skip = self.heuristics_based_on_spills(app_id_path) - result_arr.append([app_name, app_id, should_skip]) + should_skip, reason = self.heuristics_based_on_spills(app_id_path) except Exception as e: # pylint: disable=broad-except - self.logger.error('Error occurred while applying additional heuristics. ' - 'Reason - %s:%s', type(e).__name__, e) + should_skip = False + reason = f'Cannot apply heuristics for qualification. Reason - {type(e).__name__}:{e}' + self.logger.error(reason) + result_arr.append([app_id, should_skip, reason]) + return pd.DataFrame(result_arr, columns=self.props.get_value('resultCols')) - def heuristics_based_on_spills(self, app_id_path: str) -> bool: + def heuristics_based_on_spills(self, app_id_path: str) -> (bool, str): """ Apply heuristics based on spills to determine if the app can be accelerated on GPU. """ @@ -107,12 +105,16 @@ def heuristics_based_on_spills(self, app_id_path: str) -> bool: relevant_stages_with_spills = merged_df[~merged_df['SQL Nodes(IDs)'].apply( lambda x: isinstance(x, str) and bool(re.search(pattern, x)))] # If there are any stages with spills caused by non-allowed Execs, skip the application - return len(relevant_stages_with_spills) > 0 + should_skip = len(relevant_stages_with_spills) > 0 + stages_str = ', '.join(relevant_stages_with_spills['stageId'].astype(str)) + reason = f'Skipping due to spills in stages [{stages_str}] exceeding {spill_threshold_bytes} bytes' \ + if should_skip else '' + return should_skip, reason def apply_heuristics(self, all_apps: pd.DataFrame) -> pd.DataFrame: try: - additional_heuristics_df = self._apply_heuristics() - all_apps = pd.merge(all_apps, additional_heuristics_df, on=['App Name', 'App ID'], how='left') + additional_heuristics_df = self._apply_heuristics(all_apps['App ID'].unique()) + all_apps = pd.merge(all_apps, additional_heuristics_df, on=['App ID'], how='left') except Exception as e: # pylint: disable=broad-except self.logger.error('Error occurred while applying additional heuristics. ' 'Reason - %s:%s', type(e).__name__, e) From f3987c58304ca7b042d382a8db692ae44d05ba0b Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 14 May 2024 14:25:50 -0700 Subject: [PATCH 4/7] Revert "Add skip reason" This reverts commit b7749584104c180893d84512a663715c1eb2de5a. 
--- .../rapids/qualification.py | 20 ++-------- .../resources/qualification-conf.yaml | 4 +- .../tools/additional_heuristics.py | 40 +++++++++---------- 3 files changed, 24 insertions(+), 40 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 3fe68e420..8a7f3f150 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -16,7 +16,7 @@ from dataclasses import dataclass, field from math import ceil -from typing import Any, List, Callable, Union +from typing import Any, List, Callable import numpy as np import pandas as pd @@ -439,25 +439,13 @@ def __group_apps_by_name(self, all_apps) -> (pd.DataFrame, str): """ all_apps_count = len(all_apps) - def resolve_agg_function(agg_func_str: str) -> Union[str, Callable]: - """ - Return a custom aggregation function or the input string. - """ - agg_func_map = { - 'concat': lambda x: '; '.join(map(str, x)), - } - return agg_func_map.get(agg_func_str, agg_func_str) - group_info = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'groupColumns') valid_group_cols = Utilities.get_valid_df_columns(group_info['keys'], all_apps) for agg_info in group_info['aggregate']: - agg_col = agg_info.get('column') - # resolve the aggregation function if it is a custom function - agg_func = resolve_agg_function(agg_info.get('function')) + agg_col = agg_info['column'] if agg_col in all_apps.columns: - all_apps[agg_col] = all_apps[agg_col].dropna() - # Group by columns can contain NaN values, so we need to include them in the grouping - all_apps[agg_col] = all_apps.groupby(valid_group_cols, dropna=False)[agg_col].transform(agg_func) + all_apps[agg_col] = all_apps.groupby(valid_group_cols)[agg_col].transform( + agg_info['function']) drop_arr = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'dropDuplicates') valid_drop_cols = Utilities.get_valid_df_columns(drop_arr, all_apps) diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index bec74099c..c591c63c9 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -55,8 +55,6 @@ toolOutput: function: 'mean' - column: Skip by Heuristics function: 'any' - - column: Skip by Heuristics Reason - function: 'concat' dropDuplicates: - Vendor - Driver Host @@ -265,9 +263,9 @@ local: appInfo: fileName: 'application_information.csv' resultCols: + - 'App Name' - 'App ID' - 'Skip by Heuristics' - - 'Skip by Heuristics Reason' spillBased: jobStageAggMetrics: fileName: 'job_+_stage_level_aggregated_task_metrics.csv' diff --git a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py index c35a56da1..bc11a991d 100644 --- a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py +++ b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py @@ -18,6 +18,7 @@ import re from dataclasses import dataclass, field from logging import Logger +from pathlib import Path import pandas as pd @@ -40,7 +41,7 @@ def __init__(self, props: dict, output_dir: str): self.output_dir = output_dir self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.{self.__class__.__name__}') - def _apply_heuristics(self, app_ids: list) -> pd.DataFrame: + def 
_apply_heuristics(self) -> pd.DataFrame: """ Apply additional heuristics to applications to determine if they can be accelerated on GPU. """ @@ -49,30 +50,31 @@ def _apply_heuristics(self, app_ids: list) -> pd.DataFrame: RegexPattern.rapids_profile.match, return_directories=True, ) - if len(profile_list) == 0: + if not profile_list: self.logger.warning('No RAPIDS profiles found in output directory: %s', self.output_dir) - return pd.DataFrame(columns=self.props.get_value('resultCols')) + return pd.DataFrame() - profile_path = profile_list[0] + app_id_paths = find_paths(profile_list[0], RegexPattern.app_id.match, return_directories=True) result_arr = [] - if not os.listdir(profile_path) or len(app_ids) == 0: + if not app_id_paths: self.logger.warning('Skipping empty profile: %s', profile_list[0]) else: - for app_id in app_ids: - app_id_path = os.path.join(profile_path, app_id) + for app_id_path in app_id_paths: try: + app_info_path = os.path.join(app_id_path, self.props.get_value('appInfo', 'fileName')) + app_info = pd.read_csv(app_info_path) + app_name = app_info['appName'].values[0] + app_id = Path(app_id_path).name # Apply heuristics and determine if the application should be skipped. # Note: `should_skip` flag can be a combination of multiple heuristic checks. - should_skip, reason = self.heuristics_based_on_spills(app_id_path) + should_skip = self.heuristics_based_on_spills(app_id_path) + result_arr.append([app_name, app_id, should_skip]) except Exception as e: # pylint: disable=broad-except - should_skip = False - reason = f'Cannot apply heuristics for qualification. Reason - {type(e).__name__}:{e}' - self.logger.error(reason) - result_arr.append([app_id, should_skip, reason]) - + self.logger.error('Error occurred while applying additional heuristics. ' + 'Reason - %s:%s', type(e).__name__, e) return pd.DataFrame(result_arr, columns=self.props.get_value('resultCols')) - def heuristics_based_on_spills(self, app_id_path: str) -> (bool, str): + def heuristics_based_on_spills(self, app_id_path: str) -> bool: """ Apply heuristics based on spills to determine if the app can be accelerated on GPU. """ @@ -105,16 +107,12 @@ def heuristics_based_on_spills(self, app_id_path: str) -> (bool, str): relevant_stages_with_spills = merged_df[~merged_df['SQL Nodes(IDs)'].apply( lambda x: isinstance(x, str) and bool(re.search(pattern, x)))] # If there are any stages with spills caused by non-allowed Execs, skip the application - should_skip = len(relevant_stages_with_spills) > 0 - stages_str = ', '.join(relevant_stages_with_spills['stageId'].astype(str)) - reason = f'Skipping due to spills in stages [{stages_str}] exceeding {spill_threshold_bytes} bytes' \ - if should_skip else '' - return should_skip, reason + return len(relevant_stages_with_spills) > 0 def apply_heuristics(self, all_apps: pd.DataFrame) -> pd.DataFrame: try: - additional_heuristics_df = self._apply_heuristics(all_apps['App ID'].unique()) - all_apps = pd.merge(all_apps, additional_heuristics_df, on=['App ID'], how='left') + additional_heuristics_df = self._apply_heuristics() + all_apps = pd.merge(all_apps, additional_heuristics_df, on=['App Name', 'App ID'], how='left') except Exception as e: # pylint: disable=broad-except self.logger.error('Error occurred while applying additional heuristics. 
' 'Reason - %s:%s', type(e).__name__, e) From 75ac4e858a288331b1b1cfa819dfe469c9abcd22 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 14 May 2024 14:24:53 -0700 Subject: [PATCH 5/7] Generate skip reason to intermediate output directory Signed-off-by: Partho Sarthi --- .../rapids/qualification.py | 35 +++++++----- .../resources/qualification-conf.yaml | 13 ++++- .../tools/additional_heuristics.py | 55 +++++++++++-------- 3 files changed, 65 insertions(+), 38 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 8a7f3f150..35f954e65 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -24,6 +24,7 @@ from spark_rapids_pytools.cloud_api.sp_types import ClusterReshape, NodeHWInfo, SparkNodeType from spark_rapids_pytools.common.cluster_inference import ClusterInference +from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator from spark_rapids_pytools.pricing.price_provider import SavingsEstimator @@ -444,7 +445,8 @@ def __group_apps_by_name(self, all_apps) -> (pd.DataFrame, str): for agg_info in group_info['aggregate']: agg_col = agg_info['column'] if agg_col in all_apps.columns: - all_apps[agg_col] = all_apps.groupby(valid_group_cols)[agg_col].transform( + # Group by columns can contain NaN values, so we need to include them in the grouping + all_apps[agg_col] = all_apps.groupby(valid_group_cols, dropna=False)[agg_col].transform( agg_info['function']) drop_arr = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'dropDuplicates') @@ -688,24 +690,27 @@ def get_cost_per_row(df_row, reshape_col: str) -> pd.Series: def __build_global_report_summary(self, all_apps: pd.DataFrame, unsupported_ops_df: pd.DataFrame, - output_files_info: dict) -> QualificationSummary: + output_files_raw: dict) -> QualificationSummary: if all_apps.empty: # No need to run saving estimator or process the data frames. 
return QualificationSummary(comments=self.__generate_mc_types_conversion_report()) + output_files_info = JSONPropertiesContainer(output_files_raw, file_load=False) unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output', 'unsupportedOperators')) # Calculate unsupported operators stage duration before grouping all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df) apps_pruned_df = self.__remap_columns_and_prune(all_apps) - additional_heuristics = AdditionalHeuristics( + # Apply additional heuristics to skip apps not suitable for GPU acceleration + heuristics_ob = AdditionalHeuristics( props=self.ctxt.get_value('local', 'output', 'additionalHeuristics'), - output_dir=self.ctxt.get_local('outputFolder')) - apps_pruned_df = additional_heuristics.apply_heuristics(apps_pruned_df) + tools_output_dir=self.ctxt.get_local('outputFolder'), + output_file=output_files_info.get_value('intermediateOutput', 'files', 'heuristics', 'path')) + apps_pruned_df = heuristics_ob.apply_heuristics(apps_pruned_df) speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories')) # Calculate the speedup category column, send a copy of the dataframe to avoid modifying the original apps_pruned_result = speedup_category_ob.build_category_column(apps_pruned_df.copy()) - apps_pruned_result.to_csv(output_files_info['full']['path'], float_format='%.2f') + apps_pruned_result.to_csv(output_files_info.get_value('full', 'path'), float_format='%.2f') # Group the applications and recalculate metrics apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df) apps_grouped_df = speedup_category_ob.build_category_column(apps_grouped_df) @@ -732,7 +737,7 @@ def __build_global_report_summary(self, 'clusterShapeCols', 'columnName') speed_recommendation_col = self.ctxt.get_value('local', 'output', 'speedupRecommendColumn') apps_reshaped_df, per_row_flag = self.__apply_gpu_cluster_reshape(apps_grouped_df) - csv_out = output_files_info['summary']['path'] + csv_out = output_files_info.get_value('summary', 'path') if launch_savings_calc: # Now, the dataframe is ready to calculate the cost and the savings apps_working_set = self.__calc_apps_cost(apps_reshaped_df, @@ -956,15 +961,19 @@ def __build_prediction_output_files_info(self) -> dict: FSUtil.make_dirs(output_dir) return self.__update_files_info_with_paths(predictions_info['files'], output_dir) - @staticmethod - def __update_files_info_with_paths(files_info: dict, output_dir: str) -> dict: + @classmethod + def __update_files_info_with_paths(cls, files_info: dict, output_dir: str) -> dict: """ Update the given files_info dictionary with full file paths. 
""" - for entry in files_info: - file_name = files_info[entry]['name'] - file_path = FSUtil.build_path(output_dir, file_name) - files_info[entry]['path'] = file_path + for _, entry in files_info.items(): + file_name = entry['name'] + path = FSUtil.build_path(output_dir, file_name) + # if entry is a directory, create the directory and update the files info recursively + if entry.get('isDirectory'): + FSUtil.make_dirs(path) + entry['files'] = cls.__update_files_info_with_paths(entry['files'], path) + entry['path'] = path return files_info def __update_apps_with_prediction_info(self, all_apps: pd.DataFrame) -> pd.DataFrame: diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index c591c63c9..6e90c18cc 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -55,6 +55,8 @@ toolOutput: function: 'mean' - column: Skip by Heuristics function: 'any' + - column: Skip by Heuristics Reason + function: 'concat' dropDuplicates: - Vendor - Driver Host @@ -118,6 +120,14 @@ local: full: name: qualification_summary_full.csv outputComment: "Full savings and speedups CSV report" + intermediateOutput: + name: 'intermediate_output' + outputComment: "Intermediate output generated by tools" + isDirectory: true + files: + heuristics: + name: 'heuristics_info.csv' + outputComment: "Heuristics information" costColumns: - 'Savings Based Recommendation' - 'Estimated App Cost' @@ -262,10 +272,11 @@ local: additionalHeuristics: appInfo: fileName: 'application_information.csv' + reasonColumnName: 'Skip by Heuristics Reason' resultCols: - - 'App Name' - 'App ID' - 'Skip by Heuristics' + - 'Skip by Heuristics Reason' spillBased: jobStageAggMetrics: fileName: 'job_+_stage_level_aggregated_task_metrics.csv' diff --git a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py index bc11a991d..5eff7d253 100644 --- a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py +++ b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py @@ -18,7 +18,6 @@ import re from dataclasses import dataclass, field from logging import Logger -from pathlib import Path import pandas as pd @@ -34,47 +33,48 @@ class AdditionalHeuristics: """ logger: Logger = field(default=None, init=False) props: JSONPropertiesContainer = field(default=None, init=False) - output_dir: str = field(default=None, init=False) + tools_output_dir: str = field(default=None, init=False) + output_file: str = field(default=None, init=False) - def __init__(self, props: dict, output_dir: str): + def __init__(self, props: dict, tools_output_dir: str, output_file: str): self.props = JSONPropertiesContainer(props, file_load=False) - self.output_dir = output_dir + self.tools_output_dir = tools_output_dir + self.output_file = output_file self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.{self.__class__.__name__}') - def _apply_heuristics(self) -> pd.DataFrame: + def _apply_heuristics(self, app_ids: list) -> pd.DataFrame: """ Apply additional heuristics to applications to determine if they can be accelerated on GPU. 
""" profile_list = find_paths( - self.output_dir, + self.tools_output_dir, RegexPattern.rapids_profile.match, return_directories=True, ) - if not profile_list: - self.logger.warning('No RAPIDS profiles found in output directory: %s', self.output_dir) - return pd.DataFrame() + if len(profile_list) == 0: + self.logger.warning('No RAPIDS profiles found in output directory: %s', self.tools_output_dir) + return pd.DataFrame(columns=self.props.get_value('resultCols')) - app_id_paths = find_paths(profile_list[0], RegexPattern.app_id.match, return_directories=True) + profile_path = profile_list[0] result_arr = [] - if not app_id_paths: + if not os.listdir(profile_path) or len(app_ids) == 0: self.logger.warning('Skipping empty profile: %s', profile_list[0]) else: - for app_id_path in app_id_paths: + for app_id in app_ids: + app_id_path = os.path.join(profile_path, app_id) try: - app_info_path = os.path.join(app_id_path, self.props.get_value('appInfo', 'fileName')) - app_info = pd.read_csv(app_info_path) - app_name = app_info['appName'].values[0] - app_id = Path(app_id_path).name # Apply heuristics and determine if the application should be skipped. # Note: `should_skip` flag can be a combination of multiple heuristic checks. - should_skip = self.heuristics_based_on_spills(app_id_path) - result_arr.append([app_name, app_id, should_skip]) + should_skip, reason = self.heuristics_based_on_spills(app_id_path) except Exception as e: # pylint: disable=broad-except - self.logger.error('Error occurred while applying additional heuristics. ' - 'Reason - %s:%s', type(e).__name__, e) + should_skip = False + reason = f'Cannot apply heuristics for qualification. Reason - {type(e).__name__}:{e}' + self.logger.error(reason) + result_arr.append([app_id, should_skip, reason]) + return pd.DataFrame(result_arr, columns=self.props.get_value('resultCols')) - def heuristics_based_on_spills(self, app_id_path: str) -> bool: + def heuristics_based_on_spills(self, app_id_path: str) -> (bool, str): """ Apply heuristics based on spills to determine if the app can be accelerated on GPU. """ @@ -107,12 +107,19 @@ def heuristics_based_on_spills(self, app_id_path: str) -> bool: relevant_stages_with_spills = merged_df[~merged_df['SQL Nodes(IDs)'].apply( lambda x: isinstance(x, str) and bool(re.search(pattern, x)))] # If there are any stages with spills caused by non-allowed Execs, skip the application - return len(relevant_stages_with_spills) > 0 + if not relevant_stages_with_spills.empty: + stages_str = ', '.join(relevant_stages_with_spills['stageId'].astype(str)) + reason = f'Skipping due to spills in stages [{stages_str}] exceeding {spill_threshold_bytes} bytes' + return True, reason + return False, '' def apply_heuristics(self, all_apps: pd.DataFrame) -> pd.DataFrame: try: - additional_heuristics_df = self._apply_heuristics() - all_apps = pd.merge(all_apps, additional_heuristics_df, on=['App Name', 'App ID'], how='left') + heuristics_df = self._apply_heuristics(all_apps['App ID'].unique()) + # Save the heuristics results to a file and drop the reason column + heuristics_df.to_csv(self.output_file, index=False) + heuristics_df.drop(columns=self.props.get_value('reasonColumnName'), inplace=True) + all_apps = pd.merge(all_apps, heuristics_df, on=['App ID'], how='left') except Exception as e: # pylint: disable=broad-except self.logger.error('Error occurred while applying additional heuristics. 
' 'Reason - %s:%s', type(e).__name__, e) From 16435a33cf727d5ddf2b4b75071e0e3a7cb58086 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 14 May 2024 14:24:53 -0700 Subject: [PATCH 6/7] Change delimiter to semi colon and update reason column name Signed-off-by: Partho Sarthi --- .../spark_rapids_pytools/resources/qualification-conf.yaml | 3 +-- .../src/spark_rapids_tools/tools/additional_heuristics.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index 6e90c18cc..2a2eefb14 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -272,11 +272,10 @@ local: additionalHeuristics: appInfo: fileName: 'application_information.csv' - reasonColumnName: 'Skip by Heuristics Reason' resultCols: - 'App ID' - 'Skip by Heuristics' - - 'Skip by Heuristics Reason' + - 'Reason' spillBased: jobStageAggMetrics: fileName: 'job_+_stage_level_aggregated_task_metrics.csv' diff --git a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py index 5eff7d253..9f24f6d45 100644 --- a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py +++ b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py @@ -108,7 +108,7 @@ def heuristics_based_on_spills(self, app_id_path: str) -> (bool, str): lambda x: isinstance(x, str) and bool(re.search(pattern, x)))] # If there are any stages with spills caused by non-allowed Execs, skip the application if not relevant_stages_with_spills.empty: - stages_str = ', '.join(relevant_stages_with_spills['stageId'].astype(str)) + stages_str = '; '.join(relevant_stages_with_spills['stageId'].astype(str)) reason = f'Skipping due to spills in stages [{stages_str}] exceeding {spill_threshold_bytes} bytes' return True, reason return False, '' @@ -118,7 +118,7 @@ def apply_heuristics(self, all_apps: pd.DataFrame) -> pd.DataFrame: heuristics_df = self._apply_heuristics(all_apps['App ID'].unique()) # Save the heuristics results to a file and drop the reason column heuristics_df.to_csv(self.output_file, index=False) - heuristics_df.drop(columns=self.props.get_value('reasonColumnName'), inplace=True) + heuristics_df.drop(columns=['Reason'], inplace=True) all_apps = pd.merge(all_apps, heuristics_df, on=['App ID'], how='left') except Exception as e: # pylint: disable=broad-except self.logger.error('Error occurred while applying additional heuristics. 
' From a3a755182dfd01e962b45bff12b65fb706e66818 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Thu, 16 May 2024 10:57:21 -0700 Subject: [PATCH 7/7] Add function to convert size to human-readable format Signed-off-by: Partho Sarthi --- .../resources/qualification-conf.yaml | 2 +- .../tools/additional_heuristics.py | 4 +++- user_tools/src/spark_rapids_tools/utils/util.py | 12 ++++++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index 2a2eefb14..e566149df 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -287,7 +287,7 @@ local: columns: - 'stageId' - 'SQL Nodes(IDs)' - spillThresholdBytes: 1000000000 + spillThresholdBytes: 10737418240 allowedExecs: - 'Aggregate' - 'Join' diff --git a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py index 9f24f6d45..b2dff8417 100644 --- a/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py +++ b/user_tools/src/spark_rapids_tools/tools/additional_heuristics.py @@ -24,6 +24,7 @@ from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer from spark_rapids_pytools.common.utilities import ToolLogging from spark_rapids_tools.tools.model_xgboost import find_paths, RegexPattern +from spark_rapids_tools.utils import Utilities @dataclass @@ -109,7 +110,8 @@ def heuristics_based_on_spills(self, app_id_path: str) -> (bool, str): # If there are any stages with spills caused by non-allowed Execs, skip the application if not relevant_stages_with_spills.empty: stages_str = '; '.join(relevant_stages_with_spills['stageId'].astype(str)) - reason = f'Skipping due to spills in stages [{stages_str}] exceeding {spill_threshold_bytes} bytes' + spill_threshold_human_readable = Utilities.bytes_to_human_readable(spill_threshold_bytes) + reason = f'Skipping due to spills in stages [{stages_str}] exceeding {spill_threshold_human_readable}' return True, reason return False, '' diff --git a/user_tools/src/spark_rapids_tools/utils/util.py b/user_tools/src/spark_rapids_tools/utils/util.py index 8361aa764..1ff849e9b 100644 --- a/user_tools/src/spark_rapids_tools/utils/util.py +++ b/user_tools/src/spark_rapids_tools/utils/util.py @@ -309,3 +309,15 @@ def squeeze_df_header(cls, df_row: pd.DataFrame, header_width: int) -> pd.DataFr df_row.columns = df_row.columns.str.replace(column, new_column_name, regex=False) return df_row + + @classmethod + def bytes_to_human_readable(cls, num_bytes: int) -> str: + """ + Convert bytes to human-readable format up to PB + """ + size_units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB'] + i = 0 + while num_bytes >= 1024 and i < len(size_units) - 1: + num_bytes /= 1024.0 + i += 1 + return f'{num_bytes:.2f} {size_units[i]}'