From f4a99e8bcdb78af83df81488e7e30916aa92fc56 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Wed, 13 Dec 2023 07:46:07 +0100 Subject: [PATCH 1/6] added easy install means --- .../maintenance/computational-clusters/Makefile | 14 ++++++++++++++ .../computational-clusters/requirements.txt | 10 ++++++++++ 2 files changed, 24 insertions(+) create mode 100644 scripts/maintenance/computational-clusters/Makefile create mode 100644 scripts/maintenance/computational-clusters/requirements.txt diff --git a/scripts/maintenance/computational-clusters/Makefile b/scripts/maintenance/computational-clusters/Makefile new file mode 100644 index 00000000000..d8a574d5208 --- /dev/null +++ b/scripts/maintenance/computational-clusters/Makefile @@ -0,0 +1,14 @@ +.DEFAULT_GOAL := install + +SHELL := /bin/bash + +install: + # creating python virtual environment + @python3 -m venv .venv + # activating python virtual environment + @source .venv/bin/activate + # installing python dependencies + @.venv/bin/pip install --upgrade pip setuptools wheel + @.venv/bin/pip install -r requirements.txt + # now you can call the maintenance scripts + # e.g. ./osparc_clusters.py PATH/TO/REPO.CONFIG --ssh-key-path=PATH/TO/SSHKEY diff --git a/scripts/maintenance/computational-clusters/requirements.txt b/scripts/maintenance/computational-clusters/requirements.txt new file mode 100644 index 00000000000..551d1d844a2 --- /dev/null +++ b/scripts/maintenance/computational-clusters/requirements.txt @@ -0,0 +1,10 @@ +black +boto3 +dask[distributed] +types-boto3 +parse +paramiko +pydantic[email] +pylint +python-dotenv +typer[all] From d72ad57ee585fadadfe79de9e27f5d852501f6e0 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Wed, 13 Dec 2023 08:36:07 +0100 Subject: [PATCH 2/6] refactor --- scripts/maintenance/computational-clusters/Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/maintenance/computational-clusters/Makefile b/scripts/maintenance/computational-clusters/Makefile index d8a574d5208..fbe4822c50f 100644 --- a/scripts/maintenance/computational-clusters/Makefile +++ b/scripts/maintenance/computational-clusters/Makefile @@ -11,4 +11,5 @@ install: @.venv/bin/pip install --upgrade pip setuptools wheel @.venv/bin/pip install -r requirements.txt # now you can call the maintenance scripts + # source .venv/bin/activate # e.g. ./osparc_clusters.py PATH/TO/REPO.CONFIG --ssh-key-path=PATH/TO/SSHKEY From 0100c0fd759dc89da618ce7efa1d09000b4e692c Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Wed, 13 Dec 2023 08:36:21 +0100 Subject: [PATCH 3/6] add missing dependencies --- scripts/maintenance/computational-clusters/requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/maintenance/computational-clusters/requirements.txt b/scripts/maintenance/computational-clusters/requirements.txt index 551d1d844a2..40871756899 100644 --- a/scripts/maintenance/computational-clusters/requirements.txt +++ b/scripts/maintenance/computational-clusters/requirements.txt @@ -1,6 +1,8 @@ +arrow black boto3 dask[distributed] +mypy_boto3_ec2 types-boto3 parse paramiko From 26018d0295cc29a10723440f99e40b439b733609 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Wed, 13 Dec 2023 08:36:34 +0100 Subject: [PATCH 4/6] now prints dask information as well --- .../computational-clusters/osparc_clusters.py | 134 ++++++++++++------ 1 file changed, 92 insertions(+), 42 deletions(-) diff --git a/scripts/maintenance/computational-clusters/osparc_clusters.py b/scripts/maintenance/computational-clusters/osparc_clusters.py index 8ab772b447c..e0d7b530d08 100755 --- a/scripts/maintenance/computational-clusters/osparc_clusters.py +++ b/scripts/maintenance/computational-clusters/osparc_clusters.py @@ -1,6 +1,7 @@ #! /usr/bin/env python3 import asyncio +import contextlib import datetime import json import re @@ -8,10 +9,11 @@ from dataclasses import dataclass, replace from enum import Enum from pathlib import Path -from typing import Final +from typing import Any, Final import arrow import boto3 +import distributed import paramiko import parse import typer @@ -65,6 +67,10 @@ class ComputationalCluster: primary: ComputationalInstance workers: list[ComputationalInstance] + scheduler_info: dict[str, Any] + datasets: tuple[str, ...] + processing_jobs: dict[str, str] + def _get_instance_name(instance) -> str: for tag in instance.tags: @@ -213,6 +219,8 @@ def _needs_manual_intervention( ] except (paramiko.AuthenticationException, paramiko.SSHException) as exc: raise typer.Abort from exc + except TimeoutError: + return [] finally: # Close the SSH connection client.close() @@ -291,8 +299,10 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None: "State", "UserID", "WalletID", - "DaskSchedulerUI", + "Dask (UI+scheduler)", "last heartbeat since", + "known jobs", + "processing jobs", title="computational clusters", ) @@ -316,8 +326,10 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None: instance_state, f"{cluster.primary.user_id}", f"{cluster.primary.wallet_id}", - f"http://{cluster.primary.ec2_instance.public_ip_address}:8787", + f"http://{cluster.primary.ec2_instance.public_ip_address}:8787\ntcp://{cluster.primary.ec2_instance.public_ip_address}:8786", _timedelta_formatting(time_now - cluster.primary.last_heartbeat), + f"{len(cluster.datasets)}", + json.dumps(cluster.processing_jobs), ) # now add the workers for worker in cluster.workers: @@ -346,51 +358,67 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None: print(table) -def _detect_instances( - instances: ServiceResourceInstancesCollection, ssh_key_path: Path | None -) -> tuple[list[DynamicInstance], list[ComputationalCluster]]: - dynamic_instances = [] - computational_instances = [] - - for instance in track(instances, description="Detecting running instances..."): - if comp_instance := _parse_computational(instance): - computational_instances.append(comp_instance) - elif dyn_instance := _parse_dynamic(instance): - dynamic_instances.append(dyn_instance) - - if ssh_key_path: - # this construction makes the retrieval much faster - all_running_services = asyncio.get_event_loop().run_until_complete( - asyncio.gather( - *( - asyncio.get_event_loop().run_in_executor( - None, - _ssh_and_list_running_dyn_services, - instance.ec2_instance, - "ubuntu", - ssh_key_path, - ) - for instance in dynamic_instances +def _analyze_dynamic_instances_running_services( + dynamic_instances: list[DynamicInstance], ssh_key_path: Path +) -> list[DynamicInstance]: + # this construction makes the retrieval much faster + all_running_services = asyncio.get_event_loop().run_until_complete( + asyncio.gather( + *( + asyncio.get_event_loop().run_in_executor( + None, + _ssh_and_list_running_dyn_services, + instance.ec2_instance, + "ubuntu", + ssh_key_path, ) + for instance in dynamic_instances ) ) + ) - more_detailed_instances = [ - replace( - instance, - running_services=running_services, - ) - for instance, running_services in zip( - dynamic_instances, all_running_services, strict=True + return [ + replace( + instance, + running_services=running_services, + ) + for instance, running_services in zip( + dynamic_instances, all_running_services, strict=True + ) + ] + + +def _analyze_computational_instances( + computational_instances: list[ComputationalInstance], +) -> list[ComputationalCluster]: + computational_clusters = [] + for instance in track( + computational_instances, description="Collecting computational clusters data..." + ): + if instance.role is InstanceRole.manager: + scheduler_info = {} + datasets_on_cluster = () + processing_jobs = {} + with contextlib.suppress(TimeoutError, OSError): + client = distributed.Client( + f"tcp://{instance.ec2_instance.public_ip_address}:8786", timeout=5 + ) + scheduler_info = client.scheduler_info() + datasets_on_cluster = client.list_datasets() + processing_jobs = client.processing() + + assert isinstance(datasets_on_cluster, tuple) + assert isinstance(processing_jobs, dict) + computational_clusters.append( + ComputationalCluster( + primary=instance, + workers=[], + scheduler_info=scheduler_info, + datasets=datasets_on_cluster, + processing_jobs=processing_jobs, + ) ) - ] - dynamic_instances = more_detailed_instances - computational_clusters = [ - ComputationalCluster(primary=instance, workers=[]) - for instance in computational_instances - if instance.role is InstanceRole.manager - ] for instance in computational_instances: if instance.role is InstanceRole.worker: # assign the worker to correct cluster @@ -401,6 +429,28 @@ def _detect_instances( ): cluster.workers.append(instance) + return computational_clusters + + +def _detect_instances( + instances: ServiceResourceInstancesCollection, ssh_key_path: Path | None +) -> tuple[list[DynamicInstance], list[ComputationalCluster]]: + dynamic_instances = [] + computational_instances = [] + + for instance in track(instances, description="Detecting running instances..."): + if comp_instance := _parse_computational(instance): + computational_instances.append(comp_instance) + elif dyn_instance := _parse_dynamic(instance): + dynamic_instances.append(dyn_instance) + + if ssh_key_path: + dynamic_instances = _analyze_dynamic_instances_running_services( + dynamic_instances, ssh_key_path + ) + + computational_clusters = _analyze_computational_instances(computational_instances) + return dynamic_instances, computational_clusters From be39ff2674bb289bdae2b3506bb659d2561a037d Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Wed, 13 Dec 2023 09:01:56 +0100 Subject: [PATCH 5/6] added graylog permalinks --- .../computational-clusters/osparc_clusters.py | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/scripts/maintenance/computational-clusters/osparc_clusters.py b/scripts/maintenance/computational-clusters/osparc_clusters.py index e0d7b530d08..62f0021f465 100755 --- a/scripts/maintenance/computational-clusters/osparc_clusters.py +++ b/scripts/maintenance/computational-clusters/osparc_clusters.py @@ -117,6 +117,17 @@ def _parse_computational(instance: Instance) -> ComputationalInstance | None: dynamic_parser = parse.compile("osparc-dynamic-autoscaled-worker-{key_name}") +def _create_graylog_permalinks( + environment: dict[str, str | None], instance: Instance +) -> str: + # https://monitoring.sim4life.io/graylog/search/6552235211aee4262e7f9f21?q=source%3A%22ip-10-0-1-67%22&rangetype=relative&from=28800 + source_name = instance.private_ip_address.replace(".", "-") + time_span = ( + arrow.utcnow().datetime - instance.launch_time + datetime.timedelta(hours=1) + ).total_seconds() + return f"https://monitoring.{environment['MACHINE_FQDN']}/graylog/search?q=source%3A%22ip-{source_name}%22&rangetype=relative&from={time_span}" + + def _parse_dynamic(instance: Instance) -> DynamicInstance | None: name = _get_instance_name(instance) if result := dynamic_parser.search(name): @@ -226,7 +237,9 @@ def _needs_manual_intervention( client.close() -def _print_dynamic_instances(instances: list[DynamicInstance]) -> None: +def _print_dynamic_instances( + instances: list[DynamicInstance], environment: dict[str, str | None] +) -> None: time_now = arrow.utcnow() table = Table( "Instance", @@ -260,7 +273,7 @@ def _print_dynamic_instances(instances: list[DynamicInstance]) -> None: "ServiceName", "ServiceVersion", "Created Since", - Column("Need intervention"), + "Need intervention", ) for service in instance.running_services: service_table.add_row( @@ -278,7 +291,7 @@ def _print_dynamic_instances(instances: list[DynamicInstance]) -> None: f"{instance.ec2_instance.instance_type}", instance.ec2_instance.public_ip_address, instance.ec2_instance.private_ip_address, - instance.name, + f"{instance.name}\n{_create_graylog_permalinks(environment, instance.ec2_instance)}", _timedelta_formatting(time_now - instance.ec2_instance.launch_time), instance_state, service_table, @@ -287,7 +300,9 @@ def _print_dynamic_instances(instances: list[DynamicInstance]) -> None: print(table, flush=True) -def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None: +def _print_computational_clusters( + clusters: list[ComputationalCluster], environment: dict[str, str | None] +) -> None: time_now = arrow.utcnow() table = Table( "Instance", @@ -321,7 +336,7 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None: f"{cluster.primary.ec2_instance.instance_type}", cluster.primary.ec2_instance.public_ip_address, cluster.primary.ec2_instance.private_ip_address, - cluster.primary.name, + f"{cluster.primary.name}\n{_create_graylog_permalinks(environment, cluster.primary.ec2_instance)}", _timedelta_formatting(time_now - cluster.primary.ec2_instance.launch_time), instance_state, f"{cluster.primary.user_id}", @@ -344,7 +359,7 @@ def _print_computational_clusters(clusters: list[ComputationalCluster]) -> None: f"{worker.ec2_instance.instance_type}", worker.ec2_instance.public_ip_address, worker.ec2_instance.private_ip_address, - worker.name, + f"{worker.name}\n{_create_graylog_permalinks(environment, worker.ec2_instance)}", _timedelta_formatting( time_now - arrow.get(worker.ec2_instance.launch_time) ), @@ -495,8 +510,8 @@ def summary(repo_config: Path, ssh_key_path: Path | None = None) -> None: instances, ssh_key_path ) - _print_dynamic_instances(dynamic_autoscaled_instances) - _print_computational_clusters(computational_clusters) + _print_dynamic_instances(dynamic_autoscaled_instances, environment) + _print_computational_clusters(computational_clusters, environment) if __name__ == "__main__": From 394dce09da08d21bd0abc0e7ac7e573833368c9a Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Wed, 13 Dec 2023 09:24:53 +0100 Subject: [PATCH 6/6] no need for float --- .../maintenance/computational-clusters/osparc_clusters.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/scripts/maintenance/computational-clusters/osparc_clusters.py b/scripts/maintenance/computational-clusters/osparc_clusters.py index 62f0021f465..21a89403975 100755 --- a/scripts/maintenance/computational-clusters/osparc_clusters.py +++ b/scripts/maintenance/computational-clusters/osparc_clusters.py @@ -122,9 +122,11 @@ def _create_graylog_permalinks( ) -> str: # https://monitoring.sim4life.io/graylog/search/6552235211aee4262e7f9f21?q=source%3A%22ip-10-0-1-67%22&rangetype=relative&from=28800 source_name = instance.private_ip_address.replace(".", "-") - time_span = ( - arrow.utcnow().datetime - instance.launch_time + datetime.timedelta(hours=1) - ).total_seconds() + time_span = int( + ( + arrow.utcnow().datetime - instance.launch_time + datetime.timedelta(hours=1) + ).total_seconds() + ) return f"https://monitoring.{environment['MACHINE_FQDN']}/graylog/search?q=source%3A%22ip-{source_name}%22&rangetype=relative&from={time_span}"