diff --git a/raydar/task_tracker/task_tracker.py b/raydar/task_tracker/task_tracker.py index 058287e..b1b027d 100644 --- a/raydar/task_tracker/task_tracker.py +++ b/raydar/task_tracker/task_tracker.py @@ -1,16 +1,21 @@ import asyncio import coolname +import datetime import itertools import logging import os import pandas as pd import polars as pl import ray -from collections.abc import Iterable +import re +import requests +import time +from collections import defaultdict +from dataclasses import dataclass from packaging.version import Version from ray.serve import shutdown from ray.serve.handle import DeploymentHandle -from typing import Dict, List, Optional +from typing import Dict, Iterable, List, Optional from .schema import schema as default_schema @@ -89,6 +94,7 @@ def __init__( namespace: str, path: Optional[str] = None, enable_perspective_dashboard: bool = False, + scrape_prometheus_metrics: bool = False, ): """An async Ray Actor Class to track task level metadata. @@ -166,6 +172,98 @@ def __init__( "error_message": "str", }, ) + if scrape_prometheus_metrics: + self.__scraping_job = self.scrape_prometheus_metrics() + + def scrape_prometheus_metrics(self): + """ + Provide a heper method to parse perspective style metrics, a helper dataclass, and launch a job which indefinitely + scrapes the NodeManagerAddress:MetricsExportPort/metrics and updates the appropriate perspective tables with metric values. + """ + from prometheus_client.openmetrics import parser + + @dataclass + class ParsedOpenMetricsData: + metric_name: str + metric_description: str + metric_type: str + metric_value: str + metric_metadata: str + + def _parse_response(text): + parsed_data = [] + metric_name = None + metric_description = None + for line in text.split("\n"): + if len(line) > 0: + if line.startswith("# HELP "): + metric_description = " ".join(line.split(" ")[3:]) + elif line.startswith("# TYPE "): + _, _, metric_name, metric_type = line.split(" ") + else: + matches = re.search(r".*\{(.*)\}(.*)", line) + if matches is not None: + metric_metadata, metric_value = matches.groups() + metric_metadata = parser._parse_labels_with_state_machine(metric_metadata)[0] + else: + _, metric_value = line.split(" ") + metric_metadata = dict() + parsed_data.append( + ParsedOpenMetricsData( + metric_name=metric_name, + metric_description=metric_description, + metric_type=metric_type, + metric_value=eval(metric_value), + metric_metadata=metric_metadata, + ) + ) + return parsed_data + + @ray.remote + def _scrape_prometheus_metrics(): + """ + Refreshing every os.environ.get("RAYDAR_PROMETHEUS_METRICS_REFRESH_INTERVAL_S", 2) seconds, attempt to connect to the + NodeManagerAddress:MetricsExportPort/metrics endpoint and parse the prometheus metrics provided by this endpoint. + Publish the parsed metrics to the appropriate perspective table (tables are created based on metrics_name). + """ + metrics = set() + while True: + time.sleep(int(os.environ.get("RAYDAR_PROMETHEUS_METRICS_REFRESH_INTERVAL_S", 2))) + for node in ray.nodes(): + all_values = defaultdict(list) + if node.get("Alive", False): + response = requests.get(f"http://{node.get('NodeManagerAddress')}:{node.get('MetricsExportPort')}/metrics") + if response.status_code == 200: + parsed_values = _parse_response(response.text) + for parsed_value in parsed_values: + data = dict( + metric_name=parsed_value.metric_name, + metric_description=parsed_value.metric_description, + metric_type=parsed_value.metric_type, + metric_value=parsed_value.metric_value, + timestamp=datetime.datetime.now(), + ) + for key, value in parsed_value.metric_metadata.items(): + data[key] = value + all_values[parsed_value.metric_name].append(data) + + for key, values in all_values.items(): + if key not in metrics: + metrics.add(key) + self.proxy_server.remote("new", key, values) + else: + self.proxy_server.remote("update", key, values) + + return _scrape_prometheus_metrics.remote() + + def get_proxy_server(self): + """A getter for this actors proxy server attribute. Can be used to create custom perspective visuals. + + Returns: this actor's proxy_server attribute + """ + if self.proxy_server: + return self.proxy_server + raise Exception("This task_tracker has no active proxy_server.") def callback(self, tasks: Iterable[ray.ObjectRef]) -> None: """A remote function used by this actor's processor actor attribute. Will be called by a separate actor @@ -218,7 +316,9 @@ def update_perspective_dashboard(self, completed_tasks) -> None: That proxy_server serves perspective tables which anticipate the data formats we provide. Args: - completed_tasks: A list of tuples of the form (ObjectReference, TaskMetadata), where the ObjectReferences are neither Running nor Pending Assignment. + completed_tasks: A list of tuples of the form (ObjectReference, TaskMetadata), where the + ObjectReferences are neither Running nor Pending Assignment. + """ data = [ dict( @@ -297,14 +397,6 @@ def get_df(self) -> pl.DataFrame: ) return self.df - def get_proxy_server(self) -> ray.serve.handle.DeploymentHandle: - """A getter for this actors proxy server attribute. Can be used to create custom perspective visuals. - Returns: this actors proxy_server attribute - """ - if self.proxy_server: - return self.proxy_server - raise Exception("This task_tracker has no active proxy_server.") - def save_df(self) -> None: """Saves the internally maintained dataframe of task related information from the ray GCS""" self.get_df() @@ -323,7 +415,7 @@ def clear_df(self) -> None: class RayTaskTracker: - def __init__(self, name: str = "task_tracker", namespace: str = None, **kwargs): + def __init__(self, name: Optional[str] = "task_tracker", namespace: Optional[str] = None, **kwargs): """A utility to construct AsyncMetadataTracker actors. Wraps several remote AsyncMetadataTracker functions in a ray.get() call for convenience. @@ -362,11 +454,11 @@ def get_df(self, process_user_metadata_column=False) -> pl.DataFrame: return df_with_user_metadata return df - def save_df(self) -> None: + def save_df(self): """Save the dataframe used by this object's AsyncMetadataTracker actor""" return ray.get(self.tracker.save_df.remote()) - def clear(self) -> None: + def clear(self): """Clear the dataframe used by this object's AsyncMetadataTracker actor""" return ray.get(self.tracker.clear_df.remote()) diff --git a/raydar/tests/test_task_tracker.py b/raydar/tests/test_task_tracker.py index 9a135ed..1404eb6 100644 --- a/raydar/tests/test_task_tracker.py +++ b/raydar/tests/test_task_tracker.py @@ -22,6 +22,7 @@ def test_construction_and_dataframe(self): time.sleep(30) df = task_tracker.get_df() assert df[["name", "state"]].row(0) == ("do_some_work", "FINISHED") + task_tracker.exit() def test_get_proxy_server(self): from raydar.dashboard.server import PerspectiveRayServer @@ -37,4 +38,19 @@ def test_get_proxy_server(self): server.remote("update", "test_table", [dict(a="foo", b=1, c=1.0, d=time.time())]) time.sleep(2) response = requests.get("http://localhost:8000/tables") - assert eval(response.text) == ["test_table"] + tables = eval(response.text) + assert "test_table" in tables + + def test_scrape_prometheus_metrics(self): + task_tracker = RayTaskTracker( + enable_perspective_dashboard=True, + scrape_prometheus_metrics=True, + ) + time.sleep(30) + response = requests.get("http://localhost:8000/tables") + tables = eval(response.text) + assert len(tables) > 100 + assert "ray_tasks" in tables + assert "ray_actors" in tables + assert "ray_resources" in tables + task_tracker.exit()