Skip to content

Commit

Permalink
Merge pull request #2 from anyscale/hd-e2e-impl
Browse files Browse the repository at this point in the history
[Hosted Dashboard] End to end flow
  • Loading branch information
rkooo567 authored Feb 17, 2020
2 parents 7d25376 + 9c48eff commit 01d32c2
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 48 deletions.
36 changes: 36 additions & 0 deletions python/ray/dashboard/closed_source/auth_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import argparse

from aiohttp import web


async def health(request):
    """Handle the /health route by reporting that the server is up.

    Args:
        request: The incoming aiohttp request. It is not inspected.

    Returns:
        A JSON response with a single "Status" field.
    """
    return web.json_response({"Status": "Healthy"})


async def return_ingest_server_url(request):
    """Reply with the ingest server address and an access token as JSON.

    Both values are hard-coded in this handler; the request is not inspected.

    Args:
        request: The incoming aiohttp request.

    Returns:
        A JSON response with "ingestor_url" and "access_token" fields.
    """
    # TODO(sang): Prepare the proper authorization process.
    return web.json_response({
        "ingestor_url": "localhost:50051",
        "access_token": "1234"
    })


if __name__ == "__main__":
    # Command-line options for the address the aiohttp server binds to.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--host",
        required=False,
        type=str,
        default="127.0.0.1",
        help="The host to use for the HTTP server.")
    parser.add_argument(
        "--port",
        required=False,
        default=8080,
        # Parse as int so a user-supplied port has the same type as the
        # default and invalid values are rejected by argparse.
        type=int,
        help="The port to use for the HTTP server.")
    args = parser.parse_args()

    # Register the two GET routes and serve until interrupted.
    app = web.Application()
    app.add_routes([
        web.get("/auth", return_ingest_server_url),
        web.get("/health", health)
    ])
    web.run_app(app, host=args.host, port=args.port, shutdown_timeout=5.0)
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
args.redis_address,
args.temp_dir,
redis_password=args.redis_password,
hosted_dashboard_client=False,
DashboardController=HostedDashboardController)
dashboard.run()
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import json
import logging
import redis

# TODO(sang): Add this module to requirements
from prometheus_client import start_http_server, Gauge

import ray

from ray.dashboard.closed_source.ingest_server \
import NODE_INFO_CHANNEL, RAY_INFO_CHANNEL

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# TODO(sang) Refactor to have more scalable structure
# TODO(sang) Add args
if __name__ == "__main__":
    # Subscribe to the channels published through Redis.
    redis_client = redis.StrictRedis(host="127.0.0.1", port=6379)
    p = redis_client.pubsub(ignore_subscribe_messages=True)
    p.psubscribe(RAY_INFO_CHANNEL)
    p.psubscribe(NODE_INFO_CHANNEL)

    # Per-worker gauges, labelled by pid and the node it runs on.
    metrics_cpu_usage = Gauge(
        "ray_metrics_cpu_usage",
        "CPU usage of pid residing in a host.",
        labelnames=("pid", "host", "ip"))
    metrics_mem_usage = Gauge(
        "ray_metrics_mem_usage",
        "Memory usage of pid residing in a host.",
        labelnames=("pid", "host", "ip"))

    # Start up the server to expose the metrics.
    # TODO(sang) Read from args
    host, port = "127.0.0.1", 8000
    # The template names the port first, so the arguments follow that order.
    logger.info("Server listening on port {} at address {}".format(port, host))
    # NOTE(review): only the port is passed here, so `host` above is used
    # for logging only -- confirm the intended bind address.
    start_http_server(port)

    # Read data from Redis.
    for message in p.listen():
        channel = ray.utils.decode(message["channel"])
        data = json.loads(ray.utils.decode(message["data"]))

        # Only node info messages are turned into metrics.
        if channel != NODE_INFO_CHANNEL:
            continue

        for client in data["clients"]:
            # Distinct name so the server `host` above is not overwritten.
            hostname = client["hostname"]
            ip = client["ip"]

            for worker in client["workers"]:
                pid = worker["pid"]
                cpu_usage = worker["cpu_percent"]
                mem_usage = worker["memory_info"]["rss"]
                metrics_cpu_usage.labels(
                    pid=pid, host=hostname, ip=ip).set(cpu_usage)
                metrics_mem_usage.labels(
                    pid=pid, host=hostname, ip=ip).set(mem_usage)
50 changes: 29 additions & 21 deletions python/ray/dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ def b64_decode(reply):
return b64decode(reply).decode("utf-8")


def get_host_and_port(addr):
    """Split a "host:port" address into its host and port parts.

    Surrounding whitespace is stripped first. The address is split only at
    the last colon, so a host part that itself contains colons still yields
    exactly two elements.

    Args:
        addr (str): Address in "host:port" form.

    Returns:
        list: [host, port], both as strings. If addr contains no colon the
            list has a single element.
    """
    return addr.strip().rsplit(":", 1)


class DashboardController(BaseDashboardController):
"""Perform data fetching and other actions required by HTTP endpoints."""

Expand All @@ -115,8 +119,8 @@ def _construct_raylet_info(self):
# ready_tasks are used to render tasks that are not schedulable
# due to resource limitations.
# (e.g., Actor requires 2 GPUs but there is only 1 gpu available).
ready_tasks = sum(
(data.get("readyTasks", []) for data in D.values()), [])
ready_tasks = sum((data.get("readyTasks", []) for data in D.values()),
[])
actor_tree = self.node_stats.get_actor_tree(
workers_info_by_node, infeasible_tasks, ready_tasks)
for address, data in D.items():
Expand All @@ -125,8 +129,8 @@ def _construct_raylet_info(self):
for view_data in data["viewData"]:
view_name = view_data["viewName"]
if view_name in ("local_available_resource",
"local_total_resource",
"object_manager_stats"):
"local_total_resource",
"object_manager_stats"):
measures_dicts[view_name] = measures_to_dict(
view_data["measures"])
# process resources info
Expand All @@ -150,9 +154,8 @@ def _construct_raylet_info(self):
for stats_name in [
"used_object_store_memory", "num_local_objects"
]:
stats_value = measures_dicts[
"object_manager_stats"].get(
prefix + stats_name, .0)
stats_value = measures_dicts["object_manager_stats"].get(
prefix + stats_name, .0)
extra_info_strings.append("{}: {}".format(
stats_name, stats_value))
data["extraInfo"] += ", ".join(extra_info_strings)
Expand All @@ -163,8 +166,7 @@ def _construct_raylet_info(self):
max_line_length = max(map(len, lines))
to_print = []
for line in lines:
to_print.append(line +
(max_line_length - len(line)) * " ")
to_print.append(line + (max_line_length - len(line)) * " ")
data["extraInfo"] += "\n" + "\n".join(to_print)
return {"nodes": D, "actors": actor_tree}

Expand Down Expand Up @@ -213,8 +215,7 @@ class Dashboard(object):
temp_dir (str): The temporary directory used for log files and
information for this Ray session.
redis_password(str): Redis password to access GCS
hosted_dashboard_client(bool): True if a server runs as a
hosted dashboard client mode.
hosted_dashboard_addr(str): The address users host their dashboard.
update_frequency(float): Frequency where metrics are updated.
DashboardController(DashboardController): DashboardController
that defines the business logic of a Dashboard server.
Expand All @@ -226,7 +227,7 @@ def __init__(self,
redis_address,
temp_dir,
redis_password=None,
hosted_dashboard_client=False,
hosted_dashboard_addr=None,
update_frequency=1.0,
DashboardController=DashboardController):
self.host = host
Expand All @@ -240,10 +241,15 @@ def __init__(self,
if Analysis is not None:
self.tune_stats = TuneCollector(DEFAULT_RESULTS_DIR, 2.0)

self.hosted_dashboard_client = hosted_dashboard_client
self.hosted_dashboard_addr = hosted_dashboard_addr
self.dashboard_client = None
if self.hosted_dashboard_client:
self.dashboard_client = DashboardClient(self.dashboard_controller)

if self.hosted_dashboard_addr:
hosted_dashboard_host, hosted_dashboard_port = get_host_and_port(
self.hosted_dashboard_addr)
self.dashboard_client = DashboardClient(hosted_dashboard_host,
hosted_dashboard_port,
self.dashboard_controller)

# Setting the environment variable RAY_DASHBOARD_DEV=1 disables some
# security checks in the dashboard server to ease development while
Expand Down Expand Up @@ -381,7 +387,7 @@ async def errors(req) -> aiohttp.web.Response:
result = self.dashboard_controller.get_errors(hostname, pid)
return await json_response(result=result)

if not self.hosted_dashboard_client:
if self.hosted_dashboard_addr is None:
# Hosted dashboard mode won't use local dashboard frontend.
self.app.router.add_get("/", get_index)
self.app.router.add_get("/favicon.ico", get_favicon)
Expand Down Expand Up @@ -428,7 +434,7 @@ def log_dashboard_url(self):
def run(self):
self.log_dashboard_url()
self.dashboard_controller.start_collecting_metrics()
if self.hosted_dashboard_client:
if self.hosted_dashboard_addr:
self.dashboard_client.start_exporting_metrics()
if Analysis is not None:
self.tune_stats.start()
Expand Down Expand Up @@ -832,10 +838,12 @@ def __init__(self, logdir, reload_interval):
try:
logger.info("Create a directory at {}".format(self._logdir))
os.mkdir(self._logdir)
except:
FileNotFoundError("Log directory {} does not exist. "
"Please create the directory to use Tune "
"collector".format(self._logdir))
except OSError as e:
logger.warning(e)
raise FileNotFoundError(
"Log directory {} does not exist. "
"Please create the directory to use Tune "
"collector".format(self._logdir))

super().__init__()

Expand Down
9 changes: 8 additions & 1 deletion python/ray/dashboard/dashboard_controller_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@


class BaseDashboardController(ABC):
"""Interface to get Ray cluster metrics and control actions
"""Perform data fetching and other actions required by Dashboard
Make sure you run start_collecting_metrics function before using
get_[stats]_info methods.
TODO(sang): Write descriptions and requirements for each interface
"""

@abstractmethod
Expand Down Expand Up @@ -42,4 +44,9 @@ def get_errors(self, hostname, pid):

@abstractmethod
def start_collecting_metrics(self):
"""Start threads/processes/actors to collect metrics
NOTE: This interface should be called before using other
interface.
"""
raise NotImplementedError("Please implement this method.")
11 changes: 7 additions & 4 deletions python/ray/dashboard/dashboard_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@
type=str,
default=None,
help="Specify the path of the temporary directory use by Ray process.")
parser.add_argument(
"--hosted-dashboard-addr",
required=False,
type=str,
default=None,
help="Specify the address where user dashboard will be hosted.")
args = parser.parse_args()
ray.utils.setup_logger(args.logging_level, args.logging_format)

Expand All @@ -61,9 +67,7 @@
args.port,
args.redis_address,
args.temp_dir,
# TODO(sang): Make this value configurable
# through Ray API
hosted_dashboard_client=False,
hosted_dashboard_addr=args.hosted_dashboard_addr,
redis_password=args.redis_password)
dashboard.run()
except Exception as e:
Expand All @@ -77,4 +81,3 @@
ray.utils.push_error_to_driver_through_redis(
redis_client, ray_constants.DASHBOARD_DIED_ERROR, message)
raise e

41 changes: 34 additions & 7 deletions python/ray/dashboard/hosted_dashboard/dashboard_client.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,48 @@
import requests

from ray.dashboard.hosted_dashboard.exporter import Exporter


class DashboardClient:
"""Managing communication to hosted dashboard.
"""Managing the authentication to external services.
Args:
host: Host address of the service that is used to authenticate.
port: Port of the host that is used to authenticate.
Attributes:
ingestor_url(str): Address that metrics will be exported.
exporter(Exporter): Exporter thread that keeps exporting
metrics to the external services.
"""

def __init__(self, dashboard_controller):
# TODO(sang): Remove hard coded ingestor url.
self.ingestor_url = "127.0.0.1:50051"
self.exporter = Exporter(self.ingestor_url, dashboard_controller)
def __init__(self, host, port, dashboard_controller):
    """Authorize against the given host and port, then build the exporter.

    Args:
        host: Host of the service queried at its /auth endpoint.
        port: Port of that service.
        dashboard_controller: Controller handed to the Exporter.
    """
    self.auth_url = "http://{}:{}/auth".format(host, port)
    self.timeout = 5.0

    # The authorization request reads auth_url and timeout, so both are
    # assigned before connecting.
    auth_info = self._connect()
    self.auth_info = auth_info

    ingestor_url = auth_info.get("ingestor_url")
    access_token = auth_info.get("access_token")
    self.exporter = Exporter(ingestor_url, access_token,
                             dashboard_controller)

def _authorize(self):
    """Request the ingestor URL and access token from the auth endpoint.

    Returns:
        tuple: (status, ingestor_url, access_token). When status is not
            200 the body is not parsed and the last two items are None.
    """
    resp = requests.get(self.auth_url, timeout=self.timeout)
    status = resp.status_code
    if status != 200:
        # An error reply may not carry the expected JSON body. Report the
        # status and let the caller decide how to fail.
        return status, None, None

    json_response = resp.json()
    return (status, json_response["ingestor_url"],
            json_response["access_token"])

def _connect(self):
status, ingestor_url, access_token = self._authorize()
if status != 200:
raise ConnectionError(
"Failed to authorize to hosted dashbaord server.")

auth_info = {
"ingestor_url": ingestor_url,
"access_token": access_token
}
return auth_info

def start_exporting_metrics(self):
"""Run an exporter thread to export metrics"""
# TODO(sang): Add a health check.
self.exporter.start()
2 changes: 2 additions & 0 deletions python/ray/dashboard/hosted_dashboard/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Exporter(threading.Thread):

def __init__(self,
export_address,
access_token,
dashboard_controller,
update_frequency=1.0):
self.dashboard_controller = dashboard_controller
Expand Down Expand Up @@ -46,6 +47,7 @@ def export_profiling_info(self, data: dict):
raise NotImplementedError("Not implemented yet.")

def run(self):
# TODO(sang): Health check.
# TODO(sang): Add error handling.
while True:
time.sleep(self.update_frequency)
Expand Down
11 changes: 6 additions & 5 deletions python/ray/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,20 +472,22 @@ def start_reporter(self):
process_info
]

def start_dashboard(self, require_webui):
def start_dashboard(self, require_webui, hosted_dashboard_addr):
"""Start the dashboard.
Args:
require_webui (bool): If true, this will raise an exception if we
fail to start the webui. Otherwise it will print a warning if
we fail to start the webui.
hosted_dashboard_addr (str): The address users host dashboard.
"""
stdout_file, stderr_file = self.new_log_files("dashboard", True)
self._webui_url, process_info = ray.services.start_dashboard(
require_webui,
self._ray_params.webui_host,
self.redis_address,
self._temp_dir,
hosted_dashboard_addr,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password)
Expand Down Expand Up @@ -614,10 +616,9 @@ def start_head_processes(self):

self.start_monitor()
self.start_raylet_monitor()
if self._ray_params.include_webui:
self.start_dashboard(require_webui=True)
elif self._ray_params.include_webui is None:
self.start_dashboard(require_webui=False)
self.start_dashboard(
require_webui=self._ray_params.include_webui,
hosted_dashboard_addr=self._ray_params.hosted_dashboard_addr)

def start_ray_processes(self):
"""Start all of the processes on the node."""
Expand Down
Loading

0 comments on commit 01d32c2

Please sign in to comment.