Skip to content

Commit

Permalink
Report Spark job ID as container name
Browse files Browse the repository at this point in the history
  • Loading branch information
liorgorb committed Sep 25, 2024
1 parent c620532 commit b77ff19
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 3 deletions.
21 changes: 18 additions & 3 deletions gprofiler/profiler_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@

from psutil import Process

from gprofiler.log import get_logger_adapter
from gprofiler.utils.databricks import DatabricksWebUI

if TYPE_CHECKING:
from gprofiler.containers_client import ContainerNamesClient

from gprofiler.utils import TemporaryDirectoryWithMode


logger = get_logger_adapter(__name__)


@dataclass
class ProfilerState:
# Class for storing generic state parameters. These parameters are the same for each profiler.
Expand All @@ -29,9 +35,18 @@ class ProfilerState:
def __post_init__(self) -> None:
    # Wrap the configured storage_dir in a mode-0o755 temporary directory so
    # profiler artifacts live under a world-readable scratch dir that is
    # removed when this TemporaryDirectoryWithMode is finalized.
    self._temporary_dir = TemporaryDirectoryWithMode(dir=self.storage_dir, mode=0o755)
    # Re-point storage_dir at the temp dir; the configured value is kept only
    # as the parent directory. NOTE(review): after this, readers of
    # self.storage_dir see the temporary path, not the configured one.
    self.storage_dir = self._temporary_dir.name
    # Databricks WebUI client used by get_container_name() as a fallback
    # source of a "container name" (the Spark job id) when no
    # container_names_client is configured.
    self._databricks_web_ui = DatabricksWebUI()

def get_container_name(self, pid: int) -> str:
if self.container_names_client is not None:
return self.container_names_client.get_container_name(pid)
else:
try:
jobs = self._databricks_web_ui.get_running_jobs()
if len(jobs) == 0:
return ""
elif len(jobs) == 1:
return f"{jobs[0]['jobId']} ({jobs[0]['jobGroup']})"
else:
return "Multiple jobs"

except Exception:
logger.exception("Failed to get Databricks job ID")
return ""
95 changes: 95 additions & 0 deletions gprofiler/utils/databricks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Optional, Any, Dict, cast, List

import psutil
import requests
from psutil import Process

from gprofiler.log import get_logger_adapter

DATABRICKS_METRICS_PROP_PATH = "/databricks/spark/conf/metrics.properties"
HOST_KEY_NAME = "*.sink.ganglia.host"
DEFAULT_WEBUI_PORT = 40001
RUNNING_JOBS_CACHE_TTL = timedelta(seconds=60)

logger = get_logger_adapter(__name__)


class DatabricksWebUI:
    """Client for the Spark WebUI of a Databricks cluster.

    Discovers the WebUI address from the Databricks metrics properties file
    and exposes the application's jobs list, cached for
    RUNNING_JOBS_CACHE_TTL between fetches.
    """

    def __init__(self) -> None:
        self._executor_process: Optional[Process] = None
        self._driver_daemon_process: Optional[Process] = None
        self._running_jobs: List[Dict[str, Any]] = []
        # BUG FIX: the original assigned the typing construct `Optional[datetime]`
        # itself (not None), so the TTL check in get_running_jobs() always took
        # the "have a fetch time" branch and then raised TypeError subtracting
        # a non-datetime.
        self._last_running_jobs_fetch_time: Optional[datetime] = None
        # Memoized lookups; None means "not resolved yet, retry on next call".
        # The original used @lru_cache on instance methods, which keeps `self`
        # alive (ruff B019) and also caches a None result forever, defeating
        # the "retry" intent documented below.
        self._webui_address: Optional[str] = None
        self._app_id: Optional[str] = None

    def get_webui_address(self) -> Optional[str]:
        """Return "host:port" of the Spark WebUI, or None if not yet known.

        The host is read from the ganglia sink entry of the Databricks
        metrics properties file. A missing key means the cluster is still
        provisioning; None is returned so the caller can retry later.
        """
        if self._webui_address is not None:
            return self._webui_address
        with open(DATABRICKS_METRICS_PROP_PATH) as f:
            properties = f.read()
        try:
            # Ignore lines without a `=` declaration
            properties_values = dict(line.split("=", 1) for line in properties.splitlines() if "=" in line)
            host = properties_values[HOST_KEY_NAME]
        except KeyError as e:
            if e.args[0] == HOST_KEY_NAME:
                # Might happen while provisioning the cluster, retry.
                return None
            raise Exception(f"Failed to get Databricks webui address {properties=}") from e
        except Exception as e:
            raise Exception(f"Failed to get Databricks webui address {properties=}") from e
        self._webui_address = f"{host}:{DEFAULT_WEBUI_PORT}"
        return self._webui_address

    @staticmethod
    def _find_process_by_cmdline(needle: str) -> Optional[Process]:
        """Return the first process whose cmdline contains `needle`, else None."""
        for process in psutil.process_iter():
            try:
                if needle in process.cmdline():
                    return process
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                # Processes can exit or deny access while we iterate; skip them
                # instead of aborting the whole scan.
                continue
        return None

    def get_executor_process(self) -> Optional[Process]:
        """Return the Spark executor process, located lazily and cached."""
        if self._executor_process is not None and self._executor_process.is_running():
            return self._executor_process
        self._executor_process = self._find_process_by_cmdline(
            "org.apache.spark.executor.CoarseGrainedExecutorBackend"
        )
        return self._executor_process

    def get_driver_daemon_process(self) -> Optional[Process]:
        """Return the Spark driver daemon process, located lazily and cached."""
        if self._driver_daemon_process is not None and self._driver_daemon_process.is_running():
            return self._driver_daemon_process
        self._driver_daemon_process = self._find_process_by_cmdline(
            "org.apache.spark.deploy.master.DriverDaemon"
        )
        return self._driver_daemon_process

    def web_ui_request(self, query: str) -> Any:
        """GET `query` from the Spark WebUI and return the parsed JSON body.

        Raises if the WebUI address is not yet available.
        """
        webui_address = self.get_webui_address()
        if webui_address is None:
            raise Exception("WebUI address is not available")
        # timeout added: the original call could hang the caller forever.
        return cast(Any, requests.get(f"http://{webui_address}/{query}", timeout=10).json())

    def get_app_id(self) -> Optional[str]:
        """Return the single Spark application id, or None if none registered yet.

        A known id is memoized; a None result is retried on the next call
        (unlike the original @lru_cache, which froze the None forever).
        """
        if self._app_id is not None:
            return self._app_id
        apps = self.web_ui_request("api/v1/applications")
        if not apps:
            return None
        if len(apps) != 1:
            # Explicit raise instead of `assert` (asserts vanish under -O);
            # a Databricks cluster is expected to run exactly one Spark app.
            raise Exception(f"Expected a single Spark application, found {len(apps)}")
        self._app_id = cast(str, apps[0]["id"])
        return self._app_id

    def get_running_jobs(self) -> List[Dict[str, Any]]:
        """Return the application's jobs list, cached for RUNNING_JOBS_CACHE_TTL.

        NOTE(review): the /jobs endpoint returns jobs in all states unless
        filtered with `?status=running` — confirm whether only running jobs
        are intended, as the method name suggests.
        """
        # The stray bare `@` decorator line that preceded this method in the
        # original (a syntax error) was removed.
        if (
            self._last_running_jobs_fetch_time is not None
            and (datetime.now() - self._last_running_jobs_fetch_time) < RUNNING_JOBS_CACHE_TTL
        ):
            return self._running_jobs

        app_id = self.get_app_id()
        if app_id is None:
            return []

        self._running_jobs = cast(List, self.web_ui_request(f"api/v1/applications/{app_id}/jobs"))
        self._last_running_jobs_fetch_time = datetime.now()
        return self._running_jobs

0 comments on commit b77ff19

Please sign in to comment.