diff --git a/analytics/analytics/management/commands/collect_running_jobs.py b/analytics/analytics/management/commands/collect_running_jobs.py new file mode 100644 index 000000000..b72b1afd1 --- /dev/null +++ b/analytics/analytics/management/commands/collect_running_jobs.py @@ -0,0 +1,229 @@ +import logging +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Literal + +import djclick as click +import kubernetes +from django.db.utils import IntegrityError +from kubernetes.client.models.v1_pod import V1Pod +from kubernetes.utils.quantity import parse_quantity + +from analytics.models import Job + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +# Ensure the kubernetes API is set up +try: + kubernetes.config.load_config() +except kubernetes.config.config_exception.ConfigException as e: + logger.error("Could not load kubernetes config. Is KUBECONFIG set?") + logger.error(f"\tExact error from kubernetes: {e}") + exit(1) + + +client = kubernetes.client.CoreV1Api() + + +@dataclass +class PodMetadata: + project_id: str + job_id: str + job_name: str + job_started_at: str + job_size: str + job_ref: str + package_name: str + cpu_request: float | None + memory_request: int | None + package_version: str + compiler_name: str + compiler_version: str + arch: str + package_variants: str + stack: str + build_jobs: str | None = None + + +@dataclass +class NodeMetadata: + name: str + uid: str + instance_type: str + capacity_type: Literal["spot", "on-demand"] + cpu: int + mem: int + + +@dataclass +class JobMetadata: + node: NodeMetadata + pod: PodMetadata + + +def get_pod_metadata(pod: V1Pod) -> PodMetadata: + """Get data from the pod that's necessary for storing a job.""" + pod_dict = pod.to_dict() + pod_env = next( + (x["env"] for x in pod_dict["spec"]["containers"] if x["name"] == "build"), + None, + ) + if pod_env is None: + raise Exception( + f"Build container not found on pod {pod_dict['metadata']['name']}" + ) + + # Convert pod_env to a dictionary mapping keys to values + pod_env = {var["name"]: var["value"] for var in pod_env} + + # Retrieve labels and annotations + labels: dict = pod_dict["metadata"]["labels"] + annotations: dict = pod_dict["metadata"]["annotations"] + + # Retrieve k8s resource requests, if they're set + cpu_request = pod_env.get("KUBERNETES_CPU_REQUEST") + memory_request = pod_env.get("KUBERNETES_MEMORY_REQUEST") + + # Return data in one place + return PodMetadata( + project_id=pod_env["CI_PROJECT_ID"], + job_id=labels["gitlab/ci_job_id"], + job_name=pod_env["CI_JOB_NAME"], + job_started_at=pod_env["CI_JOB_STARTED_AT"], + job_size=labels["gitlab/ci_job_size"], + job_ref=pod_env["CI_COMMIT_REF_NAME"], + # Note: tags not provided here, will be populated in the gitlab webhook + package_name=labels["metrics/spack_job_spec_pkg_name"], + cpu_request=float(parse_quantity(cpu_request)) if cpu_request else None, + memory_request=int(parse_quantity(memory_request)) if memory_request else None, + package_version=annotations["metrics/spack_job_spec_pkg_version"], + compiler_name=annotations["metrics/spack_job_spec_compiler_name"], + compiler_version=annotations["metrics/spack_job_spec_compiler_version"], + arch=annotations["metrics/spack_job_spec_arch"], + package_variants=annotations["metrics/spack_job_spec_variants"], + stack=labels["metrics/spack_ci_stack_name"], + # build jobs isn't always present + build_jobs=annotations.get("metrics/spack_job_build_jobs"), + ) + + +def get_node_metadata(node: dict) -> NodeMetadata: + node_labels
= node["metadata"]["labels"] + + return NodeMetadata( + name=node["metadata"]["name"], + uid=node["metadata"]["uid"], + instance_type=node_labels["node.kubernetes.io/instance-type"], + capacity_type=node_labels["karpenter.sh/capacity-type"], + cpu=int(node_labels["karpenter.k8s.aws/instance-cpu"]), + mem=int(node_labels["karpenter.k8s.aws/instance-memory"]), + ) + + +def handle_scheduled_pipeline_pod(wrapped_event: dict, start_time: datetime): + if wrapped_event["type"] != "ADDED": + return + + # Check that it's a current event + event: dict = wrapped_event["object"].to_dict() + created: datetime = event["metadata"]["creation_timestamp"] + if created < start_time: + logger.debug(f"Skipping event from {created.isoformat()}") + return + + # Retrieve pod. Pod retrieval could fail for various reasons, so handle this gracefully + pod_name = event["involved_object"]["name"] + try: + pod: V1Pod = client.read_namespaced_pod(namespace="pipeline", name=pod_name) # type: ignore + except kubernetes.client.ApiException: + logger.warning(f'Could not retrieve pod {pod_name} in namespace "pipeline"') + return + + # Ensure that this pod has a non-empty package name label before continuing + labels: dict = pod.to_dict()["metadata"]["labels"] + if labels.get("metrics/spack_job_spec_pkg_name", "") == "": + logger.debug(f"Skipping pod with missing package name: {pod_name}") + return + + # Retrieve node + node_name = pod.to_dict()["spec"]["node_name"] + node = client.read_node(name=node_name).to_dict() # type: ignore + item = JobMetadata( + node=get_node_metadata(node), + pod=get_pod_metadata(pod), + ) + + # Check to make sure job hasn't already been recorded + if Job.objects.filter( + project_id=item.pod.project_id, job_id=item.pod.job_id + ).exists(): + return + + # Tags, duration intentionally left blank, as they will be updated once the job finishes + try: + job = Job.objects.create( + # Core data + job_id=item.pod.job_id, + project_id=item.pod.project_id, + name=item.pod.job_name, + started_at=item.pod.job_started_at, + duration=None, + ref=item.pod.job_ref, + package_name=item.pod.package_name, + job_cpu_request=item.pod.cpu_request, + job_memory_request=item.pod.memory_request, + # Node data + node_name=item.node.name, + node_uid=item.node.uid, + node_instance_type=item.node.instance_type, + node_capacity_type=item.node.capacity_type, + node_cpu=item.node.cpu, + node_mem=item.node.mem, + # Extra data + package_version=item.pod.package_version, + compiler_name=item.pod.compiler_name, + compiler_version=item.pod.compiler_version, + arch=item.pod.arch, + package_variants=item.pod.package_variants, + build_jobs=item.pod.build_jobs, + job_size=item.pod.job_size, + stack=item.pod.stack, + # By defninition this is true, since this script runs in the cluster + aws=True, + ) + except IntegrityError as e: + logger.error( + f"Could not create Job with job_id {item.pod.job_id}. This may indicate a race " + "condition. The error is logged below." 
+ ) + logger.error(e) + return + + logger.info(f"Processed job {job.job_id}") + + +@click.command() +def main(): + start_time = datetime.now(timezone.utc) + + # Setup event stream + watcher = kubernetes.watch.Watch() + events = watcher.stream( + client.list_namespaced_event, + namespace="pipeline", + field_selector="reason=Scheduled,involvedObject.kind=Pod", + ) + + logger.info("Listening for scheduled pipeline pods...") + logger.info(f"Start time is {start_time.isoformat()}") + logger.info("----------------------------------------") + + # Get events yielded from generator + for event in events: + assert isinstance(event, dict) + handle_scheduled_pipeline_pod(event, start_time) + + +if __name__ == "__main__": + main() diff --git a/analytics/analytics/management/commands/update_spot_prices.py b/analytics/analytics/management/commands/update_spot_prices.py new file mode 100644 index 000000000..e69de29bb diff --git a/analytics/analytics/management/commands/upload_build_timings.py b/analytics/analytics/management/commands/upload_build_timings.py index 94d151202..19f2c977a 100644 --- a/analytics/analytics/management/commands/upload_build_timings.py +++ b/analytics/analytics/management/commands/upload_build_timings.py @@ -3,6 +3,7 @@ import tempfile import zipfile from contextlib import contextmanager +from dataclasses import dataclass import djclick as click import gitlab @@ -20,6 +21,25 @@ JOB_INPUT_DATA = os.environ["JOB_INPUT_DATA"] +@dataclass +class JobMetadata: + package_name: str + package_version: str + compiler_name: str + compiler_version: str + arch: str + package_variants: str + job_size: str + stack: str + build_jobs: str | None = None + + +class UnprocessedAwsJob(Exception): + def __init__(self, job: ProjectJob): + message = f"AWS Job {job.get_id()} was not previously processed" + super().__init__(message) + + class JobArtifactFileNotFound(Exception): def __init__(self, job: ProjectJob, filename: str): message = f"File {filename} not found in job artifacts of job {job.id}" @@ -42,7 +62,7 @@ def get_job_artifacts_file(job: ProjectJob, filename: str): raise JobArtifactFileNotFound(job, filename) -def get_job_metadata(job: ProjectJob) -> dict: +def get_job_metadata(job: ProjectJob) -> JobMetadata: # parse the yaml from artifacts/jobs_scratch_dir/reproduction/cloud-ci-pipeline.yml pipeline_yml_filename = "jobs_scratch_dir/reproduction/cloud-ci-pipeline.yml" with get_job_artifacts_file(job, pipeline_yml_filename) as pipeline_file: @@ -54,25 +74,28 @@ def get_job_metadata(job: ProjectJob) -> dict: if not job_vars: raise Exception(f"Empty job variables for job {job.id}") - return { - "package_name": job_vars["SPACK_JOB_SPEC_PKG_NAME"], - "package_version": job_vars["SPACK_JOB_SPEC_PKG_VERSION"], - "compiler_name": job_vars["SPACK_JOB_SPEC_COMPILER_NAME"], - "compiler_version": job_vars["SPACK_JOB_SPEC_COMPILER_VERSION"], - "arch": job_vars["SPACK_JOB_SPEC_ARCH"], - "package_variants": job_vars["SPACK_JOB_SPEC_VARIANTS"], - "job_size": job_vars["CI_JOB_SIZE"], - "stack": pipeline_vars["SPACK_CI_STACK_NAME"], + return JobMetadata( + package_name=job_vars["SPACK_JOB_SPEC_PKG_NAME"], + package_version=job_vars["SPACK_JOB_SPEC_PKG_VERSION"], + compiler_name=job_vars["SPACK_JOB_SPEC_COMPILER_NAME"], + compiler_version=job_vars["SPACK_JOB_SPEC_COMPILER_VERSION"], + arch=job_vars["SPACK_JOB_SPEC_ARCH"], + package_variants=job_vars["SPACK_JOB_SPEC_VARIANTS"], + job_size=job_vars["CI_JOB_SIZE"], + stack=pipeline_vars["SPACK_CI_STACK_NAME"], # This var isn't guaranteed to be present - "build_jobs": 
job_vars.get("SPACK_BUILD_JOBS"), - } + build_jobs=job_vars.get("SPACK_BUILD_JOBS"), + ) -def create_job(project: Project, job: ProjectJob) -> Job: - # grab runner tags +def create_non_aws_job(project: Project, job: ProjectJob) -> Job: + # Raise exception if this is an AWS job, as it should have been processed already runner_tags = gl.runners.get(job.runner["id"]).tag_list + if "aws" in runner_tags: + raise UnprocessedAwsJob(job) # Return created job + job_metadata = get_job_metadata(job) return Job.objects.create( job_id=job.get_id(), project_id=project.get_id(), @@ -81,8 +104,17 @@ def create_job(project: Project, job: ProjectJob) -> Job: duration=job.duration, ref=job.ref, tags=job.tag_list, - aws=("aws" in runner_tags), - **get_job_metadata(job), + package_name=job_metadata.package_name, + aws=True, + # Extra fields + package_version=job_metadata.package_version, + compiler_name=job_metadata.compiler_name, + compiler_version=job_metadata.compiler_version, + arch=job_metadata.arch, + package_variants=job_metadata.package_variants, + build_jobs=job_metadata.build_jobs, + job_size=job_metadata.job_size, + stack=job_metadata.stack, ) @@ -96,16 +128,22 @@ def get_timings_json(job: ProjectJob) -> list[dict]: def main(): # Read input data and extract params job_input_data = json.loads(JOB_INPUT_DATA) + project_id = job_input_data["project_id"] job_id = job_input_data["build_id"] # Retrieve project and job from gitlab API - gl_project = gl.projects.get(job_input_data["project_id"]) - gl_job = gl_project.jobs.get(job_input_data["build_id"]) - - # Get or create job record - job = Job.objects.filter(job_id=job_id).first() - if job is None: - job = create_job(gl_project, gl_job) + gl_project = gl.projects.get(project_id) + gl_job = gl_project.jobs.get(job_id) + + # Get and update existing job, or create new job + try: + job = Job.objects.get(project_id=project_id, job_id=job_id) + Job.objects.filter(project_id=job.project_id, job_id=job.job_id).update( + tags=gl_job.tag_list, + duration=gl_job.duration, + ) + except Job.DoesNotExist: + job = create_non_aws_job(gl_project, gl_job) # Get timings timings = get_timings_json(gl_job) diff --git a/analytics/analytics/migrations/0002_timerphase_rename_and_more.py b/analytics/analytics/migrations/0002_timerphase_rename_and_more.py index ceb7679fc..c1f6bbcd8 100644 --- a/analytics/analytics/migrations/0002_timerphase_rename_and_more.py +++ b/analytics/analytics/migrations/0002_timerphase_rename_and_more.py @@ -42,8 +42,6 @@ class Migration(migrations.Migration): ), migrations.AddConstraint( model_name="timerphase", - constraint=models.UniqueConstraint( - fields=("path", "timer"), name="unique-phase-path" - ), + constraint=models.UniqueConstraint(fields=("path", "timer"), name="unique-phase-path"), ), ] diff --git a/analytics/analytics/migrations/0003_job_arch_job_build_jobs_job_compiler_name_and_more.py b/analytics/analytics/migrations/0003_job_arch_job_build_jobs_job_compiler_name_and_more.py index e24cce6a8..e013f7514 100644 --- a/analytics/analytics/migrations/0003_job_arch_job_build_jobs_job_compiler_name_and_more.py +++ b/analytics/analytics/migrations/0003_job_arch_job_build_jobs_job_compiler_name_and_more.py @@ -4,7 +4,6 @@ class Migration(migrations.Migration): - dependencies = [ ("analytics", "0002_timerphase_rename_and_more"), ] diff --git a/analytics/analytics/migrations/0005_job_job_cpu_request_job_job_memory_request_and_more.py b/analytics/analytics/migrations/0005_job_job_cpu_request_job_job_memory_request_and_more.py new file mode 100644 
index 000000000..59cd3bb30 --- /dev/null +++ b/analytics/analytics/migrations/0005_job_job_cpu_request_job_job_memory_request_and_more.py @@ -0,0 +1,108 @@ +# Generated by Django 4.2.4 on 2023-12-08 19:28 + +import django.contrib.postgres.fields +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("analytics", "0004_errortaxonomy"), + ] + + operations = [ + migrations.AddField( + model_name="job", + name="job_cpu_request", + field=models.FloatField(default=None, null=True), + ), + migrations.AddField( + model_name="job", + name="job_memory_request", + field=models.PositiveBigIntegerField(default=None, null=True), + ), + migrations.AddField( + model_name="job", + name="node_capacity_type", + field=models.CharField( + choices=[("spot", "Spot"), ("on-demand", "On Demand")], + default=None, + max_length=12, + null=True, + ), + ), + migrations.AddField( + model_name="job", + name="node_cpu", + field=models.PositiveIntegerField(default=None, null=True), + ), + migrations.AddField( + model_name="job", + name="node_instance_type", + field=models.CharField(default=None, max_length=32, null=True), + ), + migrations.AddField( + model_name="job", + name="node_instance_type_spot_price", + field=models.FloatField( + default=None, + help_text="The price per hour (in USD) of the spot instance this job ran on," + " at the time of running. If ever the job runs on an on-demand node," + " this field will be null.", + null=True, + ), + ), + migrations.AddField( + model_name="job", + name="node_mem", + field=models.PositiveIntegerField(default=None, null=True), + ), + migrations.AddField( + model_name="job", + name="node_name", + field=models.CharField(default=None, max_length=64, null=True), + ), + migrations.AddField( + model_name="job", + name="node_uid", + field=models.UUIDField(default=None, null=True), + ), + migrations.AlterField( + model_name="job", + name="duration", + field=models.FloatField(null=True), + ), + migrations.AlterField( + model_name="job", + name="tags", + field=django.contrib.postgres.fields.ArrayField( + base_field=models.CharField(max_length=32), + default=None, + null=True, + size=None, + ), + ), + migrations.AddConstraint( + model_name="job", + constraint=models.UniqueConstraint( + fields=("project_id", "job_id"), name="unique-project-job-id" + ), + ), + migrations.AddConstraint( + model_name="job", + constraint=models.CheckConstraint( + check=models.Q(("package_name", ""), _negated=True), + name="non-empty-package-name", + ), + ), + migrations.AddConstraint( + model_name="job", + constraint=models.CheckConstraint( + check=models.Q( + models.Q(("duration__isnull", True), ("tags__isnull", True)), + models.Q(("duration__isnull", False), ("tags__isnull", False)), + _connector="OR", + ), + name="consistent-temporary-null-values", + ), + ), + ] diff --git a/analytics/analytics/models.py b/analytics/analytics/models.py index b46f6dff2..c67fbe54b 100644 --- a/analytics/analytics/models.py +++ b/analytics/analytics/models.py @@ -2,18 +2,48 @@ from django.db import models +class NodeCapacityType(models.TextChoices): + SPOT = "spot" + ON_DEMAND = "on-demand" + + class Job(models.Model): # Core job fields job_id = models.PositiveBigIntegerField(primary_key=True) project_id = models.PositiveBigIntegerField() name = models.CharField(max_length=128) started_at = models.DateTimeField() - duration = models.FloatField() + duration = models.FloatField(null=True) ref = models.CharField(max_length=256) - tags =
ArrayField(base_field=models.CharField(max_length=32), default=list) + tags = ArrayField( + base_field=models.CharField(max_length=32), null=True, default=None + ) package_name = models.CharField(max_length=128) - # Fields allow null to accomodate historical data + # Whether this job ran in the cluster or not + aws = models.BooleanField(default=True) + + # Kubernetes-specific data (will be null for non-aws jobs) + job_cpu_request = models.FloatField(null=True, default=None) + job_memory_request = models.PositiveBigIntegerField(null=True, default=None) + node_name = models.CharField(max_length=64, null=True, default=None) + node_uid = models.UUIDField(null=True, default=None) + node_cpu = models.PositiveIntegerField(null=True, default=None) + node_mem = models.PositiveIntegerField(null=True, default=None) + node_capacity_type = models.CharField( + max_length=12, choices=NodeCapacityType.choices, null=True, default=None + ) + node_instance_type = models.CharField(max_length=32, null=True, default=None) + node_instance_type_spot_price = models.FloatField( + null=True, + default=None, + help_text=( + "The price per hour (in USD) of the spot instance this job ran on, at the time of" + " running. If ever the job runs on an on-demand node, this field will be null." + ), + ) + + # Extra data fields (null allowed to accommodate historical data) package_version = models.CharField(max_length=128, null=True) compiler_name = models.CharField(max_length=128, null=True) compiler_version = models.CharField(max_length=128, null=True) @@ -22,7 +52,23 @@ class Job(models.Model): build_jobs = models.CharField(max_length=128, null=True) job_size = models.CharField(max_length=128, null=True) stack = models.CharField(max_length=128, null=True) - aws = models.BooleanField(default=True) + + class Meta: + constraints = [ + models.UniqueConstraint( + name="unique-project-job-id", fields=["project_id", "job_id"] + ), + models.CheckConstraint( + name="non-empty-package-name", check=~models.Q(package_name="") + ), + models.CheckConstraint( + name="consistent-temporary-null-values", + check=( + models.Q(tags__isnull=True, duration__isnull=True) + | models.Q(tags__isnull=False, duration__isnull=False) + ), + ), + ] class Timer(models.Model): diff --git a/analytics/analytics/settings.py b/analytics/analytics/settings.py index 2266770c3..77da81a33 100644 --- a/analytics/analytics/settings.py +++ b/analytics/analytics/settings.py @@ -22,6 +22,36 @@ "django_extensions", ] +LOGGING = { + "version": 1, + # Replace existing logging configuration + "incremental": False, + "formatters": {"rich": {"datefmt": "[%X]"}}, + "handlers": { + "console": { + "class": "rich.logging.RichHandler", + "formatter": "rich", + }, + }, + "loggers": { + # Configure the root logger to output to the console + "": {"level": "INFO", "handlers": ["console"], "propagate": False}, + # Django defines special configurations for the "django" and "django.server" loggers, + # but we will manage all content at the root logger instead, so reset those + # configurations.
+ "django": { + "handlers": [], + "level": "NOTSET", + "propagate": True, + }, + "django.server": { + "handlers": [], + "level": "NOTSET", + "propagate": True, + }, + }, +} + # Database # https://docs.djangoproject.com/en/4.2/ref/settings/#databases DATABASES = { diff --git a/analytics/requirements.txt b/analytics/requirements.txt index 96c0d4eaa..ddf8dde8d 100644 --- a/analytics/requirements.txt +++ b/analytics/requirements.txt @@ -8,4 +8,4 @@ opensearch-dsl==2.0.1 sentry-sdk[django] gunicorn celery[redis] - +rich