Skip to content

Commit

Permalink
Merge pull request #25 from Point72/tbg/expose-perspective-proxy-server
Browse files Browse the repository at this point in the history
Exposing more utilities so that users can interact with a PerspectiveProxyRayServer
  • Loading branch information
gauglertodd authored Sep 9, 2024
2 parents 26a9c97 + 83acac2 commit f689ff8
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 22 deletions.
3 changes: 0 additions & 3 deletions raydar/dashboard/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ def clear_table(self, tablename: str, schema) -> None:
if tablename in self._tables:
self._tables[tablename].clear()

def get_table(self, tablename: str) -> None:
return self._schemas.get(tablename, None)

@app.get("/")
async def site(self):
return FileResponse(join(static_files_dir, "index.html"))
Expand Down
2 changes: 1 addition & 1 deletion raydar/task_tracker/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .task_tracker import AsyncMetadataTracker, RayTaskTracker
from .task_tracker import *
50 changes: 34 additions & 16 deletions raydar/task_tracker/task_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,41 @@
from collections.abc import Iterable
from packaging.version import Version
from ray.serve import shutdown
from ray.serve.handle import DeploymentHandle
from typing import Dict, List, Optional

from .schema import schema as default_schema

logger = logging.getLogger(__name__)

__all__ = ("AsyncMetadataTracker", "RayTaskTracker")
__all__ = ("AsyncMetadataTracker", "RayTaskTracker", "setup_proxy_server")


def get_callback_actor_name(name: str) -> str:
return f"{name}_callback_actor"


def setup_proxy_server(proxy_server_name="proxy", proxy_server_route_prefix="/proxy", **kwargs) -> DeploymentHandle:
"""Construct a webserver, and bind it to a PerspectiveProxyRayServer.
Args:
proxy_server_name: the name passed to ray.serve.run for the PerspectiveProxyRayServer
proxy_server_route_prefix: the route_prefix passed to ray.serve.run for the PerspectiveProxyRayServer
**kwargs: arguments forwarded to ray.serve.run() for the webserver
Returns: A DeploymentHandle for the PerspectiveProxyRayServer
"""
from raydar.dashboard.server import PerspectiveProxyRayServer

webserver = ray.serve.run(**kwargs)
proxy_server = ray.serve.run(
PerspectiveProxyRayServer.bind(webserver),
name=proxy_server_name,
route_prefix=proxy_server_route_prefix,
)
return proxy_server


@ray.remote(resources={"node:__internal_head__": 0.1}, num_cpus=0)
class AsyncMetadataTrackerCallback:
"""
Expand Down Expand Up @@ -107,22 +129,18 @@ def __init__(
self.client = StateApiClient(address=ray.get_runtime_context().gcs_address)

if self.perspective_dashboard_enabled:
from raydar.dashboard.server import PerspectiveProxyRayServer, PerspectiveRayServer
from raydar.dashboard.server import PerspectiveRayServer

kwargs = dict(
target=PerspectiveRayServer.bind(),
name="webserver",
route_prefix="/",
)

if Version(ray.__version__) < Version("2.10"):
kwargs["port"] = os.environ.get("RAYDAR_PORT", 8000)

self.webserver = ray.serve.run(**kwargs)
self.proxy_server = ray.serve.run(
PerspectiveProxyRayServer.bind(self.webserver),
name="proxy",
route_prefix="/proxy",
)
self.proxy_server = setup_proxy_server(**kwargs)
self.proxy_server.remote(
"new",
self.perspective_table_name,
Expand All @@ -149,14 +167,6 @@ def __init__(
},
)

def get_proxy_server(self) -> ray.serve.handle.DeploymentHandle:
"""A getter for this actors proxy server attribute. Can be used to create custom perspective visuals.
Returns: this actors proxy_server attribute
"""
if self.proxy_server:
return self.proxy_server
raise Exception("This task_tracker has no active proxy_server.")

def callback(self, tasks: Iterable[ray.ObjectRef]) -> None:
"""A remote function used by this actor's processor actor attribute. Will be called by a separate actor
with a collection of ray object references once those ObjectReferences are not in the "RUNNING" or
Expand Down Expand Up @@ -287,6 +297,14 @@ def get_df(self) -> pl.DataFrame:
)
return self.df

def get_proxy_server(self) -> ray.serve.handle.DeploymentHandle:
"""A getter for this actors proxy server attribute. Can be used to create custom perspective visuals.
Returns: this actors proxy_server attribute
"""
if self.proxy_server:
return self.proxy_server
raise Exception("This task_tracker has no active proxy_server.")

def save_df(self) -> None:
"""Saves the internally maintained dataframe of task related information from the ray GCS"""
self.get_df()
Expand Down
1 change: 0 additions & 1 deletion raydar/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def unittest_ray_config():
config = dict(
num_cpus=5,
include_dashboard=True,
dashboard_host="0.0.0.0",
)
return config

Expand Down
19 changes: 18 additions & 1 deletion raydar/tests/test_task_tracker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import pytest
import ray
import requests
import time

from raydar import RayTaskTracker
from raydar import RayTaskTracker, setup_proxy_server


@ray.remote
Expand All @@ -21,3 +22,19 @@ def test_construction_and_dataframe(self):
time.sleep(30)
df = task_tracker.get_df()
assert df[["name", "state"]].row(0) == ("do_some_work", "FINISHED")

def test_get_proxy_server(self):
from raydar.dashboard.server import PerspectiveRayServer

kwargs = dict(
target=PerspectiveRayServer.bind(),
name="webserver",
route_prefix="/",
)
server = setup_proxy_server(**kwargs)
server.remote("new", "test_table", dict(a="str", b="int", c="float", d="datetime"))
time.sleep(2)
server.remote("update", "test_table", [dict(a="foo", b=1, c=1.0, d=time.time())])
time.sleep(2)
response = requests.get("http://localhost:8000/tables")
assert eval(response.text) == ["test_table"]

0 comments on commit f689ff8

Please sign in to comment.