🎨Maintenance: add dask security to clusters maintenance script #5288

Merged · 4 commits · Feb 1, 2024
158 changes: 123 additions & 35 deletions scripts/maintenance/computational-clusters/osparc_clusters.py
@@ -20,6 +20,7 @@
from dotenv import dotenv_values
from mypy_boto3_ec2 import EC2ServiceResource
from mypy_boto3_ec2.service_resource import Instance, ServiceResourceInstancesCollection
from pydantic import ByteSize, TypeAdapter
from rich import print # pylint: disable=redefined-builtin
from rich.progress import track
from rich.style import Style
from rich.table import Column, Table
@@ -61,6 +62,7 @@ class DynamicService:
@dataclass(frozen=True, slots=True, kw_only=True)
class DynamicInstance(AutoscaledInstance):
running_services: list[DynamicService]
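    # free space left on the instance's /docker partition, gathered over SSH
    # (see _ssh_and_get_available_disk_space below)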
disk_space: ByteSize


TaskId: TypeAlias = str
@@ -69,6 +71,12 @@ class DynamicInstance(AutoscaledInstance):
MINUTE: Final[int] = 60
HOUR: Final[int] = 60 * MINUTE

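# matches EC2 instance names of the form (hypothetical example)
# "osparc-computational-cluster-manager-production-user_id:3-wallet_id:7"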
COMPUTATIONAL_INSTANCE_NAME_PARSER: Final[parse.Parser] = parse.compile(
r"osparc-computational-cluster-{role}-{swarm_stack_name}-user_id:{user_id:d}-wallet_id:{wallet_id:d}"
)

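# matches deployed key filenames such as "osparc-mydeploy.pem" ("mydeploy" is an illustrative value)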
DEPLOY_SSH_KEY_PARSER: Final[parse.Parser] = parse.compile(r"osparc-{random_name}.pem")


@dataclass(frozen=True, slots=True, kw_only=True)
class ComputationalCluster:
@@ -107,14 +115,9 @@ def _timedelta_formatting(
return formatted_time_diff


-computational_parser = parse.compile(
-    "osparc-computational-cluster-{role}-{swarm_stack_name}-user_id:{user_id:d}-wallet_id:{wallet_id:d}"
-)


def _parse_computational(instance: Instance) -> ComputationalInstance | None:
name = _get_instance_name(instance)
-    if result := computational_parser.search(name):
+    if result := COMPUTATIONAL_INSTANCE_NAME_PARSER.search(name):
assert isinstance(result, parse.Result)
last_heartbeat = _get_last_heartbeat(instance)
return ComputationalInstance(
@@ -142,12 +145,20 @@ def _create_graylog_permalinks(
return f"https://monitoring.{environment['MACHINE_FQDN']}/graylog/search?q=source%3A%22ip-{source_name}%22&rangetype=relative&from={time_span}"


UNDEFINED_BYTESIZE: Final[ByteSize] = ByteSize(-1)
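# -1 acts as a sentinel until the real value is filled in by _ssh_and_get_available_disk_space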


def _parse_dynamic(instance: Instance) -> DynamicInstance | None:
name = _get_instance_name(instance)
if result := state["dynamic_parser"].search(name):
assert isinstance(result, parse.Result)

return DynamicInstance(name=name, ec2_instance=instance, running_services=[])
return DynamicInstance(
name=name,
ec2_instance=instance,
running_services=[],
disk_space=UNDEFINED_BYTESIZE,
)
return None


@@ -170,6 +181,45 @@ def _parse_dynamic(instance: Instance) -> DynamicInstance | None:
)


def _ssh_and_get_available_disk_space(
instance: Instance, username: str, private_key_path: Path
) -> ByteSize:
# Establish SSH connection with key-based authentication
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(
instance.public_ip_address,
username=username,
key_filename=f"{private_key_path}",
timeout=5,
)
# Command to get disk space for /docker partition
disk_space_command = "df --block-size=1 /docker | awk 'NR==2{print $4}'"
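        # NR==2 selects df's single data row; $4 is the "Available" column, in 1-byte blocks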

# Run the command on the remote machine
_stdin, stdout, stderr = client.exec_command(disk_space_command)
error = stderr.read().decode()

if error:
print(error)
raise typer.Abort(error)

# Available disk space will be captured here
available_space = stdout.read().decode("utf-8").strip()
return ByteSize(available_space)
except (
paramiko.AuthenticationException,
paramiko.SSHException,
TimeoutError,
):
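        # unreachable or unauthenticated instances are reported as having 0 bytes free
        # instead of aborting the whole listing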
return ByteSize(0)

finally:
# Close the SSH connection
client.close()


def _ssh_and_list_running_dyn_services(
instance: Instance, username: str, private_key_path: Path
) -> list[DynamicService]:
@@ -263,7 +313,7 @@ def _print_dynamic_instances(
Column("Links", overflow="fold"),
Column(
"Running services",
-            footer="[red](need ssh access) - Intervention detection might show false positive if in transient state, be careful and always double-check!![/red]",
+            footer="[red]Intervention detection might show false positive if in transient state, be careful and always double-check!![/red]",
),
title="dynamic autoscaled instances",
show_footer=True,
@@ -273,11 +323,6 @@
for instance in track(
instances, description="Preparing dynamic autoscaled instances details..."
):
-        instance_state = (
-            f"[green]{instance.ec2_instance.state['Name']}[/green]"
-            if instance.ec2_instance.state["Name"] == "running"
-            else f"[yellow]{instance.ec2_instance.state['Name']}[/yellow]"
-        )
service_table = "[i]n/a[/i]"
if instance.running_services:
service_table = Table(
@@ -309,6 +354,7 @@ def _print_dynamic_instances(
f"Up: {_timedelta_formatting(time_now - instance.ec2_instance.launch_time, color_code=True)}",
f"ExtIP: {instance.ec2_instance.public_ip_address}",
f"IntIP: {instance.ec2_instance.private_ip_address}",
f"/docker(free): {_color_encode_with_threshold(instance.disk_space.human_readable(), instance.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
]
),
f"Graylog: {_create_graylog_permalinks(environment, instance.ec2_instance)}",
@@ -336,6 +382,13 @@ def _color_encode_with_state(string: str, ec2_instance: Instance) -> str:
)


DANGER = "[red]{}[/red]"


def _color_encode_with_threshold(string: str, value, threshold) -> str:
return string if value > threshold else DANGER.format(string)
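# e.g. with the 15GiB threshold used above, a reading of 12GiB is wrapped in red markup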


def _print_computational_clusters(
clusters: list[ComputationalCluster], environment: dict[str, str | None]
) -> None:
@@ -429,13 +482,25 @@ def _analyze_dynamic_instances_running_services(
)
)

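    # the blocking paramiko SSH calls run concurrently on the default thread pool
    # via run_in_executor, one task per dynamic instance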
-    return [
-        replace(
-            instance,
-            running_services=running_services,
-        )
-        for instance, running_services in zip(
-            dynamic_instances, all_running_services, strict=True
-        )
+    all_disk_spaces = asyncio.get_event_loop().run_until_complete(
+        asyncio.gather(
+            *(
+                asyncio.get_event_loop().run_in_executor(
+                    None,
+                    _ssh_and_get_available_disk_space,
+                    instance.ec2_instance,
+                    "ubuntu",
+                    ssh_key_path,
+                )
+                for instance in dynamic_instances
+            )
+        )
+    )
+
+    return [
+        replace(instance, running_services=running_services, disk_space=disk_space)
+        for instance, running_services, disk_space in zip(
+            dynamic_instances, all_running_services, all_disk_spaces, strict=True
+        )
         if (user_id is None or any(s.user_id == user_id for s in running_services))
     ]
@@ -458,6 +523,21 @@ def _list_tasks(
return list_of_tasks


def _dask_client(ip_address: str) -> distributed.Client:
security = distributed.Security()
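    # default (no TLS material) configuration; replaced below when deployed certificates are found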
dask_certificates = state["deploy_config"] / "dask-certificates"
if dask_certificates.exists():
security = distributed.Security(
tls_ca_file=f"{dask_certificates / 'dask-cert.pem'}",
tls_client_cert=f"{dask_certificates / 'dask-cert.pem'}",
tls_client_key=f"{dask_certificates / 'dask-key.pem'}",
require_encryption=True,
)
return distributed.Client(
f"tls://{ip_address}:8786", security=security, timeout="5"
)

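# Usage sketch (hypothetical scheduler address): the returned object is a regular
# distributed.Client, so existing call sites work unchanged, e.g.:
#   with _dask_client("10.0.0.1") as client:
#       print(client.scheduler_info())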

def _analyze_computational_instances(
computational_instances: list[ComputationalInstance],
) -> list[ComputationalCluster]:
@@ -471,9 +551,8 @@ def _analyze_computational_instances(
processing_jobs = {}
unrunnable_tasks = {}
with contextlib.suppress(TimeoutError, OSError):
-        client = distributed.Client(
-            f"tcp://{instance.ec2_instance.public_ip_address}:8786", timeout="5"
-        )
+        client = _dask_client(instance.ec2_instance.public_ip_address)

scheduler_info = client.scheduler_info()
datasets_on_cluster = client.list_datasets()
processing_jobs = client.processing()
@@ -513,7 +592,6 @@ def _detect_instances(
) -> tuple[list[DynamicInstance], list[ComputationalCluster]]:
dynamic_instances = []
computational_instances = []

for instance in track(instances, description="Detecting running instances..."):
if comp_instance := _parse_computational(instance):
if (user_id is None or comp_instance.user_id == user_id) and (
@@ -537,16 +615,22 @@
"environment": {},
"ec2_resource": None,
"dynamic_parser": parse.compile("osparc-dynamic-autoscaled-worker-{key_name}"),
"ssh_key_path": None,
}


@app.callback()
def main(
-    repo_config: Annotated[Path, typer.Option(help="path to the repo.config file")],
-    ssh_key_path: Annotated[Path, typer.Option(help="path to the repo ssh key")] = None,  # type: ignore
+    deploy_config: Annotated[
+        Path, typer.Option(help="path to the deploy configuration")
+    ]
):
"""Manages external clusters"""

state["deploy_config"] = deploy_config.expanduser()
assert deploy_config.is_dir()
# get the repo.config file
repo_config = deploy_config / "repo.config"
assert repo_config.exists()
environment = dotenv_values(repo_config)
assert environment
state["environment"] = environment
@@ -565,13 +649,22 @@ def main(
f"{environment['EC2_INSTANCES_NAME_PREFIX']}-{{key_name}}"
)

-    state["ssh_key_path"] = ssh_key_path.expanduser() if ssh_key_path else None
# find ssh key path
for file_path in deploy_config.glob("**/*.pem"):
# very bad HACK
if "sim4life.io" in f"{file_path}" and "openssh" not in f"{file_path}":
continue

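        # parse() only returns a Result on a full-name match, so unrelated .pem files are skipped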
if DEPLOY_SSH_KEY_PARSER.parse(f"{file_path.name}") is not None:
print(f"found following ssh_key_path: {file_path}")
state["ssh_key_path"] = file_path
break


@app.command()
def summary(
-    user_id: Annotated[int, typer.Option(help="the user ID")] = None,  # type: ignore
-    wallet_id: Annotated[int, typer.Option(help="the wallet ID")] = None,  # type: ignore
+    user_id: Annotated[int, typer.Option(help="the user ID")] = None,
+    wallet_id: Annotated[int, typer.Option(help="the wallet ID")] = None,
) -> None:
"""Show a summary of the current situation of autoscaled EC2 instances.

@@ -581,8 +674,6 @@ def summary(
Arguments:
repo_config -- path that shall point to a repo.config type of file (see osparc-ops-deployment-configuration repository)

-    Keyword Arguments:
-    ssh_key_path -- the ssh key that corresponds to above deployment to ssh into the autoscaled machines (e.g. ) (default: {None})
"""

# get all the running instances
@@ -640,10 +731,7 @@ def clear_jobs(
if typer.confirm("Are you sure you want to erase all the jobs from that cluster?"):
print("proceeding with resetting jobs from cluster...")
the_cluster = computational_clusters[0]
-    with distributed.Client(
-        f"tcp://{the_cluster.primary.ec2_instance.public_ip_address}:8786",
-        timeout="5",
-    ) as client:
+    with _dask_client(the_cluster.primary.ec2_instance.public_ip_address) as client:
client.datasets.clear()
print("proceeding with resetting jobs from cluster done.")
print("Refreshing...")