Revert "Revert "[WIP] CPU/memory usage per func class (#31234)" (#313… #31454

Merged · 5 commits · Jan 16, 2023
4 changes: 4 additions & 0 deletions dashboard/client/src/pages/metrics/Metrics.tsx
@@ -121,6 +121,10 @@ const METRICS_CONFIG = [
title: "Node Memory by Component",
path: "/d-solo/rayDefaultDashboard/default-dashboard?orgId=1&theme=light&panelId=34",
},
{
title: "Node CPU by Component",
path: "/d-solo/rayDefaultDashboard/default-dashboard?orgId=1&theme=light&panelId=37",
},
{
title: "Node GPU Memory (GRAM)",
path: "/d-solo/rayDefaultDashboard/default-dashboard?orgId=1&theme=light&panelId=18",
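Note (not part of this diff): the panelId=37 query parameter in the new METRICS_CONFIG entry has to match the id of the corresponding Panel added to grafana_dashboard_factory.py below; the /d-solo/ path appears to embed that single Grafana panel by id.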
13 changes: 13 additions & 0 deletions dashboard/modules/metrics/grafana_dashboard_factory.py
@@ -296,6 +296,19 @@ def max_plus_pending(max_resource, pending_resource):
)
],
),
Panel(
id=37,
title="Node CPU by Component",
description="The physical (hardware) CPU usage across the cluster, broken down by component. This reports the summed CPU usage per Ray component.",
unit="cores",
targets=[
Target(
# ray_component_cpu_percentage is a percentage that can exceed 100, which means the component uses more than one CPU. Dividing by 100 converts it to cores.
expr="sum(ray_component_cpu_percentage{{{global_filters}}}) by (Component) / 100",
legend="{{Component}}",
)
],
),
Panel(
id=18,
title="Node GPU Memory (GRAM)",
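For illustration only, not part of this diff: the panel's PromQL expression sums ray_component_cpu_percentage by Component and divides by 100 because the agent reports CPU as a percentage that can exceed 100. A minimal Python sketch of the same conversion, using hypothetical sample values:

# Hypothetical sketch mirroring the panel expression
#   sum(ray_component_cpu_percentage{...}) by (Component) / 100
component_cpu_percentage = {"raylet": 5.0, "ray::train_step": 250.0}  # sample values
component_cpu_cores = {name: pct / 100.0 for name, pct in component_cpu_percentage.items()}
# -> {"raylet": 0.05, "ray::train_step": 2.5} cores, as shown in "Node CPU by Component"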
220 changes: 159 additions & 61 deletions dashboard/modules/reporter/reporter_agent.py
@@ -11,14 +11,14 @@
import psutil

from typing import List, Optional, Tuple
from collections import defaultdict

import ray
import ray._private.services
import ray._private.utils
from ray.dashboard.consts import (
GCS_RPC_TIMEOUT_SECONDS,
COMPONENT_METRICS_TAG_KEYS,
AVAILABLE_COMPONENT_NAMES_FOR_METRICS,
)
from ray.dashboard.modules.reporter.profile_manager import CpuProfilingManager
import ray.dashboard.modules.reporter.reporter_consts as reporter_consts
@@ -287,6 +287,12 @@ def __init__(self, dashboard_agent):
self._hostname = socket.gethostname()
# (pid, created_time) -> psutil.Process
self._workers = {}
# psutil.Process of the parent (raylet) process.
self._raylet_proc = None
# psutil.Process of the current process.
self._agent_proc = None
# The last reported worker proc names (e.g., ray::*).
self._latest_worker_proc_names = set()
self._network_stats_hist = [(0, (0.0, 0.0))] # time, (sent, recv)
self._disk_io_stats_hist = [
(0, (0.0, 0.0, 0, 0))
@@ -506,19 +512,20 @@ def _get_workers(self):
if w.status() != psutil.STATUS_ZOMBIE
]

@staticmethod
def _get_raylet_proc():
def _get_raylet_proc(self):
try:
curr_proc = psutil.Process()
# Here, parent is always raylet because the
# dashboard agent is a child of the raylet process.
parent = curr_proc.parent()
if parent is not None:
if parent.pid == 1:
if not self._raylet_proc:
curr_proc = psutil.Process()
# Here, parent is always raylet because the
# dashboard agent is a child of the raylet process.
self._raylet_proc = curr_proc.parent()

if self._raylet_proc is not None:
if self._raylet_proc.pid == 1:
return None
if parent.status() == psutil.STATUS_ZOMBIE:
if self._raylet_proc.status() == psutil.STATUS_ZOMBIE:
return None
return parent
return self._raylet_proc
except (psutil.AccessDenied, ProcessLookupError):
pass
return None
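# Illustrative sketch, not part of this diff: one likely motivation for caching
# the psutil.Process handles (self._raylet_proc / self._agent_proc) is that
# Process.cpu_percent() reports usage as a delta since the previous call on the
# same object, so constructing a fresh Process on every reporting tick would
# keep returning 0.0. Hypothetical standalone example of the same pattern:
import psutil

_cached_proc = None  # hypothetical module-level cache, mirroring the fields above

def sample_cpu_percent() -> float:
    global _cached_proc
    if _cached_proc is None:
        _cached_proc = psutil.Process()
    # Non-blocking call; compares CPU times against the previous invocation.
    return _cached_proc.cpu_percent(interval=None)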
@@ -542,8 +549,9 @@ def _get_raylet(self):

def _get_agent(self):
# Current proc == agent proc
agent_proc = psutil.Process()
return agent_proc.as_dict(
if not self._agent_proc:
self._agent_proc = psutil.Process()
return self._agent_proc.as_dict(
attrs=[
"pid",
"create_time",
@@ -605,6 +613,137 @@ def _get_all_stats(self):
"cmdline": self._get_raylet().get("cmdline", []),
}

def _generate_reseted_stats_record(self, component_name: str) -> List[Record]:
"""Return a list of Record that will reset
the system metrics of a given component name.

Args:
component_name: a component name for a given stats.

Returns:
a list of Record instances of all values 0.
"""
tags = {"ip": self._ip, "Component": component_name}

records = []
records.append(
Record(
gauge=METRICS_GAUGES["component_cpu_percentage"],
value=0.0,
tags=tags,
)
)
records.append(
Record(
gauge=METRICS_GAUGES["component_rss_mb"],
value=0.0,
tags=tags,
)
)
records.append(
Record(
gauge=METRICS_GAUGES["component_uss_mb"],
value=0.0,
tags=tags,
)
)
return records

def _generate_system_stats_record(
self, stats: List[dict], component_name: str, pid: Optional[str] = None
) -> List[Record]:
"""Generate a list of Record class from a given component names.

Args:
stats: a list of stats dict generated by `psutil.as_dict`.
If empty, it will create the metrics of a given "component_name"
which has all 0 values.
component_name: a component name for a given stats.
pid: optionally provided pids.

Returns:
a list of Record class that will be exposed to Prometheus.
"""
total_cpu_percentage = 0.0
total_rss = 0.0
total_uss = 0.0

for stat in stats:
total_cpu_percentage += float(stat.get("cpu_percent", 0.0)) # noqa
memory_info = stat.get("memory_info")
if memory_info:
total_rss += float(stat["memory_info"].rss) / 1.0e6 # noqa
mem_full_info = stat.get("memory_full_info")
if mem_full_info is not None:
total_uss += float(mem_full_info.uss) / 1.0e6

tags = {"ip": self._ip, "Component": component_name}
if pid:
tags["pid"] = pid

records = []
records.append(
Record(
gauge=METRICS_GAUGES["component_cpu_percentage"],
value=total_cpu_percentage,
tags=tags,
)
)
records.append(
Record(
gauge=METRICS_GAUGES["component_rss_mb"],
value=total_rss,
tags=tags,
)
)
if total_uss > 0.0:
records.append(
Record(
gauge=METRICS_GAUGES["component_uss_mb"],
value=total_uss,
tags=tags,
)
)

return records
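
# Illustrative usage sketch, not part of this diff: the stats dicts come from
# psutil.Process.as_dict(), so memory_info / memory_full_info are namedtuple-like
# objects. Hypothetical input for two processes of the same component:
from collections import namedtuple

MemInfo = namedtuple("MemInfo", ["rss"])          # stand-in for psutil's pmem
FullMemInfo = namedtuple("FullMemInfo", ["uss"])  # stand-in for pfullmem

example_stats = [
    {"cpu_percent": 120.0, "memory_info": MemInfo(rss=200_000_000),
     "memory_full_info": FullMemInfo(uss=150_000_000)},
    {"cpu_percent": 30.0, "memory_info": MemInfo(rss=100_000_000),
     "memory_full_info": FullMemInfo(uss=80_000_000)},
]
# Aggregated as above: cpu_percentage=150.0, rss=300.0 MB, uss=230.0 MB
# (bytes are converted to MB by dividing by 1.0e6).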

def generate_worker_stats_record(self, worker_stats: List[dict]) -> List[Record]:
"""Generate a list of Record class for worker proceses.

This API automatically sets the component_name of record as
the name of worker processes. I.e., ray::* so that we can report
per task/actor (grouped by a func/class name) resource usages.

Args:
stats: a list of stats dict generated by `psutil.as_dict`
for worker processes.
"""
# worker cmd name (ray::*) -> list of stats dicts.
proc_name_to_stats = defaultdict(list)
for stat in worker_stats:
cmdline = stat.get("cmdline")
# All Ray worker processes' cmdline names start with ray::
if cmdline and len(cmdline) > 0 and cmdline[0].startswith("ray::"):
proc_name = cmdline[0]
proc_name_to_stats[proc_name].append(stat)
# We will lose worker stats that don't follow the ray worker proc
# naming convention. Theoretically, there should be no data loss here
# because all worker processes are renamed to ray::.

records = []
for proc_name, stats in proc_name_to_stats.items():
records.extend(self._generate_system_stats_record(stats, proc_name))

# Reset worker metrics that are from finished processes.
new_proc_names = set(proc_name_to_stats.keys())
stale_procs = self._latest_worker_proc_names - new_proc_names
self._latest_worker_proc_names = new_proc_names

for stale_proc_name in stale_procs:
records.extend(self._generate_reseted_stats_record(stale_proc_name))

return records
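
# Illustrative sketch, not part of this diff: grouping by the ray:: cmdline name
# means several processes running the same function/class collapse into one
# Component label, and names seen in the previous report but missing now are
# reset to zero. Hypothetical example:
from collections import defaultdict

worker_cmdlines = ["ray::train_step", "ray::train_step", "ray::Actor.ping"]
grouped = defaultdict(list)
for name in worker_cmdlines:
    grouped[name].append({"cpu_percent": 50.0})  # stand-in for a psutil stats dict
# grouped keys -> "ray::train_step" (2 procs), "ray::Actor.ping" (1 proc)

previous_names = {"ray::train_step", "ray::finished_task"}
stale = previous_names - set(grouped)
# stale == {"ray::finished_task"}; its gauges are re-emitted with value 0.0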

def _record_stats(self, stats, cluster_stats):
records_reported = []
ip = stats["ip"]
@@ -815,66 +954,25 @@ def _record_stats(self, stats, cluster_stats):
Record system stats.
"""

def record_system_stats(
stats: List[dict], component_name: str, pid: Optional[str] = None
) -> List[Record]:
assert component_name in AVAILABLE_COMPONENT_NAMES_FOR_METRICS
records = []
total_cpu_percentage = 0.0
total_rss = 0.0
total_uss = 0.0
for stat in stats:
total_cpu_percentage += float(stat["cpu_percent"]) * 100.0
total_rss += float(stat["memory_info"].rss) / 1.0e6
mem_full_info = stat.get("memory_full_info")
if mem_full_info is not None:
total_uss += float(mem_full_info.uss) / 1.0e6

tags = {"ip": ip, "Component": component_name}
if pid:
tags["pid"] = pid

records.append(
Record(
gauge=METRICS_GAUGES["component_cpu_percentage"],
value=total_cpu_percentage,
tags=tags,
)
)
records.append(
Record(
gauge=METRICS_GAUGES["component_rss_mb"],
value=total_rss,
tags=tags,
)
)
if total_uss > 0.0:
records.append(
Record(
gauge=METRICS_GAUGES["component_uss_mb"],
value=total_uss,
tags=tags,
)
)

return records

# Record component metrics.
raylet_stats = stats["raylet"]
if raylet_stats:
raylet_pid = str(raylet_stats["pid"])
records_reported.extend(
record_system_stats([raylet_stats], "raylet", pid=raylet_pid)
self._generate_system_stats_record(
[raylet_stats], "raylet", pid=raylet_pid
)
)
workers_stats = stats["workers"]
if workers_stats:
# TODO(sang): Maybe we can report per worker memory usage.
records_reported.extend(record_system_stats(workers_stats, "workers"))
records_reported.extend(self.generate_worker_stats_record(workers_stats))
agent_stats = stats["agent"]
if agent_stats:
agent_pid = str(agent_stats["pid"])
records_reported.extend(
record_system_stats([agent_stats], "agent", pid=agent_pid)
self._generate_system_stats_record(
[agent_stats], "agent", pid=agent_pid
)
)

# TODO(sang): Record GCS metrics.
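For illustration only, not part of this diff: with these changes the agent exports per-component gauges tagged with ip, Component, and (for raylet/agent) pid, where Component is either a system component name or a worker name such as ray::<function or class>. A hedged sketch of how the resulting Prometheus series might look; the values are hypothetical, and metric names assume Ray's usual ray_ prefix, as used by the panel expression above:

ray_component_cpu_percentage{ip="10.0.0.1", Component="raylet", pid="1234"}      5.0
ray_component_cpu_percentage{ip="10.0.0.1", Component="ray::train_step"}       150.0
ray_component_rss_mb{ip="10.0.0.1", Component="ray::train_step"}               300.0

Worker Components that disappear between reports are re-exported with value 0.0, as handled by _generate_reseted_stats_record above.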