Append HADOOP_CONF_DIR to the tools CLASSPATH execution cmd #1308

Merged: 6 commits, Aug 22, 2024
4 changes: 2 additions & 2 deletions user_tools/src/spark_rapids_pytools/common/utilities.py
@@ -27,7 +27,7 @@
from logging import Logger
from shutil import make_archive, which
import tempfile
-from typing import Callable, Any
+from typing import Callable, Any, Optional

import chevron
from packaging.version import Version
@@ -105,7 +105,7 @@ def find_full_rapids_tools_env_key(cls, actual_key: str) -> str:
        return f'RAPIDS_USER_TOOLS_{actual_key}'

    @classmethod
-    def get_sys_env_var(cls, k: str, def_val=None):
+    def get_sys_env_var(cls, k: str, def_val=None) -> Optional[str]:
        return os.environ.get(k, def_val)

    @classmethod
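The annotation change above only documents existing behavior: `os.environ.get` already returns `None` when the key is absent and no default is given. A minimal sketch of how a caller is expected to handle the `Optional[str]` result (the variable names here are illustrative, not part of the PR):

```python
import os
from typing import Optional


def get_sys_env_var(k: str, def_val=None) -> Optional[str]:
    # mirrors Utils.get_sys_env_var: returns None when the variable is unset
    return os.environ.get(k, def_val)


# callers must guard against the None case before building paths from it
hadoop_home = get_sys_env_var('HADOOP_HOME')
if hadoop_home is not None:
    print(f'conf candidates: {hadoop_home}/conf, {hadoop_home}/etc/hadoop')
```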
62 changes: 55 additions & 7 deletions user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
@@ -17,17 +17,18 @@
import os
from dataclasses import dataclass, field
from logging import Logger
-from typing import List
+from typing import List, Optional

from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.utilities import ToolLogging, Utils
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
+from spark_rapids_tools.storagelib import LocalPath


@dataclass
class RapidsJobPropContainer(JSONPropertiesContainer):
"""
Container to manage the properties and arguments needed to submit a job running RAPIDS plugin.
Manages properties and arguments needed to running RAPIDS tools.
"""

    def _init_fields(self):
@@ -51,7 +51,7 @@ def get_rapids_args(self):
@dataclass
class RapidsJob:
"""
A wrapper class to represent the actual execution of a RAPIDS plugin job on the cloud platform.
Represents an actual execution of a RAPIDS-tools job on the cloud platform.
"""
prop_container: RapidsJobPropContainer
exec_ctxt: ToolContext
@@ -64,7 +64,10 @@ def get_platform_name(self):

    def _init_fields(self):
        self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.submit.{self.job_label}')
-        self.output_path = self.prop_container.get_value_silent('outputDirectory')
+        output_directory = self.prop_container.get_value_silent('outputDirectory')
+        if output_directory is not None:
+            # use LocalPath to add the 'file://' prefix to the path
+            self.output_path = str(LocalPath(output_directory))

    def __post_init__(self):
        self._init_fields()
@@ -115,7 +119,7 @@ def run_job(self):
        try:
            job_output = self._submit_job(cmd_args)
            if not ToolLogging.is_debug_mode_enabled():
-                # we check the debug level because we do not want the output to be displayed twice
+                # we check the debug level because we do not want the output displayed twice.
                self._print_job_output(job_output)
        finally:
            self._cleanup_temp_log4j_files()
@@ -125,11 +129,55 @@
@dataclass
class RapidsLocalJob(RapidsJob):
"""
Implementation of a RAPIDS job that runs local on a local machine.
Implementation of a RAPIDS job that runs local on a machine.
"""

-    def _build_classpath(self):
+    def _get_hadoop_classpath(self) -> Optional[str]:
+        """
+        Gets the Hadoop configuration directory from the environment variables.
+        The first valid directory found is returned in the following order:
+        1. HADOOP_CONF_DIR
+        2. HADOOP_HOME/conf
+        3. HADOOP_HOME/etc/hadoop
+        Otherwise, returns None.
+        """
+        hadoop_dir_lookups = {
+            'hadoopConfDir': {
+                'envVar': 'HADOOP_CONF_DIR',
+                'postfix': ''
+            },
+            'hadoopHomeV1': {
+                'envVar': 'HADOOP_HOME',
+                'postfix': '/conf'
+            },
+            'hadoopHomeV2': {
+                'envVar': 'HADOOP_HOME',
+                'postfix': '/etc/hadoop'
+            }
+        }
+        # Iterate on the hadoop_dir_lookups to return the first valid directory found.
+        for dir_key, dir_value in hadoop_dir_lookups.items():
+            env_var_value = Utils.get_sys_env_var(dir_value['envVar'])
+            if env_var_value is not None:
+                postfix = dir_value['postfix']
+                conf_dir = f'{env_var_value}{postfix}'
+                try:
+                    conf_dir_path = LocalPath(conf_dir)
+                    if conf_dir_path.is_dir() and conf_dir_path.exists():
+                        # return the first valid directory found without the URI prefix
+                        return conf_dir_path.no_prefix
+                except Exception as e:  # pylint: disable=broad-except
+                    self.logger.debug(
+                        'Could not build hadoop classpath from %s. Reason: %s', dir_key, e)
+        return None

+    def _build_classpath(self) -> List[str]:
        deps_arr = [self.prop_container.get_jar_file()]
+        hadoop_cp = self._get_hadoop_classpath()
+        # append hadoop conf dir if any
+        if hadoop_cp is not None:
Review comment (Collaborator): Should this throw an exception when hadoop conf dir was not found but event logs are in hdfs?
+            deps_arr.append(hadoop_cp)
        dependencies = self.prop_container.get_value_silent('platformArgs', 'dependencies')
        if dependencies is not None:
            deps_arr.extend(dependencies)
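To make the lookup order concrete, here is a standalone sketch of the same precedence rules using only the standard library (`find_hadoop_conf_dir` is an illustrative name, not part of the PR):

```python
import os
from pathlib import Path
from typing import Optional


def find_hadoop_conf_dir() -> Optional[str]:
    # same precedence as _get_hadoop_classpath:
    # HADOOP_CONF_DIR, then HADOOP_HOME/conf, then HADOOP_HOME/etc/hadoop
    candidates = [
        ('HADOOP_CONF_DIR', ''),
        ('HADOOP_HOME', '/conf'),
        ('HADOOP_HOME', '/etc/hadoop'),
    ]
    for env_var, postfix in candidates:
        base = os.environ.get(env_var)
        if base is not None:
            conf_dir = Path(f'{base}{postfix}')
            if conf_dir.is_dir():
                return str(conf_dir)
    return None


# e.g. with HADOOP_HOME=/opt/hadoop and HADOOP_CONF_DIR unset, this returns
# /opt/hadoop/conf if that directory exists, else /opt/hadoop/etc/hadoop
print(find_hadoop_conf_dir())
```

The directory found this way is appended to the tools' CLASSPATH so that Hadoop configuration files (core-site.xml, hdfs-site.xml) are visible when event logs live on HDFS, which is the point of the PR title.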
8 changes: 7 additions & 1 deletion user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -38,6 +38,7 @@
from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools import CspEnv
+from spark_rapids_tools.storagelib import LocalPath
from spark_rapids_tools.utils import Utilities


@@ -135,7 +136,12 @@ def _process_output_args(self):
        # make sure that output_folder is absolute
        if self.output_folder is None:
            self.output_folder = Utils.get_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd())
-        self.output_folder = FSUtil.get_abs_path(self.output_folder)
+        try:
+            output_folder_path = LocalPath(self.output_folder)
+            self.output_folder = output_folder_path.no_prefix
+        except Exception as ex:  # pylint: disable=broad-except
+            self.logger.error('Failed in processing output arguments. Output_folder must be a local directory')
+            raise ex
        self.logger.debug('Root directory of local storage is set as: %s', self.output_folder)
        self.ctxt.set_local_workdir(self.output_folder)
        self.ctxt.load_prepackaged_resources()
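The switch from `FSUtil.get_abs_path` to `LocalPath` means the output folder is validated as a local filesystem path, then handed onward without its `file://` scheme. A rough standard-library equivalent of that round trip, assuming POSIX paths (`normalize_local_dir` is an illustrative name; `no_prefix` is the library's own accessor, approximated here by stripping the scheme):

```python
from pathlib import Path


def normalize_local_dir(raw: str) -> str:
    # expand and absolutize, as LocalPath does when wrapping a local path
    p = Path(raw).expanduser().resolve()
    uri = p.as_uri()              # e.g. 'file:///home/user/qual-output'
    return uri[len('file://'):]   # strip the scheme, like LocalPath.no_prefix


print(normalize_local_dir('~/qual-output'))  # -> /home/user/qual-output
```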
15 changes: 13 additions & 2 deletions user_tools/src/spark_rapids_tools/storagelib/hdfs/hdfsfs.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023, NVIDIA CORPORATION.
+# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,11 @@

"""Wrapper for the Hadoop File system"""

-from ..cspfs import CspFs, register_fs_class
+from typing import Any
+
+from pyarrow.fs import HadoopFileSystem
+
+from ..cspfs import CspFs, register_fs_class, BoundedArrowFsT


@register_fs_class("hdfs", "HadoopFileSystem")
@@ -36,3 +40,10 @@ class HdfsFs(CspFs):
    CLASSPATH: must contain the Hadoop jars.
    example to set the export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
    """

+    @classmethod
+    def create_fs_handler(cls, *args: Any, **kwargs: Any) -> BoundedArrowFsT:
+        try:
+            return HadoopFileSystem(*(args or ("default",)), **kwargs)
+        except Exception as e:  # pylint: disable=broad-except
+            raise RuntimeError(f"Failed to create HadoopFileSystem handler: {e}") from e
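For context, `pyarrow.fs.HadoopFileSystem` resolves the namenode from the Hadoop configuration when the host is passed as "default", which is why the handler falls back to `("default",)` when no args are given. A hedged usage sketch (assumes `JAVA_HOME`, `HADOOP_HOME`, and the jar CLASSPATH from the docstring above are already exported):

```python
from pyarrow.fs import HadoopFileSystem

try:
    fs = HadoopFileSystem('default')   # pick up fs.defaultFS from core-site.xml
    print(fs.get_file_info('/'))       # smoke-test the connection
except Exception as e:
    # same wrapping the PR applies in HdfsFs.create_fs_handler
    raise RuntimeError(f'Failed to create HadoopFileSystem handler: {e}') from e
```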