From 6baa4b5861b349c694363e7b167bc43160db11ed Mon Sep 17 00:00:00 2001 From: James Meakin <12661555+jmsmkn@users.noreply.github.com> Date: Thu, 14 Dec 2023 17:42:14 +0100 Subject: [PATCH 01/13] Extract AmazonSageMakerBaseExecutor --- .../backends/amazon_sagemaker_base.py | 697 +++++++++++++++++ .../backends/amazon_sagemaker_batch.py | 731 ++---------------- .../backends/amazon_sagemaker_training.py | 43 ++ .../components/backends/base.py | 7 +- .../components/backends/docker.py | 6 +- app/grandchallenge/components/tasks.py | 3 +- .../test_amazon_sagemaker_batch_backend.py | 14 +- 7 files changed, 824 insertions(+), 677 deletions(-) create mode 100644 app/grandchallenge/components/backends/amazon_sagemaker_base.py create mode 100644 app/grandchallenge/components/backends/amazon_sagemaker_training.py diff --git a/app/grandchallenge/components/backends/amazon_sagemaker_base.py b/app/grandchallenge/components/backends/amazon_sagemaker_base.py new file mode 100644 index 0000000000..48d875a847 --- /dev/null +++ b/app/grandchallenge/components/backends/amazon_sagemaker_base.py @@ -0,0 +1,697 @@ +import io +import json +import logging +import re +from abc import ABC, abstractmethod +from datetime import timedelta +from json import JSONDecodeError +from typing import NamedTuple + +import boto3 +import botocore +from django.conf import settings +from django.db.models import TextChoices +from django.utils._os import safe_join +from django.utils.functional import cached_property + +from grandchallenge.components.backends.base import Executor, JobParams +from grandchallenge.components.backends.exceptions import ( + ComponentException, + RetryStep, + RetryTask, + TaskCancelled, +) +from grandchallenge.components.backends.utils import ( + LOGLINES, + SourceChoices, + ms_timestamp_to_datetime, + parse_structured_log, + user_error, +) +from grandchallenge.components.models import GPUTypeChoices +from grandchallenge.evaluation.utils import get + +logger = logging.getLogger(__name__) + +UUID4_REGEX = ( + r"[0-9a-f]{8}\-[0-9a-f]{4}\-4[0-9a-f]{3}\-[89ab][0-9a-f]{3}\-[0-9a-f]{12}" +) + + +class LogStreamNotFound(Exception): + """Raised when a log stream could not be found""" + + +class InstanceType(NamedTuple): + name: str + cpu: int + memory: float + usd_cents_per_hour: int + gpus: int = 0 + gpu_type: GPUTypeChoices | None = None + + +INSTANCE_OPTIONS = [ + # Instance types and pricing from eu-west-1, retrieved 06-JUN-2022 + # https://aws.amazon.com/sagemaker/pricing/ + InstanceType( + name="ml.m5.large", + cpu=2, + memory=8, + usd_cents_per_hour=13, + ), + InstanceType( + name="ml.m5.xlarge", + cpu=4, + memory=16, + usd_cents_per_hour=26, + ), + InstanceType( + name="ml.m5.2xlarge", + cpu=8, + memory=32, + usd_cents_per_hour=51, + ), + InstanceType( + name="ml.m5.4xlarge", + cpu=16, + memory=64, + usd_cents_per_hour=103, + ), + InstanceType( + name="ml.m5.12xlarge", + cpu=48, + memory=192, + usd_cents_per_hour=308, + ), + InstanceType( + name="ml.m5.24xlarge", + cpu=96, + memory=384, + usd_cents_per_hour=616, + ), + InstanceType( + name="ml.m4.xlarge", + cpu=4, + memory=16, + usd_cents_per_hour=27, + ), + InstanceType( + name="ml.m4.2xlarge", + cpu=8, + memory=32, + usd_cents_per_hour=53, + ), + InstanceType( + name="ml.m4.4xlarge", + cpu=16, + memory=64, + usd_cents_per_hour=107, + ), + InstanceType( + name="ml.m4.10xlarge", + cpu=40, + memory=160, + usd_cents_per_hour=266, + ), + InstanceType( + name="ml.m4.16xlarge", + cpu=64, + memory=256, + usd_cents_per_hour=426, + ), + InstanceType( + 
name="ml.c5.xlarge", + cpu=4, + memory=8, + usd_cents_per_hour=23, + ), + InstanceType( + name="ml.c5.2xlarge", + cpu=8, + memory=16, + usd_cents_per_hour=46, + ), + InstanceType( + name="ml.c5.4xlarge", + cpu=16, + memory=32, + usd_cents_per_hour=92, + ), + InstanceType( + name="ml.c5.9xlarge", + cpu=36, + memory=72, + usd_cents_per_hour=207, + ), + InstanceType( + name="ml.c5.18xlarge", + cpu=72, + memory=144, + usd_cents_per_hour=415, + ), + InstanceType( + name="ml.c4.xlarge", + cpu=4, + memory=7.5, + usd_cents_per_hour=27, + ), + InstanceType( + name="ml.c4.2xlarge", + cpu=8, + memory=15, + usd_cents_per_hour=54, + ), + InstanceType( + name="ml.c4.4xlarge", + cpu=16, + memory=30, + usd_cents_per_hour=109, + ), + InstanceType( + name="ml.c4.8xlarge", + cpu=36, + memory=60, + usd_cents_per_hour=217, + ), + InstanceType( + name="ml.p3.2xlarge", + cpu=8, + memory=61, + usd_cents_per_hour=413, + gpus=1, + gpu_type=GPUTypeChoices.V100, + ), + InstanceType( + name="ml.p3.8xlarge", + cpu=32, + memory=244, + usd_cents_per_hour=1586, + gpus=4, + gpu_type=GPUTypeChoices.V100, + ), + InstanceType( + name="ml.p3.16xlarge", + cpu=64, + memory=488, + usd_cents_per_hour=3041, + gpus=8, + gpu_type=GPUTypeChoices.V100, + ), + InstanceType( + name="ml.p2.xlarge", + cpu=4, + memory=61, + usd_cents_per_hour=122, + gpus=1, + gpu_type=GPUTypeChoices.K80, + ), + InstanceType( + name="ml.p2.8xlarge", + cpu=32, + memory=488, + usd_cents_per_hour=933, + gpus=8, + gpu_type=GPUTypeChoices.K80, + ), + InstanceType( + name="ml.p2.16xlarge", + cpu=64, + memory=732, + usd_cents_per_hour=1789, + gpus=16, + gpu_type=GPUTypeChoices.K80, + ), + InstanceType( + name="ml.g4dn.xlarge", + cpu=4, + memory=16, + usd_cents_per_hour=82, + gpus=1, + gpu_type=GPUTypeChoices.T4, + ), + InstanceType( + name="ml.g4dn.2xlarge", + cpu=8, + memory=32, + usd_cents_per_hour=105, + gpus=1, + gpu_type=GPUTypeChoices.T4, + ), + InstanceType( + name="ml.g4dn.4xlarge", + cpu=16, + memory=64, + usd_cents_per_hour=168, + gpus=1, + gpu_type=GPUTypeChoices.T4, + ), + InstanceType( + name="ml.g4dn.12xlarge", + cpu=48, + memory=192, + usd_cents_per_hour=545, + gpus=4, + gpu_type=GPUTypeChoices.T4, + ), + InstanceType( + name="ml.g4dn.16xlarge", + cpu=64, + memory=256, + usd_cents_per_hour=607, + gpus=1, + gpu_type=GPUTypeChoices.T4, + ), +] + + +class ModelChoices(TextChoices): + # The values must be short + # The labels must be in the form "-" + ALGORITHMS_JOB = "A", "algorithms-job" + EVALUATION_EVALUATION = "E", "evaluation-evaluation" + + +class AmazonSageMakerBaseExecutor(Executor, ABC): + IS_EVENT_DRIVEN = True + + @property + @abstractmethod + def _log_group_name(self): + pass + + @abstractmethod + def _get_job_status(self, *, event): + pass + + @abstractmethod + def _get_start_time(self, *, event): + pass + + @abstractmethod + def _get_end_time(self, *, event): + pass + + @abstractmethod + def _get_instance_name(self, *, event): + pass + + @abstractmethod + def _create_job_boto(self): + pass + + @abstractmethod + def _stop_job_boto(self): + pass + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.__duration = None + self.__runtime_metrics = {} + + self.__sagemaker_client = None + self.__logs_client = None + self.__cloudwatch_client = None + + @staticmethod + def get_job_params(*, job_name): + prefix_regex = re.escape(settings.COMPONENTS_REGISTRY_PREFIX) + model_regex = r"|".join(ModelChoices.values) + pattern = rf"^{prefix_regex}\-(?P{model_regex})\-(?P{UUID4_REGEX})\-(?P\d{{2}})$" + + result = 
re.match(pattern, job_name) + + if result is None: + raise ValueError("Invalid job name") + else: + job_model = ModelChoices(result.group("job_model")).label + job_app_label, job_model_name = job_model.split("-") + job_pk = result.group("job_pk") + attempt = int(result.group("attempt")) + return JobParams( + app_label=job_app_label, + model_name=job_model_name, + pk=job_pk, + attempt=attempt, + ) + + @property + def _sagemaker_client(self): + if self.__sagemaker_client is None: + self.__sagemaker_client = boto3.client( + "sagemaker", + region_name=settings.COMPONENTS_AMAZON_ECR_REGION, + ) + return self.__sagemaker_client + + @property + def _logs_client(self): + if self.__logs_client is None: + self.__logs_client = boto3.client( + "logs", region_name=settings.COMPONENTS_AMAZON_ECR_REGION + ) + return self.__logs_client + + @property + def _cloudwatch_client(self): + if self.__cloudwatch_client is None: + self.__cloudwatch_client = boto3.client( + "cloudwatch", + region_name=settings.COMPONENTS_AMAZON_ECR_REGION, + ) + return self.__cloudwatch_client + + @property + def duration(self): + return self.__duration + + @property + def runtime_metrics(self): + return self.__runtime_metrics + + @property + def _invocation_prefix(self): + return safe_join("/invocations", *self.job_path_parts) + + @property + def _invocation_key(self): + return safe_join(self._invocation_prefix, "invocation.json") + + @property + def _result_key(self): + return safe_join( + self._io_prefix, ".sagemaker_shim", "inference_result.json" + ) + + @property + def _sagemaker_job_name(self): + # SageMaker requires job names to be less than 63 chars + job_name = f"{settings.COMPONENTS_REGISTRY_PREFIX}-{self._job_id}" + + for value, label in ModelChoices.choices: + job_name = job_name.replace(label, value) + + return job_name + + @cached_property + def _instance_type(self): + """Find the cheapest instance that can run this job""" + + if self._requires_gpu: + # For now only use a single gpu + n_gpu = 1 + gpu_type = self._desired_gpu_type + else: + n_gpu = 0 + gpu_type = None + + compatible_instances = [ + instance + for instance in INSTANCE_OPTIONS + if instance.gpus == n_gpu + and instance.gpu_type == gpu_type + and instance.memory >= self._memory_limit + ] + + if not compatible_instances: + raise ValueError("No suitable instance types for job") + + # Get the lowest priced instance + compatible_instances.sort(key=lambda x: x.usd_cents_per_hour) + return compatible_instances[0] + + @property + def usd_cents_per_hour(self): + return self._instance_type.usd_cents_per_hour + + def execute(self, *, input_civs, input_prefixes): + self._create_invocation_json( + input_civs=input_civs, input_prefixes=input_prefixes + ) + self._create_sagemaker_job() + + def handle_event(self, *, event): + job_status = self._get_job_status(event=event) + + if job_status == "Stopped": + raise TaskCancelled + elif job_status in {"Completed", "Failed"}: + self._set_duration(event=event) + self._set_task_logs() + self._set_runtime_metrics(event=event) + if job_status == "Completed": + self._handle_completed_job() + else: + self._handle_failed_job(event=event) + else: + raise ValueError("Invalid job status") + + def deprovision(self): + self._stop_running_jobs() + + super().deprovision() + + self._delete_objects( + bucket=settings.COMPONENTS_INPUT_BUCKET_NAME, + prefix=self._invocation_prefix, + ) + + def _create_invocation_json(self, *, input_civs, input_prefixes): + f = io.BytesIO( + json.dumps( + self._get_invocation_json( + input_civs=input_civs, 
input_prefixes=input_prefixes + ) + ).encode("utf-8") + ) + self._s3_client.upload_fileobj( + f, settings.COMPONENTS_INPUT_BUCKET_NAME, self._invocation_key + ) + + def _create_sagemaker_job(self): + try: + self._create_job_boto() + except self._sagemaker_client.exceptions.ResourceLimitExceeded as error: + raise RetryStep("Capacity Limit Exceeded") from error + except botocore.exceptions.ClientError as error: + if error.response["Error"]["Code"] == "ThrottlingException": + raise RetryStep("Request throttled") from error + else: + raise error + + def _set_duration(self, *, event): + try: + started = ms_timestamp_to_datetime( + self._get_start_time(event=event) + ) + stopped = ms_timestamp_to_datetime(self._get_end_time(event=event)) + self.__duration = stopped - started + except TypeError: + logger.warning("Invalid start or end time, duration undetermined") + self.__duration = None + + def _get_log_stream_name(self, *, data_log=False): + response = self._logs_client.describe_log_streams( + logGroupName=self._log_group_name, + logStreamNamePrefix=f"{self._sagemaker_job_name}", + ) + + if "nextToken" in response: + raise LogStreamNotFound("Too many log streams found") + + log_streams = { + s["logStreamName"] + for s in response["logStreams"] + if s["logStreamName"].endswith("/data-log") is data_log + } + + if len(log_streams) == 1: + return log_streams.pop() + else: + raise LogStreamNotFound("Log stream not found") + + def _set_task_logs(self): + try: + log_stream_name = self._get_log_stream_name(data_log=False) + except LogStreamNotFound as error: + logger.warning(str(error)) + return + + response = self._logs_client.get_log_events( + logGroupName=self._log_group_name, + logStreamName=log_stream_name, + limit=LOGLINES, + startFromHead=False, + ) + stdout = [] + stderr = [] + + for event in response["events"]: + try: + parsed_log = parse_structured_log( + log=event["message"].replace("\x00", "") + ) + timestamp = ms_timestamp_to_datetime(event["timestamp"]) + except (JSONDecodeError, KeyError, ValueError): + logger.warning("Could not parse log") + continue + + if parsed_log is not None: + output = f"{timestamp.isoformat()} {parsed_log.message}" + if parsed_log.source == SourceChoices.STDOUT: + stdout.append(output) + elif parsed_log.source == SourceChoices.STDERR: + stderr.append(output) + else: + logger.error("Invalid source") + + self._stdout = stdout + self._stderr = stderr + + def _set_runtime_metrics(self, *, event): + try: + started = ms_timestamp_to_datetime( + self._get_start_time(event=event) + ) + stopped = ms_timestamp_to_datetime(self._get_end_time(event=event)) + except TypeError: + logger.warning("Invalid start or end time, metrics undetermined") + return + + query_id = "q" + query = f"SEARCH('{{{self._log_group_name},Host}} Host={self._sagemaker_job_name}/i-', 'Average', 60)" + + instance_type = get( + [ + instance + for instance in INSTANCE_OPTIONS + if instance.name == self._get_instance_name(event=event) + ] + ) + + response = self._cloudwatch_client.get_metric_data( + MetricDataQueries=[{"Id": query_id, "Expression": query}], + # Add buffer time to allow metrics to be delivered + StartTime=started - timedelta(minutes=1), + EndTime=stopped + timedelta(minutes=5), + ) + + if "NextToken" in response: + logger.error("Too many metrics found") + + runtime_metrics = [ + { + "label": metric["Label"], + "status": metric["StatusCode"], + "timestamps": [t.isoformat() for t in metric["Timestamps"]], + "values": metric["Values"], + } + for metric in response["MetricDataResults"] + if 
metric["Id"] == query_id + ] + + self.__runtime_metrics = { + "instance": { + "name": instance_type.name, + "cpu": instance_type.cpu, + "memory": instance_type.memory, + "gpus": instance_type.gpus, + "gpu_type": None + if instance_type.gpu_type is None + else instance_type.gpu_type.value, + }, + "metrics": runtime_metrics, + } + + def _get_task_return_code(self): + with io.BytesIO() as fileobj: + self._s3_client.download_fileobj( + Fileobj=fileobj, + Bucket=settings.COMPONENTS_OUTPUT_BUCKET_NAME, + Key=self._result_key, + ) + fileobj.seek(0) + + try: + result = json.loads( + fileobj.read().decode("utf-8"), + ) + except JSONDecodeError: + raise ComponentException( + "The invocation request did not return valid json" + ) + + logger.info(f"{result=}") + + if result["pk"] != self._job_id: + raise RuntimeError("Wrong result key for this job") + + try: + return int(result["return_code"]) + except (KeyError, ValueError): + raise ComponentException( + "The invocation response object is not valid" + ) + + def _handle_completed_job(self): + return_code = self._get_task_return_code() + + if return_code == 0: + # Job's a good un + return + elif return_code == 137: + raise ComponentException("The container ran out of memory.") + else: + raise ComponentException(user_error(self.stderr)) + + def _handle_failed_job(self, *, event): + failure_reason = event.get("FailureReason") + + if failure_reason == ( + "CapacityError: Unable to provision requested ML compute capacity. " + "Please retry using a different ML instance type." + ): + raise RetryTask("No current capacity for the chosen instance type") + + if failure_reason == ( + "InternalServerError: We encountered an internal error. " + "Please try again." + ): + if ( + self.get_job_params( + job_name=self.get_job_name(event=event) + ).attempt + < 3 + ): + raise RetryTask("Retrying due to internal server error") + else: + raise ComponentException( + "Algorithm container image would not start" + ) + + # Anything else needs investigation by a site administrator + raise RuntimeError(failure_reason) + + def _stop_running_jobs(self): + try: + self._stop_job_boto() + except botocore.exceptions.ClientError as error: + okay_error_messages = { + # Unstoppable job: + "The request was rejected because the transform job is in status", + # Job was never created: + "Could not find job to update with name", + } + + if error.response["Error"]["Code"] == "ThrottlingException": + raise RetryStep("Request throttled") from error + elif error.response["Error"][ + "Code" + ] == "ValidationException" and any( + okay_message in error.response["Error"]["Message"] + for okay_message in okay_error_messages + ): + logger.info(f"The job could not be stopped: {error}") + else: + raise error diff --git a/app/grandchallenge/components/backends/amazon_sagemaker_batch.py b/app/grandchallenge/components/backends/amazon_sagemaker_batch.py index 1258affa46..820cea7fcd 100644 --- a/app/grandchallenge/components/backends/amazon_sagemaker_batch.py +++ b/app/grandchallenge/components/backends/amazon_sagemaker_batch.py @@ -1,555 +1,100 @@ -import io -import json import logging -import re -from datetime import timedelta -from json import JSONDecodeError -from typing import NamedTuple -import boto3 -import botocore from django.conf import settings -from django.db.models import TextChoices -from django.utils._os import safe_join -from django.utils.functional import cached_property -from grandchallenge.components.backends.base import Executor, JobParams -from 
grandchallenge.components.backends.exceptions import ( - ComponentException, - RetryStep, - RetryTask, - TaskCancelled, +from grandchallenge.components.backends.amazon_sagemaker_base import ( + AmazonSageMakerBaseExecutor, + LogStreamNotFound, ) +from grandchallenge.components.backends.exceptions import ComponentException from grandchallenge.components.backends.utils import ( LOGLINES, - SourceChoices, get_sagemaker_model_name, - ms_timestamp_to_datetime, - parse_structured_log, - user_error, ) -from grandchallenge.components.models import GPUTypeChoices -from grandchallenge.evaluation.utils import get logger = logging.getLogger(__name__) -UUID4_REGEX = ( - r"[0-9a-f]{8}\-[0-9a-f]{4}\-4[0-9a-f]{3}\-[89ab][0-9a-f]{3}\-[0-9a-f]{12}" -) - - -class LogStreamNotFound(Exception): - """Raised when a log stream could not be found""" - - -class InstanceType(NamedTuple): - name: str - cpu: int - memory: float - usd_cents_per_hour: int - gpus: int = 0 - gpu_type: GPUTypeChoices | None = None - - -INSTANCE_OPTIONS = [ - # Instance types and pricing from eu-west-1, retrieved 06-JUN-2022 - # https://aws.amazon.com/sagemaker/pricing/ - InstanceType( - name="ml.m5.large", - cpu=2, - memory=8, - usd_cents_per_hour=13, - ), - InstanceType( - name="ml.m5.xlarge", - cpu=4, - memory=16, - usd_cents_per_hour=26, - ), - InstanceType( - name="ml.m5.2xlarge", - cpu=8, - memory=32, - usd_cents_per_hour=51, - ), - InstanceType( - name="ml.m5.4xlarge", - cpu=16, - memory=64, - usd_cents_per_hour=103, - ), - InstanceType( - name="ml.m5.12xlarge", - cpu=48, - memory=192, - usd_cents_per_hour=308, - ), - InstanceType( - name="ml.m5.24xlarge", - cpu=96, - memory=384, - usd_cents_per_hour=616, - ), - InstanceType( - name="ml.m4.xlarge", - cpu=4, - memory=16, - usd_cents_per_hour=27, - ), - InstanceType( - name="ml.m4.2xlarge", - cpu=8, - memory=32, - usd_cents_per_hour=53, - ), - InstanceType( - name="ml.m4.4xlarge", - cpu=16, - memory=64, - usd_cents_per_hour=107, - ), - InstanceType( - name="ml.m4.10xlarge", - cpu=40, - memory=160, - usd_cents_per_hour=266, - ), - InstanceType( - name="ml.m4.16xlarge", - cpu=64, - memory=256, - usd_cents_per_hour=426, - ), - InstanceType( - name="ml.c5.xlarge", - cpu=4, - memory=8, - usd_cents_per_hour=23, - ), - InstanceType( - name="ml.c5.2xlarge", - cpu=8, - memory=16, - usd_cents_per_hour=46, - ), - InstanceType( - name="ml.c5.4xlarge", - cpu=16, - memory=32, - usd_cents_per_hour=92, - ), - InstanceType( - name="ml.c5.9xlarge", - cpu=36, - memory=72, - usd_cents_per_hour=207, - ), - InstanceType( - name="ml.c5.18xlarge", - cpu=72, - memory=144, - usd_cents_per_hour=415, - ), - InstanceType( - name="ml.c4.xlarge", - cpu=4, - memory=7.5, - usd_cents_per_hour=27, - ), - InstanceType( - name="ml.c4.2xlarge", - cpu=8, - memory=15, - usd_cents_per_hour=54, - ), - InstanceType( - name="ml.c4.4xlarge", - cpu=16, - memory=30, - usd_cents_per_hour=109, - ), - InstanceType( - name="ml.c4.8xlarge", - cpu=36, - memory=60, - usd_cents_per_hour=217, - ), - InstanceType( - name="ml.p3.2xlarge", - cpu=8, - memory=61, - usd_cents_per_hour=413, - gpus=1, - gpu_type=GPUTypeChoices.V100, - ), - InstanceType( - name="ml.p3.8xlarge", - cpu=32, - memory=244, - usd_cents_per_hour=1586, - gpus=4, - gpu_type=GPUTypeChoices.V100, - ), - InstanceType( - name="ml.p3.16xlarge", - cpu=64, - memory=488, - usd_cents_per_hour=3041, - gpus=8, - gpu_type=GPUTypeChoices.V100, - ), - InstanceType( - name="ml.p2.xlarge", - cpu=4, - memory=61, - usd_cents_per_hour=122, - gpus=1, - gpu_type=GPUTypeChoices.K80, - ), - 
InstanceType(
-        name="ml.p2.8xlarge",
-        cpu=32,
-        memory=488,
-        usd_cents_per_hour=933,
-        gpus=8,
-        gpu_type=GPUTypeChoices.K80,
-    ),
-    InstanceType(
-        name="ml.p2.16xlarge",
-        cpu=64,
-        memory=732,
-        usd_cents_per_hour=1789,
-        gpus=16,
-        gpu_type=GPUTypeChoices.K80,
-    ),
-    InstanceType(
-        name="ml.g4dn.xlarge",
-        cpu=4,
-        memory=16,
-        usd_cents_per_hour=82,
-        gpus=1,
-        gpu_type=GPUTypeChoices.T4,
-    ),
-    InstanceType(
-        name="ml.g4dn.2xlarge",
-        cpu=8,
-        memory=32,
-        usd_cents_per_hour=105,
-        gpus=1,
-        gpu_type=GPUTypeChoices.T4,
-    ),
-    InstanceType(
-        name="ml.g4dn.4xlarge",
-        cpu=16,
-        memory=64,
-        usd_cents_per_hour=168,
-        gpus=1,
-        gpu_type=GPUTypeChoices.T4,
-    ),
-    InstanceType(
-        name="ml.g4dn.12xlarge",
-        cpu=48,
-        memory=192,
-        usd_cents_per_hour=545,
-        gpus=4,
-        gpu_type=GPUTypeChoices.T4,
-    ),
-    InstanceType(
-        name="ml.g4dn.16xlarge",
-        cpu=64,
-        memory=256,
-        usd_cents_per_hour=607,
-        gpus=1,
-        gpu_type=GPUTypeChoices.T4,
-    ),
-]
-
-
-class ModelChoices(TextChoices):
-    # The values must be short
-    # The labels must be in the form "<app_label>-<model_name>"
-    ALGORITHMS_JOB = "A", "algorithms-job"
-    EVALUATION_EVALUATION = "E", "evaluation-evaluation"
-
-
-class AmazonSageMakerBatchExecutor(Executor):
-    IS_EVENT_DRIVEN = True
-
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-        self.__duration = None
-        self.__runtime_metrics = {}
-
-        self.__sagemaker_client = None
-        self.__logs_client = None
-        self.__cloudwatch_client = None
-
-    @staticmethod
-    def get_job_params(*, event):
-        job_name = event["TransformJobName"]
-
-        prefix_regex = re.escape(settings.COMPONENTS_REGISTRY_PREFIX)
-        model_regex = r"|".join(ModelChoices.values)
-        pattern = rf"^{prefix_regex}\-(?P<job_model>{model_regex})\-(?P<job_pk>{UUID4_REGEX})\-(?P<attempt>\d{{2}})$"
-
-        result = re.match(pattern, job_name)
-
-        if result is None:
-            raise ValueError("Invalid job name")
-        else:
-            job_model = ModelChoices(result.group("job_model")).label
-            job_app_label, job_model_name = job_model.split("-")
-            job_pk = result.group("job_pk")
-            attempt = int(result.group("attempt"))
-            return JobParams(
-                app_label=job_app_label,
-                model_name=job_model_name,
-                pk=job_pk,
-                attempt=attempt,
-            )
-
-    @property
-    def _sagemaker_client(self):
-        if self.__sagemaker_client is None:
-            self.__sagemaker_client = boto3.client(
-                "sagemaker",
-                region_name=settings.COMPONENTS_AMAZON_ECR_REGION,
-            )
-        return self.__sagemaker_client
-
-    @property
-    def _logs_client(self):
-        if self.__logs_client is None:
-            self.__logs_client = boto3.client(
-                "logs", region_name=settings.COMPONENTS_AMAZON_ECR_REGION
-            )
-        return self.__logs_client
-
-    @property
-    def _cloudwatch_client(self):
-        if self.__cloudwatch_client is None:
-            self.__cloudwatch_client = boto3.client(
-                "cloudwatch",
-                region_name=settings.COMPONENTS_AMAZON_ECR_REGION,
-            )
-        return self.__cloudwatch_client
-
-    @property
-    def duration(self):
-        return self.__duration
-
-    @property
-    def runtime_metrics(self):
-        return self.__runtime_metrics
-
-    @property
-    def _invocation_prefix(self):
-        return safe_join("/invocations", *self.job_path_parts)
-
-    @property
-    def _invocation_key(self):
-        return safe_join(self._invocation_prefix, "invocation.json")
-
-    @property
-    def _transform_job_name(self):
-        # SageMaker requires job names to be less than 63 chars
-        job_name = f"{settings.COMPONENTS_REGISTRY_PREFIX}-{self._job_id}"
-
-        for value, label in ModelChoices.choices:
-            job_name = job_name.replace(label, value)
-
-        return job_name
 
+class AmazonSageMakerBatchExecutor(AmazonSageMakerBaseExecutor):
     @property
     def 
_log_group_name(self): # Hardcoded by AWS return "/aws/sagemaker/TransformJobs" - @cached_property - def _instance_type(self): - """Find the cheapest instance that can run this job""" - - if self._requires_gpu: - # For now only use a single gpu - n_gpu = 1 - gpu_type = self._desired_gpu_type - else: - n_gpu = 0 - gpu_type = None - - compatible_instances = [ - instance - for instance in INSTANCE_OPTIONS - if instance.gpus == n_gpu - and instance.gpu_type == gpu_type - and instance.memory >= self._memory_limit - ] - - if not compatible_instances: - raise ValueError("No suitable instance types for job") - - # Get the lowest priced instance - compatible_instances.sort(key=lambda x: x.usd_cents_per_hour) - return compatible_instances[0] - - @property - def usd_cents_per_hour(self): - return self._instance_type.usd_cents_per_hour - - def execute(self, *, input_civs, input_prefixes): - self._create_invocation_json( - input_civs=input_civs, input_prefixes=input_prefixes + @staticmethod + def get_job_name(*, event): + return event["TransformJobName"] + + def _get_job_status(self, *, event): + return event["TransformJobStatus"] + + def _get_start_time(self, *, event): + return event.get("TransformStartTime") + + def _get_end_time(self, *, event): + return event.get("TransformEndTime") + + def _get_instance_name(self, *, event): + return event["TransformResources"]["InstanceType"] + + def _create_job_boto(self): + self._sagemaker_client.create_transform_job( + TransformJobName=self._sagemaker_job_name, + ModelName=get_sagemaker_model_name( + repo_tag=self._exec_image_repo_tag + ), + TransformInput={ + "DataSource": { + "S3DataSource": { + "S3DataType": "S3Prefix", + "S3Uri": f"s3://{settings.COMPONENTS_INPUT_BUCKET_NAME}/{self._invocation_key}", + } + } + }, + TransformOutput={ + "S3OutputPath": f"s3://{settings.COMPONENTS_OUTPUT_BUCKET_NAME}/{self._invocation_prefix}" + }, + TransformResources={ + "InstanceType": self._instance_type.name, + "InstanceCount": 1, + }, + Environment={ # Up to 16 pairs + "LOG_LEVEL": "INFO", + "no_proxy": "amazonaws.com", + }, + ModelClientConfig={ + "InvocationsTimeoutInSeconds": self._time_limit, + "InvocationsMaxRetries": 0, + }, ) - self._create_transform_job() - - def handle_event(self, *, event): - job_status = event["TransformJobStatus"] - if job_status == "Stopped": - raise TaskCancelled - elif job_status in {"Completed", "Failed"}: - self._set_duration(event=event) - self._set_task_logs() - self._set_runtime_metrics(event=event) - if job_status == "Completed": - self._handle_completed_job() - else: - self._handle_failed_job(event=event) - else: - raise ValueError("Invalid job status") + def _stop_job_boto(self): + self._sagemaker_client.stop_transform_job( + TransformJobName=self._sagemaker_job_name + ) def deprovision(self): - self._stop_running_jobs() - super().deprovision() - self._delete_objects( - bucket=settings.COMPONENTS_INPUT_BUCKET_NAME, - prefix=self._invocation_prefix, - ) self._delete_objects( bucket=settings.COMPONENTS_OUTPUT_BUCKET_NAME, prefix=self._invocation_prefix, ) - def _create_invocation_json(self, *, input_civs, input_prefixes): - f = io.BytesIO( - json.dumps( - self._get_invocation_json( - input_civs=input_civs, input_prefixes=input_prefixes - ) - ).encode("utf-8") - ) - self._s3_client.upload_fileobj( - f, settings.COMPONENTS_INPUT_BUCKET_NAME, self._invocation_key - ) - - def _create_transform_job(self): - try: - self._sagemaker_client.create_transform_job( - TransformJobName=self._transform_job_name, - 
ModelName=get_sagemaker_model_name( - repo_tag=self._exec_image_repo_tag - ), - TransformInput={ - "DataSource": { - "S3DataSource": { - "S3DataType": "S3Prefix", - "S3Uri": f"s3://{settings.COMPONENTS_INPUT_BUCKET_NAME}/{self._invocation_key}", - } - } - }, - TransformOutput={ - "S3OutputPath": f"s3://{settings.COMPONENTS_OUTPUT_BUCKET_NAME}/{self._invocation_prefix}" - }, - TransformResources={ - "InstanceType": self._instance_type.name, - "InstanceCount": 1, - }, - Environment={ # Up to 16 pairs - "LOG_LEVEL": "INFO", - "no_proxy": "amazonaws.com", - }, - ModelClientConfig={ - "InvocationsTimeoutInSeconds": self._time_limit, - "InvocationsMaxRetries": 0, - }, - ) - except self._sagemaker_client.exceptions.ResourceLimitExceeded as error: - raise RetryStep("Capacity Limit Exceeded") from error - except botocore.exceptions.ClientError as error: - if error.response["Error"]["Code"] == "ThrottlingException": - raise RetryStep("Request throttled") from error - else: - raise error - - def _set_duration(self, *, event): - try: - started = ms_timestamp_to_datetime(event.get("TransformStartTime")) - stopped = ms_timestamp_to_datetime(event.get("TransformEndTime")) - self.__duration = stopped - started - except TypeError: - logger.warning("Invalid start or end time, duration undetermined") - self.__duration = None - - def _get_log_stream_name(self, *, data_log=False): - response = self._logs_client.describe_log_streams( - logGroupName=self._log_group_name, - logStreamNamePrefix=f"{self._transform_job_name}", - ) - - if "nextToken" in response: - raise LogStreamNotFound("Too many log streams found") - - log_streams = { - s["logStreamName"] - for s in response["logStreams"] - if s["logStreamName"].endswith("/data-log") is data_log - } - - if len(log_streams) == 1: - return log_streams.pop() - else: - raise LogStreamNotFound("Log stream not found") - - def _set_task_logs(self): + def _handle_failed_job(self, *args, **kwargs): try: - log_stream_name = self._get_log_stream_name(data_log=False) + data_log = self._get_job_data_log() except LogStreamNotFound as error: logger.warning(str(error)) - return - - response = self._logs_client.get_log_events( - logGroupName=self._log_group_name, - logStreamName=log_stream_name, - limit=LOGLINES, - startFromHead=False, - ) - stdout = [] - stderr = [] - - for event in response["events"]: - try: - parsed_log = parse_structured_log( - log=event["message"].replace("\x00", "") - ) - timestamp = ms_timestamp_to_datetime(event["timestamp"]) - except (JSONDecodeError, KeyError, ValueError): - logger.warning("Could not parse log") - continue + data_log = [] - if parsed_log is not None: - output = f"{timestamp.isoformat()} {parsed_log.message}" - if parsed_log.source == SourceChoices.STDOUT: - stdout.append(output) - elif parsed_log.source == SourceChoices.STDERR: - stderr.append(output) - else: - logger.error("Invalid source") + if any( + "Model server did not respond to /invocations request within" in e + for e in data_log + ): + raise ComponentException("Time limit exceeded") - self._stdout = stdout - self._stderr = stderr + super()._handle_failed_job(*args, **kwargs) def _get_job_data_log(self): response = self._logs_client.get_log_events( @@ -559,153 +104,3 @@ def _get_job_data_log(self): startFromHead=False, ) return [event["message"] for event in response["events"]] - - def _set_runtime_metrics(self, *, event): - try: - started = ms_timestamp_to_datetime(event.get("TransformStartTime")) - stopped = ms_timestamp_to_datetime(event.get("TransformEndTime")) - except 
TypeError: - logger.warning("Invalid start or end time, metrics undetermined") - return - - query_id = "q" - query = f"SEARCH('{{{self._log_group_name},Host}} Host={self._transform_job_name}/i-', 'Average', 60)" - - instance_type = get( - [ - instance - for instance in INSTANCE_OPTIONS - if instance.name == event["TransformResources"]["InstanceType"] - ] - ) - - response = self._cloudwatch_client.get_metric_data( - MetricDataQueries=[{"Id": query_id, "Expression": query}], - # Add buffer time to allow metrics to be delivered - StartTime=started - timedelta(minutes=1), - EndTime=stopped + timedelta(minutes=5), - ) - - if "NextToken" in response: - logger.error("Too many metrics found") - - runtime_metrics = [ - { - "label": metric["Label"], - "status": metric["StatusCode"], - "timestamps": [t.isoformat() for t in metric["Timestamps"]], - "values": metric["Values"], - } - for metric in response["MetricDataResults"] - if metric["Id"] == query_id - ] - - self.__runtime_metrics = { - "instance": { - "name": instance_type.name, - "cpu": instance_type.cpu, - "memory": instance_type.memory, - "gpus": instance_type.gpus, - "gpu_type": None - if instance_type.gpu_type is None - else instance_type.gpu_type.value, - }, - "metrics": runtime_metrics, - } - - def _get_task_return_code(self): - with io.BytesIO() as fileobj: - self._s3_client.download_fileobj( - Fileobj=fileobj, - Bucket=settings.COMPONENTS_OUTPUT_BUCKET_NAME, - Key=f"{self._invocation_key}.out", - ) - fileobj.seek(0) - - try: - result = json.loads( - fileobj.read().decode("utf-8"), - ) - except JSONDecodeError: - raise ComponentException( - "The invocation request did not return valid json" - ) - - try: - logger.info(f"{result=}") - return int(result["return_code"]) - except (KeyError, ValueError): - raise ComponentException( - "The invocation response object is not valid" - ) - - def _handle_completed_job(self): - return_code = self._get_task_return_code() - - if return_code == 0: - # Job's a good un - return - elif return_code == 137: - raise ComponentException("The container ran out of memory.") - else: - raise ComponentException(user_error(self.stderr)) - - def _handle_failed_job(self, *, event): - failure_reason = event.get("FailureReason") - - if failure_reason == ( - "CapacityError: Unable to provision requested ML compute capacity. " - "Please retry using a different ML instance type." - ): - raise RetryTask("No current capacity for the chosen instance type") - - if failure_reason == ( - "InternalServerError: We encountered an internal error. " - "Please try again." 
- ): - if self.get_job_params(event=event).attempt < 3: - raise RetryTask("Retrying due to internal server error") - else: - raise ComponentException( - "Algorithm container image would not start" - ) - - try: - data_log = self._get_job_data_log() - except LogStreamNotFound as error: - logger.warning(str(error)) - data_log = [] - - if any( - "Model server did not respond to /invocations request within" in e - for e in data_log - ): - raise ComponentException("Time limit exceeded") - - # Anything else needs investigation by a site administrator - raise RuntimeError(failure_reason) - - def _stop_running_jobs(self): - try: - self._sagemaker_client.stop_transform_job( - TransformJobName=self._transform_job_name - ) - except botocore.exceptions.ClientError as error: - okay_error_messages = { - # Unstoppable job: - "The request was rejected because the transform job is in status", - # Job was never created: - "Could not find job to update with name", - } - - if error.response["Error"]["Code"] == "ThrottlingException": - raise RetryStep("Request throttled") from error - elif error.response["Error"][ - "Code" - ] == "ValidationException" and any( - okay_message in error.response["Error"]["Message"] - for okay_message in okay_error_messages - ): - logger.info(f"The job could not be stopped: {error}") - else: - raise error diff --git a/app/grandchallenge/components/backends/amazon_sagemaker_training.py b/app/grandchallenge/components/backends/amazon_sagemaker_training.py new file mode 100644 index 0000000000..c7fe265c83 --- /dev/null +++ b/app/grandchallenge/components/backends/amazon_sagemaker_training.py @@ -0,0 +1,43 @@ +from grandchallenge.components.backends.amazon_sagemaker_base import ( + AmazonSageMakerBaseExecutor, +) + + +class AmazonSageMakerTrainingExecutor(AmazonSageMakerBaseExecutor): + @property + def _log_group_name(self): + # Hardcoded by AWS + return "/aws/sagemaker/TrainingJobs" + + @staticmethod + def get_job_name(*, event): + raise NotImplementedError + + def _get_job_status(self, *, event): + raise NotImplementedError + + def _get_start_time(self, *, event): + raise NotImplementedError + + def _get_end_time(self, *, event): + raise NotImplementedError + + def _get_instance_name(self, *, event): + raise NotImplementedError + + def _create_job_boto(self): + raise NotImplementedError + + def _stop_job_boto(self): + raise NotImplementedError + + def _get_invocation_json(self, *args, **kwargs): + # SageMaker Training Jobs expect a list + invocation_json = super()._get_invocation_json(*args, **kwargs) + + if not isinstance(invocation_json, dict): + raise RuntimeError( + "Expected to receive a single invocation JSON object" + ) + + return [invocation_json] diff --git a/app/grandchallenge/components/backends/base.py b/app/grandchallenge/components/backends/base.py index 6f14503605..f4e6cd2382 100644 --- a/app/grandchallenge/components/backends/base.py +++ b/app/grandchallenge/components/backends/base.py @@ -103,7 +103,12 @@ def deprovision(self): @staticmethod @abstractmethod - def get_job_params(*, event): + def get_job_name(*, event): + pass + + @staticmethod + @abstractmethod + def get_job_params(*, job_name): ... 
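# A minimal sketch (illustration only, not part of this patch) of how the
# split API above fits together: a backend first extracts its own job name
# from the raw event, then the generic JobParams are parsed from that name.
# The event payload and the "grandchallenge" registry prefix below are
# hypothetical examples.
#
#     job_name = Backend.get_job_name(
#         event={"TransformJobName": "grandchallenge-A-<uuid4>-00"}
#     )
#     job_params = Backend.get_job_params(job_name=job_name)
#     assert job_params.app_label == "algorithms"  # "A" maps to "algorithms-job"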
@property diff --git a/app/grandchallenge/components/backends/docker.py b/app/grandchallenge/components/backends/docker.py index ac932810dc..87f8bc4966 100644 --- a/app/grandchallenge/components/backends/docker.py +++ b/app/grandchallenge/components/backends/docker.py @@ -73,7 +73,11 @@ def deprovision(self): docker_client.remove_container(name=self.container_name) @staticmethod - def get_job_params(*, event): + def get_job_name(*, event): + raise NotImplementedError + + @staticmethod + def get_job_params(*, job_name): raise NotImplementedError @property diff --git a/app/grandchallenge/components/tasks.py b/app/grandchallenge/components/tasks.py index 1548dbabb9..ccfe4761d2 100644 --- a/app/grandchallenge/components/tasks.py +++ b/app/grandchallenge/components/tasks.py @@ -762,7 +762,8 @@ def handle_event(*, event, backend, retries=0): # noqa: C901 """ Backend = import_string(backend) # noqa: N806 - job_params = Backend.get_job_params(event=event) + job_name = Backend.get_job_name(event=event) + job_params = Backend.get_job_params(job_name=job_name) job = get_model_instance( pk=job_params.pk, diff --git a/app/tests/components_tests/test_amazon_sagemaker_batch_backend.py b/app/tests/components_tests/test_amazon_sagemaker_batch_backend.py index a894568245..24fe1abdc1 100644 --- a/app/tests/components_tests/test_amazon_sagemaker_batch_backend.py +++ b/app/tests/components_tests/test_amazon_sagemaker_batch_backend.py @@ -89,7 +89,8 @@ def test_get_job_params_match(key, model_name, app_label): event = { "TransformJobName": f"{settings.COMPONENTS_REGISTRY_PREFIX}-{key}-{pk}-00" } - job_params = AmazonSageMakerBatchExecutor.get_job_params(event=event) + job_name = AmazonSageMakerBatchExecutor.get_job_name(event=event) + job_params = AmazonSageMakerBatchExecutor.get_job_params(job_name=job_name) assert job_params.pk == str(pk) assert job_params.model_name == model_name @@ -131,12 +132,13 @@ def test_transform_job_name(model, container, container_model, key): executor = AmazonSageMakerBatchExecutor(**j.executor_kwargs) assert ( - executor._transform_job_name + executor._sagemaker_job_name == f"{settings.COMPONENTS_REGISTRY_PREFIX}-{key}-{j.pk}-00" ) - event = {"TransformJobName": executor._transform_job_name} - job_params = AmazonSageMakerBatchExecutor.get_job_params(event=event) + event = {"TransformJobName": executor._sagemaker_job_name} + job_name = AmazonSageMakerBatchExecutor.get_job_name(event=event) + job_params = AmazonSageMakerBatchExecutor.get_job_params(job_name=job_name) assert job_params.pk == str(j.pk) assert job_params.model_name == j._meta.model_name @@ -162,7 +164,7 @@ def test_execute(settings): method="create_transform_job", service_response={"TransformJobArn": "string"}, expected_params={ - "TransformJobName": executor._transform_job_name, + "TransformJobName": executor._sagemaker_job_name, "Environment": { "LOG_LEVEL": "INFO", "no_proxy": "amazonaws.com", @@ -540,7 +542,7 @@ def test_handle_completed_job(): executor._s3_client.upload_fileobj( Fileobj=f, Bucket=settings.COMPONENTS_OUTPUT_BUCKET_NAME, - Key=f"{executor._invocation_key}.out", + Key=executor._result_key, ) assert executor._handle_completed_job() is None From 0ec6ac8d9c07f92ca6f4b1c5b2ea1e3c05b9337b Mon Sep 17 00:00:00 2001 From: James Meakin <12661555+jmsmkn@users.noreply.github.com> Date: Thu, 14 Dec 2023 18:46:46 +0100 Subject: [PATCH 02/13] Add implementation of the SageMaker Training Backend --- .../backends/amazon_sagemaker_batch.py | 5 +- .../backends/amazon_sagemaker_training.py | 54 ++++++++++++++++--- 
.../components/backends/base.py | 8 +++ .../components/backends/docker.py | 3 +- app/grandchallenge/components/tasks.py | 8 +-- 5 files changed, 62 insertions(+), 16 deletions(-) diff --git a/app/grandchallenge/components/backends/amazon_sagemaker_batch.py b/app/grandchallenge/components/backends/amazon_sagemaker_batch.py index 820cea7fcd..2f4e4dff0f 100644 --- a/app/grandchallenge/components/backends/amazon_sagemaker_batch.py +++ b/app/grandchallenge/components/backends/amazon_sagemaker_batch.py @@ -58,10 +58,7 @@ def _create_job_boto(self): "InstanceType": self._instance_type.name, "InstanceCount": 1, }, - Environment={ # Up to 16 pairs - "LOG_LEVEL": "INFO", - "no_proxy": "amazonaws.com", - }, + Environment=self.invocation_environment, ModelClientConfig={ "InvocationsTimeoutInSeconds": self._time_limit, "InvocationsMaxRetries": 0, diff --git a/app/grandchallenge/components/backends/amazon_sagemaker_training.py b/app/grandchallenge/components/backends/amazon_sagemaker_training.py index c7fe265c83..7b18f7c283 100644 --- a/app/grandchallenge/components/backends/amazon_sagemaker_training.py +++ b/app/grandchallenge/components/backends/amazon_sagemaker_training.py @@ -1,3 +1,5 @@ +from django.conf import settings + from grandchallenge.components.backends.amazon_sagemaker_base import ( AmazonSageMakerBaseExecutor, ) @@ -11,25 +13,63 @@ def _log_group_name(self): @staticmethod def get_job_name(*, event): - raise NotImplementedError + return event["TrainingJobName"] def _get_job_status(self, *, event): - raise NotImplementedError + return event["TrainingJobStatus"] def _get_start_time(self, *, event): - raise NotImplementedError + return event.get("TrainingStartTime") def _get_end_time(self, *, event): - raise NotImplementedError + return event.get("TrainingEndTime") def _get_instance_name(self, *, event): - raise NotImplementedError + return event["ResourceConfig"]["InstanceType"] def _create_job_boto(self): - raise NotImplementedError + self._sagemaker_client.create_training_job( + TrainingJobName=self._sagemaker_job_name, + AlgorithmSpecification={ + # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_AlgorithmSpecification.html + "TrainingInputMode": "File", # Pipe | File | FastFile + "TrainingImage": self._exec_image_repo_tag, + "ContainerArguments": [ + "invoke", + "--file", + f"s3://{settings.COMPONENTS_INPUT_BUCKET_NAME}/{self._invocation_key}", + ], + }, + RoleArn=settings.COMPONENTS_AMAZON_SAGEMAKER_EXECUTION_ROLE_ARN, + OutputDataConfig={ + # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_OutputDataConfig.html + # TODO maybe don't put this in the io + "S3OutputPath": f"s3://{settings.COMPONENTS_OUTPUT_BUCKET_NAME}/{self._io_prefix}/.sagemaker-outputs", + }, + ResourceConfig={ + # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html + "VolumeSizeInGB": 30, # Matches SageMaker Batch Inference + "InstanceType": self._instance_type.name, + "InstanceCount": 1, + }, + StoppingCondition={ + # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StoppingCondition.html + "MaxRuntimeInSeconds": self._time_limit, + }, + # TODO Retry strategy? 
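            # A hedged option for the TODO above: CreateTrainingJob also accepts
            # a RetryStrategy parameter, which makes SageMaker itself retry
            # InternalServerError failures. A minimal sketch, assuming a single
            # retry is acceptable:
            #
            #     RetryStrategy={
            #         # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_RetryStrategy.html
            #         "MaximumRetryAttempts": 1,
            #     },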
+ Environment=self.invocation_environment, + VpcConfig={ + "SecurityGroupIds": [ + settings.COMPONENTS_AMAZON_SAGEMAKER_SECURITY_GROUP_ID + ], + "Subnets": settings.COMPONENTS_AMAZON_SAGEMAKER_SUBNETS, + }, + ) def _stop_job_boto(self): - raise NotImplementedError + self._sagemaker_client.stop_training_job( + TrainingJobName=self._sagemaker_job_name + ) def _get_invocation_json(self, *args, **kwargs): # SageMaker Training Jobs expect a list diff --git a/app/grandchallenge/components/backends/base.py b/app/grandchallenge/components/backends/base.py index f4e6cd2382..d20f9f216a 100644 --- a/app/grandchallenge/components/backends/base.py +++ b/app/grandchallenge/components/backends/base.py @@ -134,6 +134,14 @@ def usd_cents_per_hour(self): def runtime_metrics(self): ... + @property + def invocation_environment(self): + return { # Up to 16 pairs + "LOG_LEVEL": "INFO", + "PYTHONUNBUFFERED": "1", + "no_proxy": "amazonaws.com", + } + @property def compute_cost_euro_millicents(self): duration = self.duration diff --git a/app/grandchallenge/components/backends/docker.py b/app/grandchallenge/components/backends/docker.py index 87f8bc4966..acb9f053a6 100644 --- a/app/grandchallenge/components/backends/docker.py +++ b/app/grandchallenge/components/backends/docker.py @@ -104,7 +104,8 @@ def runtime_metrics(self): def _execute_container(self, *, input_civs, input_prefixes) -> None: environment = { - "NVIDIA_VISIBLE_DEVICES": settings.COMPONENTS_NVIDIA_VISIBLE_DEVICES + **self.invocation_environment, + "NVIDIA_VISIBLE_DEVICES": settings.COMPONENTS_NVIDIA_VISIBLE_DEVICES, } if settings.COMPONENTS_DOCKER_TASK_SET_AWS_ENV: diff --git a/app/grandchallenge/components/tasks.py b/app/grandchallenge/components/tasks.py index ccfe4761d2..a8e78ccad5 100644 --- a/app/grandchallenge/components/tasks.py +++ b/app/grandchallenge/components/tasks.py @@ -372,8 +372,7 @@ def _get_shim_env_vars(*, original_config): "GRAND_CHALLENGE_COMPONENT_ENTRYPOINT_B64J": encode_b64j( val=entrypoint ), - "no_proxy": "amazonaws.com", - "PYTHONUNBUFFERED": "1", + "TMPDIR": "/sagemaker-shim-unpacked", } @@ -404,8 +403,9 @@ def _set_root_555_perms( filter=_set_root_555_perms, ) - for dir in ["/input", "/output", "/tmp"]: - # /tmp is required by staticx + for dir in ["/input", "/output", "/sagemaker-shim-unpacked"]: + # staticx will unpack into ${TMPDIR} + # In sagemaker-shim we then set this back to /tmp tarinfo = tarfile.TarInfo(dir) tarinfo.type = tarfile.DIRTYPE tarinfo.uid = 0 From 7a67e724fb57ab50e7229e9edfcbe22067d7a3fc Mon Sep 17 00:00:00 2001 From: James Meakin <12661555+jmsmkn@users.noreply.github.com> Date: Fri, 15 Dec 2023 12:51:27 +0100 Subject: [PATCH 03/13] Update the demo algorithm --- Makefile | 10 +-- .../backends/amazon_sagemaker_base.py | 1 + app/grandchallenge/components/tasks.py | 11 ++- .../resources/gc_demo_algorithm/Dockerfile | 15 ++-- .../resources/gc_demo_algorithm/copy_io.py | 78 ++++++++++++++++++- scripts/algorithm_evaluation_fixtures.py | 55 +++++++------ scripts/cost_fixtures.py | 30 ++----- 7 files changed, 136 insertions(+), 64 deletions(-) diff --git a/Makefile b/Makefile index 5b00ee4472..f432870d07 100644 --- a/Makefile +++ b/Makefile @@ -113,13 +113,7 @@ retina_archive_structures: bash -c "python manage.py runscript create_retina_archive_structures" -scripts/algorithm_io.tar: - docker buildx build --platform linux/amd64 -t algorithm_io app/tests/resources/gc_demo_algorithm/ - docker save algorithm_io -o scripts/algorithm_io.tar - chmod a+r scripts/algorithm_io.tar - - -algorithm_evaluation_fixtures: 
scripts/algorithm_io.tar +algorithm_evaluation_fixtures: docker compose run \ -v $(shell readlink -f ./scripts/):/app/scripts:ro \ --rm \ @@ -127,7 +121,7 @@ algorithm_evaluation_fixtures: scripts/algorithm_io.tar python manage.py runscript algorithm_evaluation_fixtures -cost_fixtures: scripts/algorithm_io.tar +cost_fixtures: docker compose run \ -v $(shell readlink -f ./scripts/):/app/scripts:ro \ --rm \ diff --git a/app/grandchallenge/components/backends/amazon_sagemaker_base.py b/app/grandchallenge/components/backends/amazon_sagemaker_base.py index 48d875a847..3922181fe7 100644 --- a/app/grandchallenge/components/backends/amazon_sagemaker_base.py +++ b/app/grandchallenge/components/backends/amazon_sagemaker_base.py @@ -680,6 +680,7 @@ def _stop_running_jobs(self): okay_error_messages = { # Unstoppable job: "The request was rejected because the transform job is in status", + "The request was rejected because the training job is in status", # Job was never created: "Could not find job to update with name", } diff --git a/app/grandchallenge/components/tasks.py b/app/grandchallenge/components/tasks.py index a8e78ccad5..dfb7abd6c7 100644 --- a/app/grandchallenge/components/tasks.py +++ b/app/grandchallenge/components/tasks.py @@ -372,7 +372,6 @@ def _get_shim_env_vars(*, original_config): "GRAND_CHALLENGE_COMPONENT_ENTRYPOINT_B64J": encode_b64j( val=entrypoint ), - "TMPDIR": "/sagemaker-shim-unpacked", } @@ -403,9 +402,8 @@ def _set_root_555_perms( filter=_set_root_555_perms, ) - for dir in ["/input", "/output", "/sagemaker-shim-unpacked"]: - # staticx will unpack into ${TMPDIR} - # In sagemaker-shim we then set this back to /tmp + for dir in ["/input", "/output", "/tmp"]: + # staticx will unpack into /tmp tarinfo = tarfile.TarInfo(dir) tarinfo.type = tarfile.DIRTYPE tarinfo.uid = 0 @@ -418,6 +416,11 @@ def _set_root_555_perms( "crane", "mutate", original_repo_tag, + # Running as root is required on SageMaker Training + # due to the permissions of most of the filesystem + # including /tmp which we need to use + "--user", + "0:0", "--cmd", "", "--entrypoint", diff --git a/app/tests/resources/gc_demo_algorithm/Dockerfile b/app/tests/resources/gc_demo_algorithm/Dockerfile index 6123603477..11c770a432 100644 --- a/app/tests/resources/gc_demo_algorithm/Dockerfile +++ b/app/tests/resources/gc_demo_algorithm/Dockerfile @@ -1,12 +1,17 @@ -FROM python:3.10-alpine +FROM python:3.10-slim ENV PYTHONUNBUFFERED 1 -WORKDIR /tmp +RUN useradd -ms /bin/bash myuser +RUN groupadd -r mygroup +RUN usermod -a -G mygroup myuser -RUN addgroup -S app && adduser -S -G app app -USER app +WORKDIR /home/myuser -ADD copy_io.py /tmp +USER myuser + +RUN python3 -m pip install pynvml psutil + +ADD copy_io.py . 
ENTRYPOINT ["python", "copy_io.py"] diff --git a/app/tests/resources/gc_demo_algorithm/copy_io.py b/app/tests/resources/gc_demo_algorithm/copy_io.py index 7cc303c32e..ca5c806ea6 100644 --- a/app/tests/resources/gc_demo_algorithm/copy_io.py +++ b/app/tests/resources/gc_demo_algorithm/copy_io.py @@ -1,9 +1,72 @@ import json +import os +import urllib.error +import urllib.request from pathlib import Path from shutil import copy from warnings import warn -if __name__ == "__main__": +# noinspection PyUnresolvedReferences +import psutil + +# noinspection PyUnresolvedReferences +import pynvml + + +def check_connectivity(): + try: + urllib.request.urlopen("https://google.com/", timeout=5) + warn("COULD GOOGLE!") + except urllib.error.URLError as e: + print(f"CONNECTIVITY - Could not google: {e.reason}") + + +def check_partitions(): + disk_partitions = psutil.disk_partitions(all=True) + + print( + f"{'Filesystem':<32}" + f"{'Mountpoint':<64}" + f"Total / GB\t" + f"Free / GB\t" + f"Owner\t" + f"Permissions" + ) + + for partition in disk_partitions: + partition_info = psutil.disk_usage(partition.mountpoint) + partition_stat = os.stat(Path(partition.mountpoint)) + print( + f"{partition.device:<32}" + f"{partition.mountpoint:<64}" + f"{partition_info.total / (1024 * 1024 * 1024):.2f}\t\t" + f"{partition_info.free / (1024 * 1024 * 1024):.2f}\t" + f"{partition_stat.st_uid}:{partition_stat.st_gid}\t\t" + f"{oct(partition_stat.st_mode)}" + ) + + +def check_cuda(): + try: + pynvml.nvmlInit() + pynvml.nvmlDeviceGetCount() + + print(f"CUDA - Driver Version: {pynvml.nvmlSystemGetDriverVersion()}") + print( + f"CUDA - CUDA Driver Version: {pynvml.nvmlSystemGetCudaDriverVersion()}" + ) + + device_count = pynvml.nvmlDeviceGetCount() + for ii in range(device_count): + handle = pynvml.nvmlDeviceGetHandleByIndex(ii) + print(f"CUDA - Device {ii}: {pynvml.nvmlDeviceGetName(handle)}") + + pynvml.nvmlShutdown() + except pynvml.NVMLError as error: + print(f"CUDA - Pynvml error: {error}") + + +def create_output(): res = {"score": 1} # dummy metric for ranking on leaderboard files = {x for x in Path("/input").rglob("*") if x.is_file()} @@ -25,3 +88,16 @@ for output_filename in ["results", "metrics"]: with open(f"/output/{output_filename}.json", "w") as f: f.write(json.dumps(res)) + + +if __name__ == "__main__": + check_connectivity() + print("") + + check_partitions() + print("") + + check_cuda() + print("") + + create_output() diff --git a/scripts/algorithm_evaluation_fixtures.py b/scripts/algorithm_evaluation_fixtures.py index e7b2ef3576..ea99b84112 100644 --- a/scripts/algorithm_evaluation_fixtures.py +++ b/scripts/algorithm_evaluation_fixtures.py @@ -1,6 +1,9 @@ +import gzip import os +import shutil from contextlib import contextmanager from pathlib import Path +from tempfile import TemporaryDirectory from django.conf import settings from django.contrib.auth import get_user_model @@ -10,6 +13,7 @@ from grandchallenge.archives.models import Archive, ArchiveItem from grandchallenge.cases.models import Image, ImageFile from grandchallenge.challenges.models import Challenge +from grandchallenge.components.backends import docker_client from grandchallenge.components.models import ( ComponentInterface, ComponentInterfaceValue, @@ -45,12 +49,6 @@ def run(): outputs=outputs, suffix=f"Image {challenge_count}", ) - _create_algorithm( - creator=users["demop"], - inputs=_get_json_file_inputs(), - outputs=outputs, - suffix=f"File {challenge_count}", - ) def _get_users(): @@ -64,17 +62,6 @@ def _get_inputs(): ) -def 
_get_json_file_inputs(): - return [ - ComponentInterface.objects.get_or_create( - title="JSON File", - relative_path="json-file", - kind=ComponentInterface.Kind.ANY, - store_in_database=False, - )[0] - ] - - def _get_outputs(): return ComponentInterface.objects.filter( slug__in=["generic-medical-image", "results-json-file"] @@ -146,7 +133,7 @@ def _create_challenge( m = Method(creator=creator, phase=p) - with _uploaded_container_image() as container: + with _gc_demo_algorithm() as container: m.image.save("algorithm_io.tar", container) @@ -161,14 +148,38 @@ def _create_algorithm(*, creator, inputs, outputs, suffix): algorithm_image = AlgorithmImage(creator=creator, algorithm=algorithm) - with _uploaded_container_image() as container: + with _gc_demo_algorithm() as container: algorithm_image.image.save("algorithm_io.tar", container) @contextmanager -def _uploaded_container_image(): - path = Path(__file__).parent / "algorithm_io.tar" - yield from _uploaded_file(path=path) +def _gc_demo_algorithm(): + with TemporaryDirectory() as tmp_dir: + tmp_path = Path(tmp_dir) + + repo_tag = "fixtures-algorithm-io:latest" + demo_algorithm_path = ( + Path(__file__).parent.parent + / "app" + / "tests" + / "resources" + / "gc_demo_algorithm" + ) + + docker_client.build_image( + path=str(demo_algorithm_path.absolute()), repo_tag=repo_tag + ) + + outfile = tmp_path / f"{repo_tag}.tar" + output_gz = f"{outfile}.gz" + + docker_client.save_image(repo_tag=repo_tag, output=outfile) + + with open(outfile, "rb") as f_in: + with gzip.open(output_gz, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + + yield from _uploaded_file(path=output_gz) @contextmanager diff --git a/scripts/cost_fixtures.py b/scripts/cost_fixtures.py index 3aa9d7abbb..fc0b980a25 100644 --- a/scripts/cost_fixtures.py +++ b/scripts/cost_fixtures.py @@ -1,12 +1,8 @@ -import os import random -from contextlib import contextmanager from datetime import timedelta -from pathlib import Path from django.conf import settings from django.contrib.auth import get_user_model -from django.core.files.base import ContentFile from django.utils.timezone import now from grandchallenge.algorithms.models import Algorithm, AlgorithmImage, Job @@ -21,6 +17,10 @@ from grandchallenge.evaluation.models import Evaluation, Method, Submission from grandchallenge.evaluation.utils import SubmissionKindChoices from grandchallenge.workstations.models import Workstation +from scripts.algorithm_evaluation_fixtures import ( + _gc_demo_algorithm, + _uploaded_image_file, +) def run(): @@ -144,7 +144,7 @@ def _create_challenge( m = Method(creator=creator, phase=p) - with _uploaded_container_image() as container: + with _gc_demo_algorithm() as container: m.image.save("algorithm_io.tar", container) return c @@ -161,7 +161,7 @@ def _create_algorithm(*, creator, inputs, outputs, suffix): algorithm_image = AlgorithmImage(creator=creator, algorithm=algorithm) - with _uploaded_container_image() as container: + with _gc_demo_algorithm() as container: algorithm_image.image.save("algorithm_io.tar", container) return algorithm @@ -194,21 +194,3 @@ def _create_submission(algorithm, challenge, archive_items): submission=sub, method=phase.method_set.last() ) e1.inputs.add(*eval_inputs) - - -@contextmanager -def _uploaded_container_image(): - path = Path(__file__).parent / "algorithm_io.tar" - yield from _uploaded_file(path=path) - - -@contextmanager -def _uploaded_image_file(): - path = Path(__file__).parent / "image10x10x10.mha" - yield from _uploaded_file(path=path) - - -def _uploaded_file(*, 
path): - with open(os.path.join(settings.SITE_ROOT, path), "rb") as f: - with ContentFile(f.read()) as content: - yield content From fe95f523aeaac65dd870d1f2d36b735607f5c713 Mon Sep 17 00:00:00 2001 From: James Meakin <12661555+jmsmkn@users.noreply.github.com> Date: Sat, 16 Dec 2023 16:12:34 +0100 Subject: [PATCH 04/13] Update sagemaker shim --- app/grandchallenge/components/tasks.py | 4 +++- app/tests/resources/gc_demo_algorithm/copy_io.py | 10 ++++++++++ dockerfiles/web-base/Dockerfile | 4 ++-- poetry.lock | 1 + 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/app/grandchallenge/components/tasks.py b/app/grandchallenge/components/tasks.py index dfb7abd6c7..ab7634656d 100644 --- a/app/grandchallenge/components/tasks.py +++ b/app/grandchallenge/components/tasks.py @@ -366,12 +366,14 @@ def _get_shim_env_vars(*, original_config): """Get the environment variables for a shimmed container image""" cmd = original_config["config"].get("Cmd") entrypoint = original_config["config"].get("Entrypoint") + user = original_config["config"]["User"] return { "GRAND_CHALLENGE_COMPONENT_CMD_B64J": encode_b64j(val=cmd), "GRAND_CHALLENGE_COMPONENT_ENTRYPOINT_B64J": encode_b64j( val=entrypoint ), + "GRAND_CHALLENGE_COMPONENT_USER": user, } @@ -420,7 +422,7 @@ def _set_root_555_perms( # due to the permissions of most of the filesystem # including /tmp which we need to use "--user", - "0:0", + "0", "--cmd", "", "--entrypoint", diff --git a/app/tests/resources/gc_demo_algorithm/copy_io.py b/app/tests/resources/gc_demo_algorithm/copy_io.py index ca5c806ea6..643cb0af93 100644 --- a/app/tests/resources/gc_demo_algorithm/copy_io.py +++ b/app/tests/resources/gc_demo_algorithm/copy_io.py @@ -1,5 +1,7 @@ +import grp import json import os +import pwd import urllib.error import urllib.request from pathlib import Path @@ -91,6 +93,14 @@ def create_output(): if __name__ == "__main__": + print(f"Current user: {pwd.getpwuid(os.getuid())}") + print(f"Current group: {grp.getgrgid(os.getgid())}") + print("") + + for k, v in os.environ.items(): + print(f"{k}={v}") + print("") + check_connectivity() print("") diff --git a/dockerfiles/web-base/Dockerfile b/dockerfiles/web-base/Dockerfile index 10bfa12bb1..7f86f2d8c8 100644 --- a/dockerfiles/web-base/Dockerfile +++ b/dockerfiles/web-base/Dockerfile @@ -76,7 +76,7 @@ RUN mkdir -p /opt/docker \ ENV PYTHONUNBUFFERED=1\ AWS_XRAY_SDK_ENABLED=false\ - COMPONENTS_SAGEMAKER_SHIM_VERSION=0.2.0\ + COMPONENTS_SAGEMAKER_SHIM_VERSION=0.2.1a2\ PATH="/opt/poetry/.venv/bin:/home/django/.local/bin:${PATH}" RUN mkdir -p /opt/poetry /app /static /opt/sagemaker-shim \ @@ -88,7 +88,7 @@ USER django:django # Fetch and install sagemaker shim for shimming containers RUN mkdir -p /opt/sagemaker-shim \ && wget "https://github.com/DIAGNijmegen/rse-sagemaker-shim/releases/download/v${COMPONENTS_SAGEMAKER_SHIM_VERSION}/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" -P /opt/sagemaker-shim/ \ - && echo "287d900f8c86723ee4d32e7a2520d83490b4b6d647c2dd65c0995824394ae7c2 /opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" | shasum -c - || exit 1 \ + && echo "605f232c468cf0d07c06cb999f9bfb86a5d4f6adb85f3d689a1f993a36a9e8ff /opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" | shasum -c - || exit 1 \ && tar -C /opt/sagemaker-shim/ -xzvf "/opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" \ && rm 
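
Note on the shim environment variables above: the two *_B64J values carry the image's original Cmd and Entrypoint through the environment as base64-encoded JSON, and the new GRAND_CHALLENGE_COMPONENT_USER tells sagemaker-shim which user to drop to now that the shimmed container starts as uid 0. A minimal sketch of the encoding round trip, assuming encode_b64j is a plain base64-of-JSON helper (the decode side lives in sagemaker-shim and is not part of this series):

    import base64
    import json


    def encode_b64j(*, val):
        # JSON first so None and lists survive, then base64 so the value
        # is safe to pass through an environment variable.
        return base64.b64encode(json.dumps(val).encode("utf-8")).decode("utf-8")


    def decode_b64j(*, encoded):
        return json.loads(base64.b64decode(encoded).decode("utf-8"))


    env = {
        "GRAND_CHALLENGE_COMPONENT_CMD_B64J": encode_b64j(val=["python", "copy_io.py"]),
        "GRAND_CHALLENGE_COMPONENT_ENTRYPOINT_B64J": encode_b64j(val=None),
    }
    assert decode_b64j(encoded=env["GRAND_CHALLENGE_COMPONENT_CMD_B64J"]) == [
        "python",
        "copy_io.py",
    ]

JSON-then-base64 keeps null entrypoints and arguments containing spaces intact where a naive string join would not.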
"/opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" diff --git a/poetry.lock b/poetry.lock index 04b87883a6..d564c09b4c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,6 @@ # This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. + [[package]] name = "aiohttp" version = "3.9.1" From f497970068211b59a8988dbfb58479641e70334f Mon Sep 17 00:00:00 2001 From: James Meakin <12661555+jmsmkn@users.noreply.github.com> Date: Sat, 16 Dec 2023 17:38:53 +0100 Subject: [PATCH 05/13] Add taking directory ownership on SageMaker Training --- .../backends/amazon_sagemaker_training.py | 6 ++++- app/grandchallenge/components/tasks.py | 2 +- .../resources/gc_demo_algorithm/copy_io.py | 25 +++++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/app/grandchallenge/components/backends/amazon_sagemaker_training.py b/app/grandchallenge/components/backends/amazon_sagemaker_training.py index 7b18f7c283..235929c303 100644 --- a/app/grandchallenge/components/backends/amazon_sagemaker_training.py +++ b/app/grandchallenge/components/backends/amazon_sagemaker_training.py @@ -57,7 +57,11 @@ def _create_job_boto(self): "MaxRuntimeInSeconds": self._time_limit, }, # TODO Retry strategy? - Environment=self.invocation_environment, + Environment={ + **self.invocation_environment, + # https://docs.aws.amazon.com/sagemaker/latest/dg/model-train-storage.html#model-train-storage-env-var-summary + "GRAND_CHALLENGE_COMPONENT_WRITABLE_DIRECTORIES": "/opt/ml/output/data:/opt/ml/model:/opt/ml/checkpoints:/tmp", + }, VpcConfig={ "SecurityGroupIds": [ settings.COMPONENTS_AMAZON_SAGEMAKER_SECURITY_GROUP_ID diff --git a/app/grandchallenge/components/tasks.py b/app/grandchallenge/components/tasks.py index ab7634656d..00c6c170c7 100644 --- a/app/grandchallenge/components/tasks.py +++ b/app/grandchallenge/components/tasks.py @@ -410,7 +410,7 @@ def _set_root_555_perms( tarinfo.type = tarfile.DIRTYPE tarinfo.uid = 0 tarinfo.gid = 0 - tarinfo.mode = 0o777 + tarinfo.mode = 0o755 if dir == "/input" else 0o777 f.addfile(tarinfo=tarinfo) _repo_login_and_run( diff --git a/app/tests/resources/gc_demo_algorithm/copy_io.py b/app/tests/resources/gc_demo_algorithm/copy_io.py index 643cb0af93..bcb154a2c2 100644 --- a/app/tests/resources/gc_demo_algorithm/copy_io.py +++ b/app/tests/resources/gc_demo_algorithm/copy_io.py @@ -6,6 +6,7 @@ import urllib.request from pathlib import Path from shutil import copy +from tempfile import TemporaryDirectory from warnings import warn # noinspection PyUnresolvedReferences @@ -68,6 +69,27 @@ def check_cuda(): print(f"CUDA - Pynvml error: {error}") +def check_temporary_file(): + with TemporaryDirectory() as tmp_dir: + file = Path(tmp_dir) / "test" + file.touch() + print(file) + + directory = Path(tmp_dir) / "1" / "2" + directory.mkdir(parents=True) + print(directory) + + file = Path("/tmp") / "test" + file.touch() + print(file) + + tmp_directory = Path("/tmp") / "1" / "2" + tmp_directory.mkdir(parents=True) + print(directory) + + print("TEMPORARY FILE - successfully created") + + def create_output(): res = {"score": 1} # dummy metric for ranking on leaderboard files = {x for x in Path("/input").rglob("*") if x.is_file()} @@ -110,4 +132,7 @@ def create_output(): check_cuda() print("") + check_temporary_file() + print("") + create_output() From b6a6597697cca7eb9f70cb8c954894d0286d08cc Mon Sep 17 00:00:00 2001 From: James Meakin <12661555+jmsmkn@users.noreply.github.com> Date: Sat, 16 Dec 2023 17:46:07 +0100 Subject: 
[PATCH 06/13] Update SageMaker Shim --- dockerfiles/web-base/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dockerfiles/web-base/Dockerfile b/dockerfiles/web-base/Dockerfile index 7f86f2d8c8..e672673e91 100644 --- a/dockerfiles/web-base/Dockerfile +++ b/dockerfiles/web-base/Dockerfile @@ -76,7 +76,7 @@ RUN mkdir -p /opt/docker \ ENV PYTHONUNBUFFERED=1\ AWS_XRAY_SDK_ENABLED=false\ - COMPONENTS_SAGEMAKER_SHIM_VERSION=0.2.1a2\ + COMPONENTS_SAGEMAKER_SHIM_VERSION=0.2.1\ PATH="/opt/poetry/.venv/bin:/home/django/.local/bin:${PATH}" RUN mkdir -p /opt/poetry /app /static /opt/sagemaker-shim \ @@ -88,7 +88,7 @@ USER django:django # Fetch and install sagemaker shim for shimming containers RUN mkdir -p /opt/sagemaker-shim \ && wget "https://github.com/DIAGNijmegen/rse-sagemaker-shim/releases/download/v${COMPONENTS_SAGEMAKER_SHIM_VERSION}/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" -P /opt/sagemaker-shim/ \ - && echo "605f232c468cf0d07c06cb999f9bfb86a5d4f6adb85f3d689a1f993a36a9e8ff /opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" | shasum -c - || exit 1 \ + && echo "4cfb76aed25d48d7be21598fb7dde2ba44a669550d8222d687df4f76e4a34267 /opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" | shasum -c - || exit 1 \ && tar -C /opt/sagemaker-shim/ -xzvf "/opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" \ && rm "/opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" From 4705786310ccd51d0fe19d055afeae06e6226118 Mon Sep 17 00:00:00 2001 From: James Meakin <12661555+jmsmkn@users.noreply.github.com> Date: Sat, 16 Dec 2023 19:29:35 +0100 Subject: [PATCH 07/13] Add training output prefix --- .../backends/amazon_sagemaker_training.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/app/grandchallenge/components/backends/amazon_sagemaker_training.py b/app/grandchallenge/components/backends/amazon_sagemaker_training.py index 235929c303..bc4e59f756 100644 --- a/app/grandchallenge/components/backends/amazon_sagemaker_training.py +++ b/app/grandchallenge/components/backends/amazon_sagemaker_training.py @@ -1,4 +1,5 @@ from django.conf import settings +from django.utils._os import safe_join from grandchallenge.components.backends.amazon_sagemaker_base import ( AmazonSageMakerBaseExecutor, @@ -11,6 +12,10 @@ def _log_group_name(self): # Hardcoded by AWS return "/aws/sagemaker/TrainingJobs" + @property + def _training_output_prefix(self): + return safe_join("/training-outputs", *self.job_path_parts) + @staticmethod def get_job_name(*, event): return event["TrainingJobName"] @@ -43,8 +48,7 @@ def _create_job_boto(self): RoleArn=settings.COMPONENTS_AMAZON_SAGEMAKER_EXECUTION_ROLE_ARN, OutputDataConfig={ # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_OutputDataConfig.html - # TODO maybe don't put this in the io - "S3OutputPath": f"s3://{settings.COMPONENTS_OUTPUT_BUCKET_NAME}/{self._io_prefix}/.sagemaker-outputs", + "S3OutputPath": f"s3://{settings.COMPONENTS_OUTPUT_BUCKET_NAME}/{self._training_output_prefix}", }, ResourceConfig={ # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html @@ -56,7 +60,6 @@ def _create_job_boto(self): # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StoppingCondition.html "MaxRuntimeInSeconds": self._time_limit, }, - # TODO Retry strategy? 
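
Note on the shim bumps: each one changes the version and the pinned SHA-256 together, and the `shasum -c` pipeline above aborts the image build on any mismatch. A rough Python equivalent of that guard, for illustration only (the authoritative check remains the shell pipeline in the Dockerfile):

    import hashlib


    def verify_sha256(path, expected_hex):
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            # Stream in 1 MiB chunks so large tarballs are not read into memory.
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                digest.update(chunk)
        if digest.hexdigest() != expected_hex:
            raise RuntimeError(f"Checksum mismatch for {path}")


    # e.g. with the tarball and hash pinned above:
    # verify_sha256(
    #     "sagemaker-shim-0.2.1-Linux-x86_64.tar.gz",
    #     "4cfb76aed25d48d7be21598fb7dde2ba44a669550d8222d687df4f76e4a34267",
    # )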
From 4705786310ccd51d0fe19d055afeae06e6226118 Mon Sep 17 00:00:00 2001
From: James Meakin <12661555+jmsmkn@users.noreply.github.com>
Date: Sat, 16 Dec 2023 19:29:35 +0100
Subject: [PATCH 07/13] Add training output prefix

---
 .../backends/amazon_sagemaker_training.py | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/app/grandchallenge/components/backends/amazon_sagemaker_training.py b/app/grandchallenge/components/backends/amazon_sagemaker_training.py
index 235929c303..bc4e59f756 100644
--- a/app/grandchallenge/components/backends/amazon_sagemaker_training.py
+++ b/app/grandchallenge/components/backends/amazon_sagemaker_training.py
@@ -1,4 +1,5 @@
 from django.conf import settings
+from django.utils._os import safe_join
 
 from grandchallenge.components.backends.amazon_sagemaker_base import (
     AmazonSageMakerBaseExecutor,
@@ -11,6 +12,10 @@ def _log_group_name(self):
         # Hardcoded by AWS
         return "/aws/sagemaker/TrainingJobs"
 
+    @property
+    def _training_output_prefix(self):
+        return safe_join("/training-outputs", *self.job_path_parts)
+
     @staticmethod
     def get_job_name(*, event):
         return event["TrainingJobName"]
@@ -43,8 +48,7 @@ def _create_job_boto(self):
             RoleArn=settings.COMPONENTS_AMAZON_SAGEMAKER_EXECUTION_ROLE_ARN,
             OutputDataConfig={
                 # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_OutputDataConfig.html
-                # TODO maybe don't put this in the io
-                "S3OutputPath": f"s3://{settings.COMPONENTS_OUTPUT_BUCKET_NAME}/{self._io_prefix}/.sagemaker-outputs",
+                "S3OutputPath": f"s3://{settings.COMPONENTS_OUTPUT_BUCKET_NAME}/{self._training_output_prefix}",
             },
             ResourceConfig={
                 # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html
@@ -56,7 +60,6 @@ def _create_job_boto(self):
                 # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StoppingCondition.html
                 "MaxRuntimeInSeconds": self._time_limit,
             },
-            # TODO Retry strategy?
             Environment={
                 **self.invocation_environment,
                 # https://docs.aws.amazon.com/sagemaker/latest/dg/model-train-storage.html#model-train-storage-env-var-summary
@@ -75,6 +78,14 @@ def _stop_job_boto(self):
             TrainingJobName=self._sagemaker_job_name
         )
 
+    def deprovision(self):
+        super().deprovision()
+
+        self._delete_objects(
+            bucket=settings.COMPONENTS_OUTPUT_BUCKET_NAME,
+            prefix=self._training_output_prefix,
+        )
+
     def _get_invocation_json(self, *args, **kwargs):
         # SageMaker Training Jobs expect a list
         invocation_json = super()._get_invocation_json(*args, **kwargs)

From b6fffae05b87a5a554add8ddb24eabb972633d6e Mon Sep 17 00:00:00 2001
From: James Meakin <12661555+jmsmkn@users.noreply.github.com>
Date: Sat, 16 Dec 2023 19:35:33 +0100
Subject: [PATCH 08/13] Fix component tests

---
 .../test_amazon_sagemaker_batch_backend.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/app/tests/components_tests/test_amazon_sagemaker_batch_backend.py b/app/tests/components_tests/test_amazon_sagemaker_batch_backend.py
index 24fe1abdc1..082a9a46d6 100644
--- a/app/tests/components_tests/test_amazon_sagemaker_batch_backend.py
+++ b/app/tests/components_tests/test_amazon_sagemaker_batch_backend.py
@@ -167,6 +167,7 @@ def test_execute(settings):
         "TransformJobName": executor._sagemaker_job_name,
         "Environment": {
             "LOG_LEVEL": "INFO",
+            "PYTHONUNBUFFERED": "1",
             "no_proxy": "amazonaws.com",
         },
         "ModelClientConfig": {
@@ -537,7 +538,11 @@ def test_handle_completed_job():
     return_code = 0
 
     with io.BytesIO() as f:
-        f.write(json.dumps({"return_code": return_code}).encode("utf-8"))
+        f.write(
+            json.dumps(
+                {"return_code": return_code, "pk": f"algorithms-job-{pk}"}
+            ).encode("utf-8")
+        )
         f.seek(0)
         executor._s3_client.upload_fileobj(
             Fileobj=f,

From 21099d1b456e8749876eb82a39e7251d97527c4e Mon Sep 17 00:00:00 2001
From: James Meakin <12661555+jmsmkn@users.noreply.github.com>
Date: Sat, 16 Dec 2023 20:53:24 +0100
Subject: [PATCH 09/13] Fix docker tests

---
 app/config/settings.py                    |  3 +++
 .../components/backends/docker_client.py  | 10 ++++++++++
 app/tests/settings.py                     |  1 +
 dockerfiles/web-base/Dockerfile           |  4 ++--
 4 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/app/config/settings.py b/app/config/settings.py
index de1ff62c36..d8945fb70e 100644
--- a/app/config/settings.py
+++ b/app/config/settings.py
@@ -1077,6 +1077,9 @@
 COMPONENTS_DOCKER_TASK_AWS_SECRET_ACCESS_KEY = os.environ.get(
     "COMPONENTS_DOCKER_TASK_AWS_SECRET_ACCESS_KEY", "componentstask123"
 )
+COMPONENTS_DOCKER_ADD_CAP_SET_UID_GID_UNSAFE = strtobool(
+    os.environ.get("COMPONENTS_DOCKER_ADD_CAP_SET_UID_GID_UNSAFE", "False")
+)
 COMPONENTS_PUBLISH_PORTS = strtobool(
     os.environ.get("COMPONENTS_PUBLISH_PORTS", "False")
 )
diff --git a/app/grandchallenge/components/backends/docker_client.py b/app/grandchallenge/components/backends/docker_client.py
index f3a31f4013..193ceb29ed 100644
--- a/app/grandchallenge/components/backends/docker_client.py
+++ b/app/grandchallenge/components/backends/docker_client.py
@@ -188,6 +188,16 @@ def run_container(  # noqa: C901
     if remove:
         docker_args.append("--rm")
 
+    if settings.COMPONENTS_DOCKER_ADD_CAP_SET_UID_GID_UNSAFE:
+        docker_args.extend(
+            [
+                "--cap-add",
+                "CAP_SETGID",
+                "--cap-add",
+                "CAP_SETUID",
+            ]
+        )
+
     if settings.COMPONENTS_DOCKER_RUNTIME is not None:
         docker_args.extend(["--runtime", settings.COMPONENTS_DOCKER_RUNTIME])
diff --git a/app/tests/settings.py b/app/tests/settings.py
index 18626b2b7d..6b65c85fb1 100644
--- a/app/tests/settings.py
+++ b/app/tests/settings.py
@@ -14,6 +14,7 @@
 SECURE_SSL_REDIRECT = False
 DEFAULT_SCHEME = "https"
 COMPONENTS_REGISTRY_PREFIX = "localhost"
+COMPONENTS_DOCKER_ADD_CAP_SET_UID_GID_UNSAFE = True
 
 TEMPLATES[0]["DIRS"].append(SITE_ROOT / "tests" / "templates")  # noqa 405
diff --git a/dockerfiles/web-base/Dockerfile b/dockerfiles/web-base/Dockerfile
index e672673e91..97b528bc51 100644
--- a/dockerfiles/web-base/Dockerfile
+++ b/dockerfiles/web-base/Dockerfile
@@ -76,7 +76,7 @@ RUN mkdir -p /opt/docker \
 
 ENV PYTHONUNBUFFERED=1\
     AWS_XRAY_SDK_ENABLED=false\
-    COMPONENTS_SAGEMAKER_SHIM_VERSION=0.2.1\
+    COMPONENTS_SAGEMAKER_SHIM_VERSION=0.2.2\
     PATH="/opt/poetry/.venv/bin:/home/django/.local/bin:${PATH}"
 
 RUN mkdir -p /opt/poetry /app /static /opt/sagemaker-shim \
@@ -88,7 +88,7 @@ USER django:django
 
 # Fetch and install sagemaker shim for shimming containers
 RUN mkdir -p /opt/sagemaker-shim \
    && wget "https://github.com/DIAGNijmegen/rse-sagemaker-shim/releases/download/v${COMPONENTS_SAGEMAKER_SHIM_VERSION}/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" -P /opt/sagemaker-shim/ \
-    && echo "4cfb76aed25d48d7be21598fb7dde2ba44a669550d8222d687df4f76e4a34267  /opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" | shasum -c - || exit 1 \
+    && echo "9af7362f5c7f43b3665611bbac28168516dfabb3797d3f0e4e676f47681c63f6  /opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" | shasum -c - || exit 1 \
    && tar -C /opt/sagemaker-shim/ -xzvf "/opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" \
    && rm "/opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz"

From e22cccdeefb792401e62da3d851dc0d202187311 Mon Sep 17 00:00:00 2001
From: James Meakin <12661555+jmsmkn@users.noreply.github.com>
Date: Sat, 16 Dec 2023 20:58:17 +0100
Subject: [PATCH 10/13] Fix cost fixtures

---
 scripts/cost_fixtures.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/scripts/cost_fixtures.py b/scripts/cost_fixtures.py
index fc0b980a25..38a3c2ffc3 100644
--- a/scripts/cost_fixtures.py
+++ b/scripts/cost_fixtures.py
@@ -14,7 +14,12 @@
     ComponentInterfaceValue,
 )
 from grandchallenge.core.fixtures import create_uploaded_image
-from grandchallenge.evaluation.models import Evaluation, Method, Submission
+from grandchallenge.evaluation.models import (
+    Evaluation,
+    Method,
+    Phase,
+    Submission,
+)
 from grandchallenge.evaluation.utils import SubmissionKindChoices
 from grandchallenge.workstations.models import Workstation
 from scripts.algorithm_evaluation_fixtures import (
@@ -130,7 +135,7 @@ def _create_challenge(
     for participant in participants:
         c.add_participant(participant)
 
-    p = c.phase_set.first()
+    p = Phase.objects.create(challenge=c, title="Phase 1")
 
     p.algorithm_inputs.set(inputs)
     p.algorithm_outputs.set(outputs)
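
Note on the capability handling after PATCH 09: production containers still start with every capability dropped, and only the test settings opt back in to CAP_SETUID/CAP_SETGID so the shim can switch to the image's configured user. A condensed sketch of how the flags are assembled, simplified from run_container (the function and argument names here are illustrative, not the production API):

    def capability_args(*, add_cap_set_uid_gid_unsafe=False):
        # Default: drop every capability; the container user then cannot
        # setuid/setgid or chown files it does not own.
        args = ["--cap-drop", "all"]
        if add_cap_set_uid_gid_unsafe:
            # Test-only escape hatch mirroring
            # COMPONENTS_DOCKER_ADD_CAP_SET_UID_GID_UNSAFE.
            args += ["--cap-add", "CAP_SETGID", "--cap-add", "CAP_SETUID"]
        return args


    assert capability_args() == ["--cap-drop", "all"]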
From 0ceeee240db974d71db2851fc078951ecfef088c Mon Sep 17 00:00:00 2001
From: James Meakin <12661555+jmsmkn@users.noreply.github.com>
Date: Sat, 16 Dec 2023 21:11:58 +0100
Subject: [PATCH 11/13] Fix test fixture

---
 app/tests/algorithms_tests/resources/docker/Dockerfile | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/app/tests/algorithms_tests/resources/docker/Dockerfile b/app/tests/algorithms_tests/resources/docker/Dockerfile
index e03af581d1..f3eea38b00 100644
--- a/app/tests/algorithms_tests/resources/docker/Dockerfile
+++ b/app/tests/algorithms_tests/resources/docker/Dockerfile
@@ -2,11 +2,9 @@ FROM python:3.10-alpine
 
 ENV PYTHONUNBUFFERED 1
 
-WORKDIR /tmp
-
 RUN addgroup -S app && adduser -S -G app app
 USER app
 
-ADD run_algorithm.py /tmp
+COPY run_algorithm.py .
 
 ENTRYPOINT ["python", "run_algorithm.py"]

From 32f40b3632145d1425c35e9e11e20cec5fbef016 Mon Sep 17 00:00:00 2001
From: James Meakin <12661555+jmsmkn@users.noreply.github.com>
Date: Sat, 16 Dec 2023 21:33:21 +0100
Subject: [PATCH 12/13] Fix test fixture

---
 .../algorithms_tests/resources/docker/Dockerfile | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/app/tests/algorithms_tests/resources/docker/Dockerfile b/app/tests/algorithms_tests/resources/docker/Dockerfile
index f3eea38b00..9d99774502 100644
--- a/app/tests/algorithms_tests/resources/docker/Dockerfile
+++ b/app/tests/algorithms_tests/resources/docker/Dockerfile
@@ -1,9 +1,14 @@
-FROM python:3.10-alpine
+FROM python:3.10-slim
 
 ENV PYTHONUNBUFFERED 1
 
-RUN addgroup -S app && adduser -S -G app app
-USER app
+RUN useradd -ms /bin/bash myuser
+RUN groupadd -r mygroup
+RUN usermod -a -G mygroup myuser
+
+WORKDIR /home/myuser
+
+USER myuser
 
 COPY run_algorithm.py .

From 01e3ab3a6756a53096c533cb153447bca72391a2 Mon Sep 17 00:00:00 2001
From: James Meakin <12661555+jmsmkn@users.noreply.github.com>
Date: Sat, 16 Dec 2023 22:58:19 +0100
Subject: [PATCH 13/13] Fix undeletable directories

---
 app/config/settings.py                           |  4 ++--
 .../components/backends/docker_client.py         | 13 ++-----------
 app/grandchallenge/components/tasks.py           |  6 +++---
 app/tests/resources/gc_demo_algorithm/copy_io.py | 11 +++++++++++
 app/tests/settings.py                            |  2 +-
 dockerfiles/web-base/Dockerfile                  |  4 ++--
 6 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/app/config/settings.py b/app/config/settings.py
index d8945fb70e..ea535732f7 100644
--- a/app/config/settings.py
+++ b/app/config/settings.py
@@ -1077,8 +1077,8 @@
 COMPONENTS_DOCKER_TASK_AWS_SECRET_ACCESS_KEY = os.environ.get(
     "COMPONENTS_DOCKER_TASK_AWS_SECRET_ACCESS_KEY", "componentstask123"
 )
-COMPONENTS_DOCKER_ADD_CAP_SET_UID_GID_UNSAFE = strtobool(
-    os.environ.get("COMPONENTS_DOCKER_ADD_CAP_SET_UID_GID_UNSAFE", "False")
+COMPONENTS_DOCKER_KEEP_CAPS_UNSAFE = strtobool(
+    os.environ.get("COMPONENTS_DOCKER_KEEP_CAPS_UNSAFE", "False")
 )
 COMPONENTS_PUBLISH_PORTS = strtobool(
     os.environ.get("COMPONENTS_PUBLISH_PORTS", "False")
 )
diff --git a/app/grandchallenge/components/backends/docker_client.py b/app/grandchallenge/components/backends/docker_client.py
index 193ceb29ed..324b46ebbb 100644
--- a/app/grandchallenge/components/backends/docker_client.py
+++ b/app/grandchallenge/components/backends/docker_client.py
@@ -167,8 +167,6 @@ def run_container(  # noqa: C901
         str(settings.COMPONENTS_CPU_SHARES),
         "--cpuset-cpus",
         _get_cpuset_cpus(),
-        "--cap-drop",
-        "all",
         "--security-opt",
         "no-new-privileges",
         "--pids-limit",
@@ -188,15 +186,8 @@ def run_container(  # noqa: C901
     if remove:
         docker_args.append("--rm")
 
-    if settings.COMPONENTS_DOCKER_ADD_CAP_SET_UID_GID_UNSAFE:
-        docker_args.extend(
-            [
-                "--cap-add",
-                "CAP_SETGID",
-                "--cap-add",
-                "CAP_SETUID",
-            ]
-        )
+    if not settings.COMPONENTS_DOCKER_KEEP_CAPS_UNSAFE:
+        docker_args.extend(["--cap-drop", "all"])
 
     if settings.COMPONENTS_DOCKER_RUNTIME is not None:
         docker_args.extend(["--runtime", settings.COMPONENTS_DOCKER_RUNTIME])
diff --git a/app/grandchallenge/components/tasks.py b/app/grandchallenge/components/tasks.py
index 00c6c170c7..12a70cd100 100644
--- a/app/grandchallenge/components/tasks.py
+++ b/app/grandchallenge/components/tasks.py
@@ -387,12 +387,12 @@ def _mutate_container_image(
 
     with tarfile.open(new_layer, "w") as f:
 
-        def _set_root_555_perms(
+        def _set_root_500_perms(
            tarinfo,
         ):
             tarinfo.uid = 0
             tarinfo.gid = 0
-            tarinfo.mode = 0o555
+            tarinfo.mode = 0o500
             return tarinfo
 
         f.add(
@@ -401,7 +401,7 @@
                 f"sagemaker-shim-{version}-Linux-x86_64"
             ),
             arcname="/sagemaker-shim",
-            filter=_set_root_555_perms,
+            filter=_set_root_500_perms,
         )
 
         for dir in ["/input", "/output", "/tmp"]:
diff --git a/app/tests/resources/gc_demo_algorithm/copy_io.py b/app/tests/resources/gc_demo_algorithm/copy_io.py
index bcb154a2c2..5f3354fd08 100644
--- a/app/tests/resources/gc_demo_algorithm/copy_io.py
+++ b/app/tests/resources/gc_demo_algorithm/copy_io.py
@@ -90,6 +90,14 @@ def check_temporary_file():
     print("TEMPORARY FILE - successfully created")
 
 
+def check_permissions():
+    try:
+        Path("/tmp").chmod(0o777)
+        warn("COULD CHANGE DIRECTORY PERMS!")
+    except PermissionError as e:
+        print(f"CHMOD PERMISSIONS - Could not change permissions: {e}")
+
+
 def create_output():
     res = {"score": 1}  # dummy metric for ranking on leaderboard
     files = {x for x in Path("/input").rglob("*") if x.is_file()}
@@ -135,4 +143,7 @@
     check_temporary_file()
     print("")
 
+    check_permissions()
+    print("")
+
     create_output()
diff --git a/app/tests/settings.py b/app/tests/settings.py
index 6b65c85fb1..47b153f233 100644
--- a/app/tests/settings.py
+++ b/app/tests/settings.py
@@ -14,7 +14,7 @@
 SECURE_SSL_REDIRECT = False
 DEFAULT_SCHEME = "https"
 COMPONENTS_REGISTRY_PREFIX = "localhost"
-COMPONENTS_DOCKER_ADD_CAP_SET_UID_GID_UNSAFE = True
+COMPONENTS_DOCKER_KEEP_CAPS_UNSAFE = True
 
 TEMPLATES[0]["DIRS"].append(SITE_ROOT / "tests" / "templates")  # noqa 405
diff --git a/dockerfiles/web-base/Dockerfile b/dockerfiles/web-base/Dockerfile
index 97b528bc51..c5f32fd2db 100644
--- a/dockerfiles/web-base/Dockerfile
+++ b/dockerfiles/web-base/Dockerfile
@@ -76,7 +76,7 @@ RUN mkdir -p /opt/docker \
 
 ENV PYTHONUNBUFFERED=1\
     AWS_XRAY_SDK_ENABLED=false\
-    COMPONENTS_SAGEMAKER_SHIM_VERSION=0.2.2\
+    COMPONENTS_SAGEMAKER_SHIM_VERSION=0.2.3\
     PATH="/opt/poetry/.venv/bin:/home/django/.local/bin:${PATH}"
 
 RUN mkdir -p /opt/poetry /app /static /opt/sagemaker-shim \
@@ -88,7 +88,7 @@ USER django:django
 
 # Fetch and install sagemaker shim for shimming containers
 RUN mkdir -p /opt/sagemaker-shim \
    && wget "https://github.com/DIAGNijmegen/rse-sagemaker-shim/releases/download/v${COMPONENTS_SAGEMAKER_SHIM_VERSION}/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" -P /opt/sagemaker-shim/ \
-    && echo "9af7362f5c7f43b3665611bbac28168516dfabb3797d3f0e4e676f47681c63f6  /opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" | shasum -c - || exit 1 \
+    && echo "e118f740ad304a1e63a820a0ec42422f9ffd5d21f067de5e2d6efa7013fa61b3  /opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" | shasum -c - || exit 1 \
    && tar -C /opt/sagemaker-shim/ -xzvf "/opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz" \
    && rm "/opt/sagemaker-shim/sagemaker-shim-${COMPONENTS_SAGEMAKER_SHIM_VERSION}-Linux-x86_64.tar.gz"