✨Monitoring tools: show more information about computational clusters #5165

Merged
15 changes: 15 additions & 0 deletions scripts/maintenance/computational-clusters/Makefile
@@ -0,0 +1,15 @@
.DEFAULT_GOAL := install

SHELL := /bin/bash

install:
	# creating python virtual environment
	@python3 -m venv .venv
	# installing python dependencies
	# (note: each recipe line runs in its own shell, so the venv's pip is
	# called directly instead of sourcing .venv/bin/activate, which would not persist)
	@.venv/bin/pip install --upgrade pip setuptools wheel
	@.venv/bin/pip install -r requirements.txt
	# now you can activate the environment and call the maintenance scripts
	# source .venv/bin/activate
	# e.g. ./osparc_clusters.py PATH/TO/REPO.CONFIG --ssh-key-path=PATH/TO/SSHKEY
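For context, the intended workflow is roughly the following — a sketch assembled from the comments above, with the config and key paths as placeholders:

# make                        <- default goal `install`: creates .venv and installs requirements.txt
# source .venv/bin/activate
# ./osparc_clusters.py PATH/TO/REPO.CONFIG --ssh-key-path=PATH/TO/SSHKEY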
167 changes: 117 additions & 50 deletions scripts/maintenance/computational-clusters/osparc_clusters.py
@@ -1,17 +1,19 @@
 #! /usr/bin/env python3
 
 import asyncio
+import contextlib
 import datetime
 import json
 import re
 from collections import defaultdict, namedtuple
 from dataclasses import dataclass, replace
 from enum import Enum
 from pathlib import Path
-from typing import Final
+from typing import Any, Final
 
 import arrow
 import boto3
+import distributed
 import paramiko
 import parse
 import typer
@@ -65,6 +67,10 @@ class ComputationalCluster:
     primary: ComputationalInstance
     workers: list[ComputationalInstance]
 
+    scheduler_info: dict[str, Any]
+    datasets: tuple[str, ...]
+    processing_jobs: dict[str, str]
+
 
 def _get_instance_name(instance) -> str:
     for tag in instance.tags:
@@ -111,6 +117,19 @@ def _parse_computational(instance: Instance) -> ComputationalInstance | None:
 dynamic_parser = parse.compile("osparc-dynamic-autoscaled-worker-{key_name}")
 
 
+def _create_graylog_permalinks(
+    environment: dict[str, str | None], instance: Instance
+) -> str:
+    # https://monitoring.sim4life.io/graylog/search/6552235211aee4262e7f9f21?q=source%3A%22ip-10-0-1-67%22&rangetype=relative&from=28800
+    source_name = instance.private_ip_address.replace(".", "-")
+    time_span = int(
+        (
+            arrow.utcnow().datetime - instance.launch_time + datetime.timedelta(hours=1)
+        ).total_seconds()
+    )
+    return f"https://monitoring.{environment['MACHINE_FQDN']}/graylog/search?q=source%3A%22ip-{source_name}%22&rangetype=relative&from={time_span}"
+
+
 def _parse_dynamic(instance: Instance) -> DynamicInstance | None:
     name = _get_instance_name(instance)
     if result := dynamic_parser.search(name):
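For reference, a minimal sketch of the permalink this helper builds, under hypothetical values (MACHINE_FQDN "sim4life.io", private IP 10.0.1.67, an instance launched 8 hours ago):

# _create_graylog_permalinks({"MACHINE_FQDN": "sim4life.io"}, instance)
# -> "https://monitoring.sim4life.io/graylog/search?q=source%3A%22ip-10-0-1-67%22&rangetype=relative&from=32400"
# i.e. a relative Graylog search on source ip-10-0-1-67 spanning the instance's age plus one hour (8h + 1h = 32400s)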
@@ -213,12 +232,16 @@ def _needs_manual_intervention(
         ]
     except (paramiko.AuthenticationException, paramiko.SSHException) as exc:
         raise typer.Abort from exc
+    except TimeoutError:
+        return []
     finally:
         # Close the SSH connection
         client.close()
 
 
-def _print_dynamic_instances(instances: list[DynamicInstance]) -> None:
+def _print_dynamic_instances(
+    instances: list[DynamicInstance], environment: dict[str, str | None]
+) -> None:
     time_now = arrow.utcnow()
     table = Table(
         "Instance",
@@ -252,7 +275,7 @@ def _print_dynamic_instances(instances: list[DynamicInstance]) -> None:
         "ServiceName",
         "ServiceVersion",
         "Created Since",
-        Column("Need intervention"),
+        "Need intervention",
     )
     for service in instance.running_services:
         service_table.add_row(
@@ -270,7 +293,7 @@ def _print_dynamic_instances(instances: list[DynamicInstance]) -> None:
             f"{instance.ec2_instance.instance_type}",
             instance.ec2_instance.public_ip_address,
             instance.ec2_instance.private_ip_address,
-            instance.name,
+            f"{instance.name}\n{_create_graylog_permalinks(environment, instance.ec2_instance)}",
             _timedelta_formatting(time_now - instance.ec2_instance.launch_time),
             instance_state,
             service_table,
@@ -279,7 +302,9 @@ def _print_dynamic_instances(instances: list[DynamicInstance]) -> None:
     print(table, flush=True)
 
 
-def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None:
+def _print_computational_clusters(
+    clusters: list[ComputationalCluster], environment: dict[str, str | None]
+) -> None:
     time_now = arrow.utcnow()
     table = Table(
         "Instance",
@@ -291,8 +316,10 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None:
         "State",
         "UserID",
         "WalletID",
-        "DaskSchedulerUI",
+        "Dask (UI+scheduler)",
         "last heartbeat since",
+        "known jobs",
+        "processing jobs",
         title="computational clusters",
     )
 
@@ -311,13 +338,15 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None:
             f"{cluster.primary.ec2_instance.instance_type}",
             cluster.primary.ec2_instance.public_ip_address,
             cluster.primary.ec2_instance.private_ip_address,
-            cluster.primary.name,
+            f"{cluster.primary.name}\n{_create_graylog_permalinks(environment, cluster.primary.ec2_instance)}",
             _timedelta_formatting(time_now - cluster.primary.ec2_instance.launch_time),
             instance_state,
             f"{cluster.primary.user_id}",
             f"{cluster.primary.wallet_id}",
-            f"http://{cluster.primary.ec2_instance.public_ip_address}:8787",
+            f"http://{cluster.primary.ec2_instance.public_ip_address}:8787\ntcp://{cluster.primary.ec2_instance.public_ip_address}:8786",
             _timedelta_formatting(time_now - cluster.primary.last_heartbeat),
+            f"{len(cluster.datasets)}",
+            json.dumps(cluster.processing_jobs),
         )
         # now add the workers
         for worker in cluster.workers:
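A quick sketch of what feeds the two new columns (cluster state hypothetical):

# cluster.datasets == ("comp-task-abc", "comp-task-def")   ->  "known jobs" cell renders "2"
# cluster.processing_jobs (as returned by the dask client) ->  "processing jobs" cell renders its json.dumps(...)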
@@ -332,7 +361,7 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None:
             f"{worker.ec2_instance.instance_type}",
             worker.ec2_instance.public_ip_address,
             worker.ec2_instance.private_ip_address,
-            worker.name,
+            f"{worker.name}\n{_create_graylog_permalinks(environment, worker.ec2_instance)}",
             _timedelta_formatting(
                 time_now - arrow.get(worker.ec2_instance.launch_time)
             ),
@@ -346,51 +375,67 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None:
     print(table)
 
 
-def _detect_instances(
-    instances: ServiceResourceInstancesCollection, ssh_key_path: Path | None
-) -> tuple[list[DynamicInstance], list[ComputationalCluster]]:
-    dynamic_instances = []
-    computational_instances = []
-
-    for instance in track(instances, description="Detecting running instances..."):
-        if comp_instance := _parse_computational(instance):
-            computational_instances.append(comp_instance)
-        elif dyn_instance := _parse_dynamic(instance):
-            dynamic_instances.append(dyn_instance)
-
-    if ssh_key_path:
-        # this construction makes the retrieval much faster
-        all_running_services = asyncio.get_event_loop().run_until_complete(
-            asyncio.gather(
-                *(
-                    asyncio.get_event_loop().run_in_executor(
-                        None,
-                        _ssh_and_list_running_dyn_services,
-                        instance.ec2_instance,
-                        "ubuntu",
-                        ssh_key_path,
-                    )
-                    for instance in dynamic_instances
-                )
-            )
-        )
-
-        more_detailed_instances = [
-            replace(
-                instance,
-                running_services=running_services,
-            )
-            for instance, running_services in zip(
-                dynamic_instances, all_running_services, strict=True
-            )
-        ]
-        dynamic_instances = more_detailed_instances
-
-    computational_clusters = [
-        ComputationalCluster(primary=instance, workers=[])
-        for instance in computational_instances
-        if instance.role is InstanceRole.manager
-    ]
+def _analyze_dynamic_instances_running_services(
+    dynamic_instances: list[DynamicInstance], ssh_key_path: Path
+) -> list[DynamicInstance]:
+    # this construction makes the retrieval much faster
+    all_running_services = asyncio.get_event_loop().run_until_complete(
+        asyncio.gather(
+            *(
+                asyncio.get_event_loop().run_in_executor(
+                    None,
+                    _ssh_and_list_running_dyn_services,
+                    instance.ec2_instance,
+                    "ubuntu",
+                    ssh_key_path,
+                )
+                for instance in dynamic_instances
+            )
+        )
+    )
+
+    return [
+        replace(
+            instance,
+            running_services=running_services,
+        )
+        for instance, running_services in zip(
+            dynamic_instances, all_running_services, strict=True
+        )
+    ]
+
+
+def _analyze_computational_instances(
+    computational_instances: list[ComputationalInstance],
+) -> list[ComputationalCluster]:
+    computational_clusters = []
+    for instance in track(
+        computational_instances, description="Collecting computational clusters data..."
+    ):
+        if instance.role is InstanceRole.manager:
+            scheduler_info = {}
+            datasets_on_cluster = ()
+            processing_jobs = {}
+            with contextlib.suppress(TimeoutError, OSError):
+                client = distributed.Client(
+                    f"tcp://{instance.ec2_instance.public_ip_address}:8786", timeout=5
+                )
+                scheduler_info = client.scheduler_info()
+                datasets_on_cluster = client.list_datasets()
+                processing_jobs = client.processing()
+
+            assert isinstance(datasets_on_cluster, tuple)
+            assert isinstance(processing_jobs, dict)
+            computational_clusters.append(
+                ComputationalCluster(
+                    primary=instance,
+                    workers=[],
+                    scheduler_info=scheduler_info,
+                    datasets=datasets_on_cluster,
+                    processing_jobs=processing_jobs,
+                )
+            )
+
     for instance in computational_instances:
         if instance.role is InstanceRole.worker:
             # assign the worker to correct cluster
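For orientation, a minimal sketch of what the three dask client calls above return, with shapes as documented by dask.distributed and hypothetical addresses/keys:

import distributed

client = distributed.Client("tcp://3.250.12.34:8786", timeout=5)  # scheduler address of a primary instance
client.scheduler_info()  # dict describing the scheduler and its workers, e.g. {"type": "Scheduler", "workers": {...}}
client.list_datasets()   # tuple of published dataset names, e.g. ("solver-run-abc",)
client.processing()      # tasks currently processing, keyed by worker, e.g. {"tcp://10.0.2.5:45875": (...)}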
@@ -401,6 +446,28 @@
         ):
             cluster.workers.append(instance)
 
+    return computational_clusters
+
+
+def _detect_instances(
+    instances: ServiceResourceInstancesCollection, ssh_key_path: Path | None
+) -> tuple[list[DynamicInstance], list[ComputationalCluster]]:
+    dynamic_instances = []
+    computational_instances = []
+
+    for instance in track(instances, description="Detecting running instances..."):
+        if comp_instance := _parse_computational(instance):
+            computational_instances.append(comp_instance)
+        elif dyn_instance := _parse_dynamic(instance):
+            dynamic_instances.append(dyn_instance)
+
+    if ssh_key_path:
+        dynamic_instances = _analyze_dynamic_instances_running_services(
+            dynamic_instances, ssh_key_path
+        )
+
+    computational_clusters = _analyze_computational_instances(computational_instances)
+
     return dynamic_instances, computational_clusters


@@ -445,8 +512,8 @@ def summary(repo_config: Path, ssh_key_path: Path | None = None) -> None:
         instances, ssh_key_path
     )
 
-    _print_dynamic_instances(dynamic_autoscaled_instances)
-    _print_computational_clusters(computational_clusters)
+    _print_dynamic_instances(dynamic_autoscaled_instances, environment)
+    _print_computational_clusters(computational_clusters, environment)
 
 
 if __name__ == "__main__":
12 changes: 12 additions & 0 deletions scripts/maintenance/computational-clusters/requirements.txt
@@ -0,0 +1,12 @@
arrow
black
boto3
dask[distributed]
mypy_boto3_ec2
types-boto3
parse
paramiko
pydantic[email]
pylint
python-dotenv
typer[all]