[FEA] Add Estimation Model to Qualification CLI #870

Merged: 3 commits, Mar 25, 2024
4 changes: 3 additions & 1 deletion user_tools/pyproject.toml
@@ -43,7 +43,9 @@ dependencies = [
"azure-storage-blob==12.17.0",
"adlfs==2023.4.0",
# used for spinner animation
"progress==1.6"
"progress==1.6",
# used for model estimations
"xgboost==2.0.3"
]
dynamic=["entry-points", "version"]

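The new pin can be sanity-checked after installing the user tools package; a minimal sketch (only the version string comes from the diff above):

# Post-install check that the pinned model dependency resolved correctly.
import xgboost

assert xgboost.__version__ == '2.0.3', f'unexpected xgboost version: {xgboost.__version__}'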
7 changes: 0 additions & 7 deletions user_tools/src/spark_rapids_pytools/rapids/profiling.py
@@ -80,13 +80,6 @@ def _generate_autotuner_input(self):
        gpu_cluster_obj = self.ctxt.get_ctxt('gpuClusterProxy')
        self._generate_autotuner_input_from_cluster(gpu_cluster_obj)

-    def _create_autotuner_rapids_args(self) -> list:
-        # Add the autotuner argument, also add worker-info if the autotunerPath exists
-        autotuner_path = self.ctxt.get_ctxt('autoTunerFilePath')
-        if autotuner_path is None:
-            return ['--auto-tuner']
-        return ['--auto-tuner', '--worker-info', autotuner_path]
-
    def __read_single_app_output(self, file_path: str) -> (str, List[str], List[str]):
        def split_list_str_by_pattern(input_seq: List[str], pattern: str) -> int:
            ind = 0
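The qualification changes below rely on a QualEstimationModel enum from spark_rapids_tools.enums (not shown in this diff) exposing fromstring and get_default helpers. A minimal sketch of the assumed shape; the member values and the default are inferred from the wrapper docstrings further down:

from enum import Enum
from typing import Optional


class QualEstimationModel(str, Enum):
    # Assumed members; the values match the CLI strings documented below.
    XGBOOST = 'xgboost'
    SPEEDUPS = 'speedups'

    @classmethod
    def fromstring(cls, value: str) -> Optional['QualEstimationModel']:
        # Returns None for unrecognized input, which is the behavior
        # _process_estimation_model_args depends on for its fallback.
        try:
            return cls(value.lower())
        except (ValueError, AttributeError):
            return None

    @classmethod
    def get_default(cls) -> 'QualEstimationModel':
        # 'speedups' is documented as the default further down.
        return cls.SPEEDUPS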
87 changes: 73 additions & 14 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -14,8 +14,8 @@

"""Implementation class representing wrapper around the RAPIDS acceleration Qualification tool."""

import textwrap
import json
import textwrap
from dataclasses import dataclass, field
from math import ceil
from typing import Any, List, Callable
Expand All @@ -29,8 +29,10 @@
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType
from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel
from spark_rapids_tools.tools.model_xgboost import predict
from spark_rapids_tools.tools.top_candidates import TopCandidates
from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration

@@ -302,6 +304,24 @@ def __process_filter_args(self, arg_val: str):
            selected_filter = QualFilterApp.fromstring(default_filter_txt)
        self.ctxt.set_ctxt('filterApps', selected_filter)

+    def _process_estimation_model_args(self):
+        # set the estimation model
+        estimation_model_str = self.wrapper_options.get('estimationModel')
+        if estimation_model_str is not None:
+            selected_estimation_model = QualEstimationModel.fromstring(estimation_model_str)
+            if selected_estimation_model is None:
+                selected_estimation_model = QualEstimationModel.get_default()
+                available_models = [qual_model.value for qual_model in QualEstimationModel]
+                self.logger.warning(
+                    'Invalid argument estimation_model=%s.\n\t'
+                    'Accepted options are: [%s].\n\t'
+                    'Falling back to the default estimation model: %s',
+                    estimation_model_str, Utils.gen_joined_str(' | ', available_models),
+                    selected_estimation_model.value)
+        else:
+            selected_estimation_model = QualEstimationModel.get_default()
+        self.ctxt.set_ctxt('estimationModel', selected_estimation_model)
+
    def _process_external_pricing_args(self):
        cpu_cluster_price = self.wrapper_options.get('cpuClusterPrice')
        estimated_gpu_cluster_price = self.wrapper_options.get('estimatedGpuClusterPrice')
@@ -342,15 +362,6 @@ def check_discount_percentage(discount_type: str, discount_value: int):
        self.ctxt.set_ctxt('cpu_discount', cpu_discount)
        self.ctxt.set_ctxt('gpu_discount', gpu_discount)

-    def _create_autotuner_rapids_args(self) -> list:
-        # Add the autotuner argument, also add worker-info if the autotunerPath exists
-        if self.ctxt.get_rapids_auto_tuner_enabled():
-            autotuner_path = self.ctxt.get_ctxt('autoTunerFilePath')
-            if autotuner_path is None:
-                return ['--auto-tuner']
-            return ['--auto-tuner', '--worker-info', autotuner_path]
-        return []
-
    def _create_cluster_report_args(self) -> list:
        # Add the cluster-report argument if the cluster is not defined else disable it
        if self.ctxt.get_ctxt('cpuClusterProxy') is None:
@@ -385,7 +396,7 @@ def _process_custom_args(self):
        # we need to process each argument to verify it is valid. otherwise, we may crash late
        self.__process_gpu_cluster_recommendation(self.wrapper_options.get('gpuClusterRecommendation'))
        self.__process_filter_args(self.wrapper_options.get('filterApps'))
-
+        self._process_estimation_model_args()
        self._process_offline_cluster_args()
        self._process_eventlogs_args()
        self._process_external_pricing_args()
@@ -799,6 +810,12 @@ def process_df_for_stdout(raw_df):

        if not self._evaluate_rapids_jar_tool_output_exist():
            return
+        # process the output through the XGBoost model if enabled
+        if self.ctxt.get_ctxt('estimationModel') == QualEstimationModel.XGBOOST:
+            xgboost_input_dir = self.ctxt.get_local('outputFolder')
+            xgboost_output_dir = xgboost_input_dir
+            predict(qual=xgboost_input_dir, profile=xgboost_input_dir, output_dir=xgboost_output_dir)
+
        rapids_output_dir = self.ctxt.get_rapids_output_folder()
        rapids_summary_file = FSUtil.build_path(rapids_output_dir,
                                                self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'fileName'))
@@ -844,9 +861,15 @@ def _generate_section_lines(self, sec_conf: dict) -> List[str]:
        return super()._generate_section_content(sec_conf)

    def _init_rapids_arg_list(self) -> List[str]:
-        return super()._init_rapids_arg_list() + self._create_autotuner_rapids_args() + \
+        return super()._init_rapids_arg_list() + self._init_rapids_arg_list_for_qual()
+
+    def _init_rapids_arg_list_for_qual(self) -> List[str]:
+        return ['--per-sql'] + self._create_autotuner_rapids_args() + \
            self._create_cluster_report_args()
+
+    def _init_rapids_arg_list_for_profile(self) -> List[str]:
+        return super()._init_rapids_arg_list() + ['--csv']

    def _process_cluster_info_and_update_savings(self, cluster_info_file):
        """
        Process the cluster information from the cluster info file and update savings if CPU cluster
@@ -907,7 +930,10 @@ def _process_job_submission_args(self):
        self._process_local_job_submission_args()

    def _prepare_job_arguments(self):
-        self._prepare_local_job_arguments()
+        super()._prepare_local_job_arguments()
+        if self.ctxt.get_ctxt('estimationModel') == QualEstimationModel.XGBOOST:
+            # when estimation_model is enabled
+            self._prepare_profile_job_args()

    def _delete_remote_dep_folder(self):
        self.logger.debug('Local mode skipping deleting the remote workdir')
@@ -917,3 +943,36 @@ def _download_remote_output_folder(self):

    def _archive_results(self):
        self._archive_local_results()

+    def _prepare_profile_job_args(self):
+        # get the job arguments
+        job_args = self.ctxt.get_ctxt('jobArgs')
+        # now we can create the job object
+        # Todo: For dataproc, this can be autogenerated from cluster name
+        rapids_arg_list = self._init_rapids_arg_list_for_profile()
+        ctxt_rapids_args = self.ctxt.get_ctxt('rapidsArgs')
+        jar_file_path = ctxt_rapids_args.get('jarFilePath')
+        rapids_opts = ctxt_rapids_args.get('rapidsOpts')
+        if rapids_opts:
+            rapids_arg_list.extend(rapids_opts)
+        # add the eventlogs at the end of all the tool options
+        rapids_arg_list.extend(self.ctxt.get_ctxt('eventLogs'))
+        class_name = 'com.nvidia.spark.rapids.tool.profiling.ProfileMain'
+        rapids_arg_obj = {
+            'jarFile': jar_file_path,
+            'jarArgs': rapids_arg_list,
+            'className': class_name
+        }
+        platform_args = job_args.get('platformArgs')
+        spark_conf_args = {}
+        job_properties_json = {
+            'outputDirectory': job_args.get('outputDirectory'),
+            'rapidsArgs': rapids_arg_obj,
+            'sparkConfArgs': spark_conf_args,
+            'platformArgs': platform_args
+        }
+        rapids_job_container = RapidsJobPropContainer(prop_arg=job_properties_json,
+                                                      file_load=False)
+        rapids_containers = self.ctxt.get_ctxt('rapidsJobContainers')
+        rapids_containers.append(rapids_job_container)
+        self.ctxt.set_ctxt('rapidsJobContainers', rapids_containers)
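Taken together, the changes above make the XGBoost path opt-in: the wrapper always prepares the qualification job, additionally prepares a profiling job when the model is selected, and later feeds both tools' shared output directory to predict. A condensed sketch of that control flow (run_qual_job and run_prof_job are hypothetical stand-ins for the submitted jar jobs):

from spark_rapids_tools.enums import QualEstimationModel
from spark_rapids_tools.tools.model_xgboost import predict


def run_qualification_flow(ctxt, run_qual_job, run_prof_job):
    run_qual_job()  # always runs, now with --per-sql injected programmatically
    if ctxt.get_ctxt('estimationModel') == QualEstimationModel.XGBOOST:
        run_prof_job()  # extra profiling run with --csv feeds the model
        out_dir = ctxt.get_local('outputFolder')
        # Both tools write under the same local folder, so it serves as the
        # qual input, the profile input, and the prediction output.
        predict(qual=out_dir, profile=out_dir, output_dir=out_dir)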
41 changes: 33 additions & 8 deletions user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -637,9 +637,8 @@ def _run_rapids_tool(self):
        # 2- prepare the arguments
        # 2.a -check if the app_id is not none
        self._prepare_job_arguments()
-        #
-        # 3- create a submission job
-        # 4- execute
+        # 3- create and submit the jobs
+        self._submit_jobs()

    def _get_main_cluster_obj(self):
        return self.ctxt.get_ctxt('cpuClusterProxy')
@@ -803,6 +802,15 @@ def _init_rapids_arg_list(self) -> List[str]:
        # TODO: Make sure we add this argument only for jar versions 23.02+
        return ['--platform', self.ctxt.platform.get_platform_name().replace('_', '-')]

+    def _create_autotuner_rapids_args(self) -> list:
+        # Add the autotuner argument, also add worker-info if the autotunerPath exists
+        if self.ctxt.get_rapids_auto_tuner_enabled():
+            autotuner_path = self.ctxt.get_ctxt('autoTunerFilePath')
+            if autotuner_path is None:
+                return ['--auto-tuner']
+            return ['--auto-tuner', '--worker-info', autotuner_path]
+        return []
+
    @timeit('Building Job Arguments and Executing Job CMD')  # pylint: disable=too-many-function-args
    def _prepare_local_job_arguments(self):
        job_args = self.ctxt.get_ctxt('jobArgs')
@@ -830,11 +838,9 @@ def _prepare_local_job_arguments(self):
            'sparkConfArgs': spark_conf_args,
            'platformArgs': platform_args
        }
-        job_properties = RapidsJobPropContainer(prop_arg=job_properties_json,
-                                                file_load=False)
-        job_obj = self.ctxt.platform.create_local_submission_job(job_prop=job_properties,
-                                                                 ctxt=self.ctxt)
-        job_obj.run_job()
+        rapids_job_container = RapidsJobPropContainer(prop_arg=job_properties_json,
+                                                      file_load=False)
+        self.ctxt.set_ctxt('rapidsJobContainers', [rapids_job_container])

    def _archive_results(self):
        self._archive_local_results()
@@ -845,3 +851,22 @@ def _archive_local_results(self):
        local_folder = self.ctxt.get_output_folder()
        # TODO make sure it worth issuing the command
        self.ctxt.platform.storage.upload_resource(local_folder, remote_work_dir)
+
+    def _submit_jobs(self):
+        # create the submission jobs and run them concurrently
+        rapids_job_containers = self.ctxt.get_ctxt('rapidsJobContainers')
+        futures_list = []
+        results = []
+        with ThreadPoolExecutor(max_workers=len(rapids_job_containers)) as executor:
+            for rapids_job in rapids_job_containers:
+                job_obj = self.ctxt.platform.create_local_submission_job(job_prop=rapids_job,
+                                                                         ctxt=self.ctxt)
+                futures = executor.submit(job_obj.run_job)
+                futures_list.append(futures)
+            try:
+                for future in concurrent.futures.as_completed(futures_list):
+                    result = future.result()
+                    results.append(result)
+            except Exception as ex:  # pylint: disable=broad-except
+                self.logger.error('Failed to run the submitted jobs %s', ex)
+                raise ex
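_submit_jobs is a standard fan-out/fail-fast executor pattern: every prepared job container is submitted to a thread pool, and the first job exception is re-raised through future.result(). The same pattern in a self-contained, runnable form:

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor


def run_all(jobs):
    """Run callables concurrently; re-raise the first failure."""
    results = []
    with ThreadPoolExecutor(max_workers=len(jobs)) as executor:
        futures_list = [executor.submit(job) for job in jobs]
        for future in concurrent.futures.as_completed(futures_list):
            results.append(future.result())  # raises if the job raised
    return results


if __name__ == '__main__':
    print(run_all([lambda: 'qualification done', lambda: 'profiling done']))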
@@ -11,6 +11,7 @@ sparkRapids:
  repoUrl: '{}/{}/rapids-4-spark-tools_2.12-{}.jar'
  mainClass: 'com.nvidia.spark.rapids.tool.profiling.ProfileMain'
  outputDocURL: 'https://docs.nvidia.com/spark-rapids/user-guide/latest/spark-profiling-tool.html#understanding-profiling-tool-detailed-output-and-examples'
+  enableAutoTuner: true
  cli:
    toolOptions:
      - csv
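The new enableAutoTuner key is presumably what get_rapids_auto_tuner_enabled() (checked by the shared _create_autotuner_rapids_args above) reads for the profiling tool; the accessor itself is not part of this diff. A sketch of the assumed lookup, with the key names taken from the hunk above:

def get_rapids_auto_tuner_enabled(self) -> bool:
    # Assumed accessor: reads the flag added to the YAML above via the
    # same get_value lookup style used elsewhere in this diff.
    return bool(self.get_value('sparkRapids', 'enableAutoTuner'))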
@@ -79,8 +79,6 @@ sparkRapids:
      - num-output-rows
      - num-threads
      - order
-      - p
-      - per-sql
      - r
      - report-read-schema
      - s
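Dropping p/per-sql from the static toolOptions list pairs with _init_rapids_arg_list_for_qual above, which now injects the flag programmatically. Roughly, the qualification jar arguments are assembled like this (the platform value is illustrative; the auto-tuner flag appears only when enabled):

# Illustrative assembly of the qualification jar arguments.
jar_args = (['--platform', 'databricks-aws']  # _init_rapids_arg_list (base)
            + ['--per-sql']                   # _init_rapids_arg_list_for_qual
            + ['--auto-tuner'])               # _create_autotuner_rapids_args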

(Five additional files in this PR have large diffs that are not rendered by default.)

@@ -40,6 +40,7 @@ def qualification(cpu_cluster: str = None,
                  filter_apps: str = QualFilterApp.tostring(QualFilterApp.SAVINGS),
                  gpu_cluster_recommendation: str = QualGpuClusterReshapeType.tostring(
                      QualGpuClusterReshapeType.get_default()),
+                  estimation_model: str = None,
                  jvm_heap_size: int = None,
                  verbose: bool = None,
                  cpu_discount: int = None,
@@ -92,6 +93,10 @@
            "MATCH": keep GPU cluster same number of nodes as CPU cluster;
            "CLUSTER": recommend optimal GPU cluster by cost for entire cluster;
            "JOB": recommend optimal GPU cluster by cost per job.
+    :param estimation_model: Model used to calculate the estimated GPU duration and cost savings.
+            It accepts one of the following:
+            "xgboost": an XGBoost model for GPU duration estimation;
+            "speedups": the default, which uses a simple static estimated speedup per operator.
    :param jvm_heap_size: The maximum heap size of the JVM in gigabytes.
    :param verbose: True or False to enable verbosity to the wrapper script.
    :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
@@ -142,7 +147,8 @@
        'gpuClusterRecommendation': gpu_cluster_recommendation,
        'cpuDiscount': cpu_discount,
        'gpuDiscount': gpu_discount,
-        'globalDiscount': global_discount
+        'globalDiscount': global_discount,
+        'estimationModel': estimation_model
    }
    QualificationAsLocal(platform_type=CspEnv.DATABRICKS_AWS,
                         cluster=None,
@@ -39,6 +39,7 @@ def qualification(cpu_cluster: str = None,
                  filter_apps: str = QualFilterApp.tostring(QualFilterApp.SAVINGS),
                  gpu_cluster_recommendation: str = QualGpuClusterReshapeType.tostring(
                      QualGpuClusterReshapeType.get_default()),
+                  estimation_model: str = None,
                  jvm_heap_size: int = None,
                  verbose: bool = None,
                  cpu_discount: int = None,
@@ -90,6 +91,10 @@
            "MATCH": keep GPU cluster same number of nodes as CPU cluster;
            "CLUSTER": recommend optimal GPU cluster by cost for entire cluster;
            "JOB": recommend optimal GPU cluster by cost per job.
+    :param estimation_model: Model used to calculate the estimated GPU duration and cost savings.
+            It accepts one of the following:
+            "xgboost": an XGBoost model for GPU duration estimation;
+            "speedups": the default, which uses a simple static estimated speedup per operator.
    :param jvm_heap_size: The maximum heap size of the JVM in gigabytes.
    :param verbose: True or False to enable verbosity to the wrapper script.
    :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
@@ -138,7 +143,8 @@
        'gpuClusterRecommendation': gpu_cluster_recommendation,
        'cpuDiscount': cpu_discount,
        'gpuDiscount': gpu_discount,
-        'globalDiscount': global_discount
+        'globalDiscount': global_discount,
+        'estimationModel': estimation_model
    }
    QualificationAsLocal(platform_type=CspEnv.DATABRICKS_AZURE,
                         cluster=None,
@@ -36,6 +36,7 @@ def qualification(cpu_cluster: str = None,
                  filter_apps: str = QualFilterApp.tostring(QualFilterApp.SAVINGS),
                  gpu_cluster_recommendation: str = QualGpuClusterReshapeType.tostring(
                      QualGpuClusterReshapeType.get_default()),
+                  estimation_model: str = None,
                  jvm_heap_size: int = None,
                  verbose: bool = None,
                  cpu_discount: int = None,
@@ -87,6 +88,10 @@
            "MATCH": keep GPU cluster same number of nodes as CPU cluster;
            "CLUSTER": recommend optimal GPU cluster by cost for entire cluster;
            "JOB": recommend optimal GPU cluster by cost per job
+    :param estimation_model: Model used to calculate the estimated GPU duration and cost savings.
+            It accepts one of the following:
+            "xgboost": an XGBoost model for GPU duration estimation;
+            "speedups": the default, which uses a simple static estimated speedup per operator.
    :param jvm_heap_size: The maximum heap size of the JVM in gigabytes
    :param verbose: True or False to enable verbosity to the wrapper script
    :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value
@@ -132,7 +137,8 @@
        'gpuClusterRecommendation': gpu_cluster_recommendation,
        'cpuDiscount': cpu_discount,
        'gpuDiscount': gpu_discount,
-        'globalDiscount': global_discount
+        'globalDiscount': global_discount,
+        'estimationModel': estimation_model
    }

    tool_obj = QualificationAsLocal(platform_type=CspEnv.DATAPROC_GKE,
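With the new keyword wired through each platform wrapper, opting into the model is a one-argument change. A hypothetical invocation (the module path and event-log URI are placeholders; only the qualification(...) signature comes from this diff):

from spark_rapids_pytools.wrappers.databricks_aws_wrapper import qualification

# 'speedups' remains the default when estimation_model is omitted.
qualification(eventlogs='s3://my-bucket/eventlogs',
              estimation_model='xgboost')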