From 8c7ebcdf9db41d64cc93f2040bcda9a481a7c1a6 Mon Sep 17 00:00:00 2001
From: Ahmed Hussein
Date: Wed, 21 Aug 2024 13:23:26 -0500
Subject: [PATCH 1/6] Append HADOOP_CONF_DIR to the tools CLASSPATH execution cmd

Signed-off-by: Ahmed Hussein

Fixes #1253
Fixes #1302

This change includes the following:

- The python wrapper pulls the hadoop configuration directory from the
  `$HADOOP_CONF_DIR` env var. If the latter is not defined, the wrapper
  falls back to `$HADOOP_HOME/etc/hadoop`.
- If `hadoop_conf_dir` is resolved, it is appended to the java CLASSPATH,
  but only if it points to a valid local directory (see the sketch below).
- If none of the above applies, the CLASSPATH is left unchanged.
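
For illustration, the resolution added below (in `Utilities.get_hadoop_conf_dir`
and `RapidsLocalJob._build_classpath`) behaves roughly like this standalone
sketch; `resolve_hadoop_conf_dir` is a hypothetical name used only for the
example:

```python
import os
from typing import Optional


def resolve_hadoop_conf_dir() -> Optional[str]:
    """Prefer HADOOP_CONF_DIR; otherwise fall back to HADOOP_HOME/etc/hadoop."""
    conf_dir = os.environ.get('HADOOP_CONF_DIR')
    if conf_dir is None:
        hadoop_home = os.environ.get('HADOOP_HOME')
        if hadoop_home is not None:
            conf_dir = os.path.join(hadoop_home, 'etc', 'hadoop')
    # The directory is appended to the java CLASSPATH only when it exists
    # as a local directory; otherwise the CLASSPATH is left unchanged.
    if conf_dir is not None and os.path.isdir(conf_dir):
        return conf_dir
    return None
```
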
---
 .../src/spark_rapids_pytools/common/utilities.py  |  4 ++--
 .../src/spark_rapids_pytools/rapids/rapids_job.py | 15 ++++++++++++++-
 user_tools/src/spark_rapids_tools/utils/util.py   | 14 ++++++++++++++
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/user_tools/src/spark_rapids_pytools/common/utilities.py b/user_tools/src/spark_rapids_pytools/common/utilities.py
index ba6fbb7ec..b867a89a0 100644
--- a/user_tools/src/spark_rapids_pytools/common/utilities.py
+++ b/user_tools/src/spark_rapids_pytools/common/utilities.py
@@ -27,7 +27,7 @@
 from logging import Logger
 from shutil import make_archive, which
 import tempfile
-from typing import Callable, Any
+from typing import Callable, Any, Optional

 import chevron
 from packaging.version import Version
@@ -105,7 +105,7 @@ def find_full_rapids_tools_env_key(cls, actual_key: str) -> str:
         return f'RAPIDS_USER_TOOLS_{actual_key}'

     @classmethod
-    def get_sys_env_var(cls, k: str, def_val=None):
+    def get_sys_env_var(cls, k: str, def_val=None) -> Optional[str]:
         return os.environ.get(k, def_val)

     @classmethod
diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
index a4486f4e3..609b3475d 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
@@ -22,6 +22,8 @@
 from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
 from spark_rapids_pytools.common.utilities import ToolLogging, Utils
 from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
+from spark_rapids_tools.storagelib import LocalPath
+from spark_rapids_tools.utils import Utilities


 @dataclass
@@ -128,8 +130,19 @@ class RapidsLocalJob(RapidsJob):
     Implementation of a RAPIDS job that runs local on a local machine.
     """

-    def _build_classpath(self):
+    def _build_classpath(self) -> List[str]:
         deps_arr = [self.prop_container.get_jar_file()]
+        hadoop_confdir = Utilities.get_hadoop_conf_dir()
+        # append hadoop conf dir if any
+        if hadoop_confdir is not None:
+            try:
+                hadoopconf_path = LocalPath(hadoop_confdir)
+                # verify it is a valid directory
+                if hadoopconf_path.is_dir() and hadoopconf_path.exists():
+                    deps_arr.append(hadoopconf_path.no_prefix)
+            except Exception as e:  # pylint: disable=broad-except # noqa: E722
+                self.logger.warning('Ignoring HADOOP_CLASSPATH. %s\n\tReason: Error while '
+                                    'adding hadoop conf dir to classpath: %s', hadoop_confdir, e)
         dependencies = self.prop_container.get_value_silent('platformArgs', 'dependencies')
         if dependencies is not None:
             deps_arr.extend(dependencies)
diff --git a/user_tools/src/spark_rapids_tools/utils/util.py b/user_tools/src/spark_rapids_tools/utils/util.py
index ab90c34ea..9ba45a754 100644
--- a/user_tools/src/spark_rapids_tools/utils/util.py
+++ b/user_tools/src/spark_rapids_tools/utils/util.py
@@ -323,3 +323,17 @@ def bytes_to_human_readable(cls, num_bytes: int) -> str:
             num_bytes /= 1024.0
             i += 1
         return f'{num_bytes:.2f} {size_units[i]}'
+
+    @classmethod
+    def get_hadoop_conf_dir(cls) -> Optional[str]:
+        """
+        Get Hadoop's configuration directory from the environment variables.
+        If not defined, return $HADOOP_HOME/etc/hadoop if HADOOP_HOME is defined.
+        Otherwise, returns None.
+        """
+        hadoop_conf_dir = Utils.get_sys_env_var('HADOOP_CONF_DIR')
+        if hadoop_conf_dir is None:
+            hadoop_home = Utils.get_sys_env_var('HADOOP_HOME')
+            if hadoop_home is not None:
+                hadoop_conf_dir = os.path.join(hadoop_home, 'etc', 'hadoop')
+        return hadoop_conf_dir

From e944887e53fbf194a84cce13a13672a93285d6fa Mon Sep 17 00:00:00 2001
From: Partho Sarthi
Date: Wed, 21 Aug 2024 12:31:25 -0700
Subject: [PATCH 2/6] Add default argument for HDFS handler
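
The handler now falls back to `HadoopFileSystem("default")` when no connection
arguments are supplied. A minimal usage sketch, assuming a reachable HDFS
cluster and the environment variables documented in the `HdfsFs` docstring;
passing "default" as the host makes libhdfs resolve the namenode from
`fs.defaultFS` in core-site.xml:

```python
from pyarrow.fs import HadoopFileSystem

# Picks up fs.defaultFS from the Hadoop configuration on the CLASSPATH.
fs = HadoopFileSystem("default")
# Sanity check: stat the root of the default filesystem.
print(fs.get_file_info("/"))
```
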
Signed-off-by: Partho Sarthi
---
 .../src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/user_tools/src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py b/user_tools/src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py
index 8b3b64447..f06f79f13 100644
--- a/user_tools/src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py
+++ b/user_tools/src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py
@@ -13,8 +13,11 @@
 # limitations under the License.

 """Wrapper for the Hadoop File system"""
+from typing import Any

-from ..cspfs import CspFs, register_fs_class
+from pyarrow.fs import HadoopFileSystem
+
+from ..cspfs import CspFs, register_fs_class, BoundedArrowFsT


 @register_fs_class("hdfs", "HadoopFileSystem")
@@ -36,3 +39,10 @@ class HdfsFs(CspFs):
     CLASSPATH: must contain the Hadoop jars. For example, set it with:
         export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
     """
+
+    @classmethod
+    def create_fs_handler(cls, *args: Any, **kwargs: Any) -> BoundedArrowFsT:
+        try:
+            return HadoopFileSystem(*(args or ("default",)), **kwargs)
+        except Exception as e:  # pylint: disable=broad-except
+            raise RuntimeError(f"Failed to create HadoopFileSystem handler: {e}") from e

From 8b43b5bbc650251c9437e2825cb8e5b0eedbc71a Mon Sep 17 00:00:00 2001
From: Ahmed Hussein
Date: Thu, 22 Aug 2024 09:53:24 -0500
Subject: [PATCH 3/6] Add HADOOP_HOME/conf to the list of hadoop conf directories
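
The probe order implemented by the new `_get_hadoop_classpath` helper in the
diff below is equivalent to this standalone sketch (illustrative only;
`first_valid_hadoop_conf_dir` is a hypothetical name):

```python
import os
from typing import Optional


def first_valid_hadoop_conf_dir() -> Optional[str]:
    # HADOOP_CONF_DIR wins; HADOOP_HOME/conf covers the Hadoop 1.x layout,
    # and HADOOP_HOME/etc/hadoop the 2.x+ layout.
    candidates = [
        ('HADOOP_CONF_DIR', ''),
        ('HADOOP_HOME', '/conf'),
        ('HADOOP_HOME', '/etc/hadoop'),
    ]
    for env_var, postfix in candidates:
        base = os.environ.get(env_var)
        if base is not None and os.path.isdir(base + postfix):
            return base + postfix
    return None
```
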
Signed-off-by: Ahmed Hussein
---
 .../spark_rapids_pytools/rapids/rapids_job.py | 64 ++++++++++++++-----
 .../src/spark_rapids_tools/utils/util.py      | 14 ----
 2 files changed, 48 insertions(+), 30 deletions(-)

diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
index 609b3475d..c0ab5a025 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
@@ -17,19 +17,18 @@
 import os
 from dataclasses import dataclass, field
 from logging import Logger
-from typing import List
+from typing import List, Optional

 from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
 from spark_rapids_pytools.common.utilities import ToolLogging, Utils
 from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
 from spark_rapids_tools.storagelib import LocalPath
-from spark_rapids_tools.utils import Utilities


 @dataclass
 class RapidsJobPropContainer(JSONPropertiesContainer):
     """
-    Container to manage the properties and arguments needed to submit a job running RAPIDS plugin.
+    Manages properties and arguments needed to run RAPIDS tools.
     """

     def _init_fields(self):
@@ -53,7 +52,7 @@ def get_rapids_args(self):
 @dataclass
 class RapidsJob:
     """
-    A wrapper class to represent the actual execution of a RAPIDS plugin job on the cloud platform.
+    Represents an actual execution of a RAPIDS-tools job on the cloud platform.
     """
     prop_container: RapidsJobPropContainer
     exec_ctxt: ToolContext
@@ -117,7 +116,7 @@ def run_job(self):
         try:
             job_output = self._submit_job(cmd_args)
             if not ToolLogging.is_debug_mode_enabled():
-                # we check the debug level because we do not want the output to be displayed twice
+                # we check the debug level because we do not want the output displayed twice.
                 self._print_job_output(job_output)
         finally:
             self._cleanup_temp_log4j_files()
@@ -127,22 +126,55 @@
 @dataclass
 class RapidsLocalJob(RapidsJob):
     """
-    Implementation of a RAPIDS job that runs local on a local machine.
+    Implementation of a RAPIDS job that runs locally on a machine.
     """

+    def _get_hadoop_classpath(self) -> Optional[str]:
+        """
+        Gets Hadoop's configuration directory from the environment variables.
+        The first valid directory found is returned, probing in the following order:
+        1. HADOOP_CONF_DIR
+        2. HADOOP_HOME/conf
+        3. HADOOP_HOME/etc/hadoop
+        Otherwise, returns None.
+
+        """
+        hadoop_dir_lookups = {
+            'hadoopConfDir': {
+                'envVar': 'HADOOP_CONF_DIR',
+                'postfix': ''
+            },
+            'hadoopHomeV1': {
+                'envVar': 'HADOOP_HOME',
+                'postfix': '/conf'
+            },
+            'hadoopHomeV2': {
+                'envVar': 'HADOOP_HOME',
+                'postfix': '/etc/hadoop'
+            }
+        }
+        # Iterate over hadoop_dir_lookups and return the first valid directory found.
+        for dir_key, dir_value in hadoop_dir_lookups.items():
+            env_var_value = Utils.get_sys_env_var(dir_value['envVar'])
+            if env_var_value is not None:
+                postfix = dir_value['postfix']
+                conf_dir = f'{env_var_value}{postfix}'
+                try:
+                    conf_dir_path = LocalPath(conf_dir)
+                    if conf_dir_path.is_dir() and conf_dir_path.exists():
+                        # return the first valid directory found without the URI prefix
+                        return conf_dir_path.no_prefix
+                except Exception as e:  # pylint: disable=broad-except
+                    self.logger.debug(
+                        'Could not build hadoop classpath from %s. Reason: %s', dir_key, e)
+        return None
+
     def _build_classpath(self) -> List[str]:
         deps_arr = [self.prop_container.get_jar_file()]
-        hadoop_confdir = Utilities.get_hadoop_conf_dir()
+        hadoop_cp = self._get_hadoop_classpath()
         # append hadoop conf dir if any
-        if hadoop_confdir is not None:
-            try:
-                hadoopconf_path = LocalPath(hadoop_confdir)
-                # verify it is a valid directory
-                if hadoopconf_path.is_dir() and hadoopconf_path.exists():
-                    deps_arr.append(hadoopconf_path.no_prefix)
-            except Exception as e:  # pylint: disable=broad-except # noqa: E722
-                self.logger.warning('Ignoring HADOOP_CLASSPATH. %s\n\tReason: Error while '
-                                    'adding hadoop conf dir to classpath: %s', hadoop_confdir, e)
+        if hadoop_cp is not None:
+            deps_arr.append(hadoop_cp)
         dependencies = self.prop_container.get_value_silent('platformArgs', 'dependencies')
         if dependencies is not None:
             deps_arr.extend(dependencies)
diff --git a/user_tools/src/spark_rapids_tools/utils/util.py b/user_tools/src/spark_rapids_tools/utils/util.py
index 9ba45a754..ab90c34ea 100644
--- a/user_tools/src/spark_rapids_tools/utils/util.py
+++ b/user_tools/src/spark_rapids_tools/utils/util.py
@@ -323,17 +323,3 @@ def bytes_to_human_readable(cls, num_bytes: int) -> str:
             num_bytes /= 1024.0
             i += 1
         return f'{num_bytes:.2f} {size_units[i]}'
-
-    @classmethod
-    def get_hadoop_conf_dir(cls) -> Optional[str]:
-        """
-        Get Hadoop's configuration directory from the environment variables.
-        If not defined, return $HADOOP_HOME/etc/hadoop if HADOOP_HOME is defined.
-        Otherwise, returns None.
-        """
-        hadoop_conf_dir = Utils.get_sys_env_var('HADOOP_CONF_DIR')
-        if hadoop_conf_dir is None:
-            hadoop_home = Utils.get_sys_env_var('HADOOP_HOME')
-            if hadoop_home is not None:
-                hadoop_conf_dir = os.path.join(hadoop_home, 'etc', 'hadoop')
-        return hadoop_conf_dir

From 652ce1ebebe38221b18abc196c7f0d24de80742b Mon Sep 17 00:00:00 2001
From: Ahmed Hussein
Date: Thu, 22 Aug 2024 10:38:15 -0500
Subject: [PATCH 4/6] Enforce output folder to be local in tools arguments

Signed-off-by: Ahmed Hussein

Fixes #1303
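
Sketch of the intended behavior (assumes `LocalPath` rejects non-local URIs,
which is what the new error handling below relies on; the sample path is
arbitrary):

```python
from spark_rapids_tools.storagelib import LocalPath

out = LocalPath('/tmp/tools_output')
print(str(out))       # scheme-qualified form, e.g. 'file:///tmp/tools_output'
print(out.no_prefix)  # plain local path, '/tmp/tools_output'

# A remote URI such as 'hdfs:///tmp/out' is expected to fail to construct,
# which the tool reports as "Output_folder must be a local directory".
```
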
---
 user_tools/src/spark_rapids_pytools/rapids/rapids_job.py | 5 ++++-
 .../src/spark_rapids_pytools/rapids/rapids_tool.py       | 9 ++++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
index c0ab5a025..49cdb3209 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
@@ -65,7 +65,10 @@ def get_platform_name(self):

     def _init_fields(self):
         self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.submit.{self.job_label}')
-        self.output_path = self.prop_container.get_value_silent('outputDirectory')
+        output_directory = self.prop_container.get_value_silent('outputDirectory')
+        if output_directory is not None:
+            # use LocalPath to add the 'file://' prefix to the path
+            self.output_path = str(LocalPath(output_directory))

     def __post_init__(self):
         self._init_fields()
diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
index 094b289e6..1dbb4651f 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -38,6 +38,7 @@
 from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer
 from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
 from spark_rapids_tools import CspEnv
+from spark_rapids_tools.storagelib import LocalPath
 from spark_rapids_tools.utils import Utilities


@@ -135,7 +136,13 @@ def _process_output_args(self):
         # make sure that output_folder is being absolute
         if self.output_folder is None:
             self.output_folder = Utils.get_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd())
-        self.output_folder = FSUtil.get_abs_path(self.output_folder)
+        try:
+            output_folder_path = LocalPath(self.output_folder)
+            self.output_folder = output_folder_path.no_prefix
+        except Exception as ex:  # pylint: disable=broad-except
+            self.logger.error('Failed in processing output arguments. Output_folder must be a local directory')
+            raise ex
+        # self.output_folder = FSUtil.get_abs_path(self.output_folder)
         self.logger.debug('Root directory of local storage is set as: %s', self.output_folder)
         self.ctxt.set_local_workdir(self.output_folder)
         self.ctxt.load_prepackaged_resources()

From f626c602e9a03e911a82d69bfeb070ecac91ce6a Mon Sep 17 00:00:00 2001
From: Ahmed Hussein
Date: Thu, 22 Aug 2024 13:07:48 -0500
Subject: [PATCH 5/6] Removing commented code

Signed-off-by: Ahmed Hussein
---
 user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
index 1dbb4651f..3270bed0f 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -142,7 +142,6 @@ def _process_output_args(self):
             self.output_folder = output_folder_path.no_prefix
         except Exception as ex:  # pylint: disable=broad-except
             self.logger.error('Failed in processing output arguments. Output_folder must be a local directory')
             raise ex
-        # self.output_folder = FSUtil.get_abs_path(self.output_folder)
         self.logger.debug('Root directory of local storage is set as: %s', self.output_folder)
         self.ctxt.set_local_workdir(self.output_folder)
         self.ctxt.load_prepackaged_resources()

From 8d82acd839bc9b90b50058a9ccd8c2884240ccba Mon Sep 17 00:00:00 2001
From: Ahmed Hussein
Date: Thu, 22 Aug 2024 14:20:40 -0500
Subject: [PATCH 6/6] Update copyrights

Signed-off-by: Ahmed Hussein
---
 user_tools/src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/user_tools/src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py b/user_tools/src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py
index f06f79f13..35623f127 100644
--- a/user_tools/src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py
+++ b/user_tools/src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023, NVIDIA CORPORATION.
+# Copyright (c) 2023-2024, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
 # limitations under the License.

 """Wrapper for the Hadoop File system"""
+
 from typing import Any

 from pyarrow.fs import HadoopFileSystem