Use prometheus to annotate pod/node data onto Job model #723

Merged
merged 19 commits into from
Jan 19, 2024
2 changes: 1 addition & 1 deletion .github/workflows/custom_docker_builds.yml
@@ -46,7 +46,7 @@ jobs:
         - docker-image: ./images/cache-indexer
           image-tags: ghcr.io/spack/cache-indexer:0.0.3
         - docker-image: ./analytics
-          image-tags: ghcr.io/spack/django:0.0.6
+          image-tags: ghcr.io/spack/django:0.0.7
     steps:
       - name: Checkout
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3
155 changes: 0 additions & 155 deletions analytics/analytics/build_timing_processor.py

This file was deleted.

67 changes: 67 additions & 0 deletions analytics/analytics/job_processor/__init__.py
@@ -0,0 +1,67 @@
import json
from datetime import timedelta

import gitlab
import sentry_sdk
from celery import shared_task
from dateutil.parser import isoparse
from django.conf import settings
from django.db import transaction
from gitlab.v4.objects import Project, ProjectJob

from analytics import setup_gitlab_job_sentry_tags
from analytics.job_processor.artifacts import annotate_job_with_artifacts_data
from analytics.job_processor.build_timings import create_build_timings
from analytics.job_processor.prometheus import (
JobPrometheusDataNotFound,
PrometheusClient,
)
from analytics.models import Job


def create_job(gl: gitlab.Gitlab, project: Project, gljob: ProjectJob) -> Job:
    # Create base fields on job that are independent of where it ran
    job = Job(
        job_id=gljob.get_id(),
        project_id=project.get_id(),
        name=gljob.name,
        started_at=isoparse(gljob.started_at),
        duration=timedelta(seconds=gljob.duration),
        ref=gljob.ref,
        tags=gljob.tag_list,
        aws=True,  # Default until proven otherwise
    )

    # Either Prometheus data is found and the job is annotated with it,
    # or it isn't and aws is set to False
    try:
        PrometheusClient(settings.PROMETHEUS_URL).annotate_job(job=job)

        # Ensure node creation isn't caught in a race condition
        job.save_or_set_node()
        job.pod.save()
    except JobPrometheusDataNotFound:
        job.aws = False
        annotate_job_with_artifacts_data(gljob=gljob, job=job)

    # Save and return new job
    job.save()
    return job


@shared_task(name="process_job")
def process_job(job_input_data_json: str):
# Read input data and extract params
job_input_data = json.loads(job_input_data_json)
setup_gitlab_job_sentry_tags(job_input_data)

    # Retrieve project and job from gitlab API
    gl = gitlab.Gitlab(
        settings.GITLAB_ENDPOINT, settings.GITLAB_TOKEN, retry_transient_errors=True
    )
    gl_project = gl.projects.get(job_input_data["project_id"])
    gl_job = gl_project.jobs.get(job_input_data["build_id"])

    # Use a transaction, to account for transient failures
    with transaction.atomic():
        job = create_job(gl, gl_project, gl_job)
        create_build_timings(job, gl_job)
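For reviewers tracing how this task gets invoked (the webhook handler that enqueues it is outside this diff), a minimal sketch, assuming the payload is the GitLab job-event JSON carrying at least project_id and build_id; the concrete values and the build_status field are made up for illustration:

import json

from analytics.job_processor import process_job

# Hypothetical payload; the real one comes from the GitLab job webhook and
# also carries whatever fields setup_gitlab_job_sentry_tags reads.
payload = {"project_id": 2, "build_id": 123456, "build_status": "success"}

# Enqueue asynchronously via Celery; calling process_job(json.dumps(payload))
# directly would run the same logic synchronously.
process_job.delay(json.dumps(payload))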
54 changes: 54 additions & 0 deletions analytics/analytics/job_processor/artifacts.py
@@ -0,0 +1,54 @@
import tempfile
import zipfile
from contextlib import contextmanager

import yaml
from gitlab.v4.objects import ProjectJob

from analytics.models import Job


class JobArtifactFileNotFound(Exception):
    def __init__(self, job: ProjectJob, filename: str):
        message = f"File {filename} not found in job artifacts of job {job.id}"
        super().__init__(message)


@contextmanager
def get_job_artifacts_file(job: ProjectJob, filename: str):
    """Yield the artifact file, raising JobArtifactFileNotFound if the filename is not present."""
    with tempfile.NamedTemporaryFile(suffix=".zip") as temp:
        artifacts_file = temp.name
        with open(artifacts_file, "wb") as f:
            job.artifacts(streamed=True, action=f.write)

        with zipfile.ZipFile(artifacts_file) as zfile:
            try:
                with zfile.open(filename) as timing_file:
                    yield timing_file
            except KeyError:
                raise JobArtifactFileNotFound(job, filename)


def annotate_job_with_artifacts_data(gljob: ProjectJob, job: Job):
    """Fetch the artifacts of a job to retrieve info about it."""
    pipeline_yml_filename = "jobs_scratch_dir/reproduction/cloud-ci-pipeline.yml"
    with get_job_artifacts_file(gljob, pipeline_yml_filename) as pipeline_file:
        raw_pipeline = yaml.safe_load(pipeline_file)

    pipeline_vars = raw_pipeline.get("variables", {})
    job_vars = raw_pipeline.get(gljob.name, {}).get("variables", {})
    if not job_vars:
        raise Exception(f"Empty job variables for job {gljob.id}")

    job.package_name = job_vars["SPACK_JOB_SPEC_PKG_NAME"]
    job.package_version = job_vars["SPACK_JOB_SPEC_PKG_VERSION"]
    job.compiler_name = job_vars["SPACK_JOB_SPEC_COMPILER_NAME"]
    job.compiler_version = job_vars["SPACK_JOB_SPEC_COMPILER_VERSION"]
    job.arch = job_vars["SPACK_JOB_SPEC_ARCH"]
    job.package_variants = job_vars["SPACK_JOB_SPEC_VARIANTS"]
    job.job_size = job_vars["CI_JOB_SIZE"]
    job.stack = pipeline_vars["SPACK_CI_STACK_NAME"]

    # This var isn't guaranteed to be present
    job.build_jobs = job_vars.get("SPACK_BUILD_JOBS")
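To make the lookups above concrete, a sketch of the structure this code assumes yaml.safe_load returns for cloud-ci-pipeline.yml; the job name and all values here are illustrative, not taken from a real pipeline:

# Illustrative shape only; real pipeline files carry many more keys.
raw_pipeline = {
    "variables": {"SPACK_CI_STACK_NAME": "e4s"},
    # Keyed by the GitLab job name (gljob.name)
    "example-package/abc1234": {
        "variables": {
            "SPACK_JOB_SPEC_PKG_NAME": "example-package",
            "SPACK_JOB_SPEC_PKG_VERSION": "1.2.3",
            "SPACK_JOB_SPEC_COMPILER_NAME": "gcc",
            "SPACK_JOB_SPEC_COMPILER_VERSION": "11.4.0",
            "SPACK_JOB_SPEC_ARCH": "linux-ubuntu22.04-x86_64_v3",
            "SPACK_JOB_SPEC_VARIANTS": "+shared",
            "CI_JOB_SIZE": "small",
            "SPACK_BUILD_JOBS": "8",  # optional, hence the .get() above
        },
    },
}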
56 changes: 56 additions & 0 deletions analytics/analytics/job_processor/build_timings.py
@@ -0,0 +1,56 @@
import json

from gitlab.v4.objects import ProjectJob

from analytics.job_processor.artifacts import get_job_artifacts_file
from analytics.models import Job, Timer, TimerPhase


def get_timings_json(job: ProjectJob) -> list[dict]:
    timing_filename = "jobs_scratch_dir/user_data/install_times.json"
    with get_job_artifacts_file(job, timing_filename) as file:
        return json.load(file)


def create_build_timings(job: Job, gl_job: ProjectJob):
    timings = get_timings_json(gl_job)

    # Iterate through each timer and create timers and phase results
    phases = []
    for entry in timings:
        # Sometimes name can be missing, skip if so
        name = entry.get("name")
        if name is None:
            continue

        # Check for timer and skip if already exists
        pkghash = entry.get("hash")
        if Timer.objects.filter(job=job, name=name, hash=pkghash).exists():
            continue

        # Create timer
        timer = Timer.objects.create(
            job=job,
            name=name,
            hash=pkghash,
            cache=entry["cache"],
            time_total=entry["total"],
        )

        # Add all phases to bulk phase list
        phases.extend(
            [
                TimerPhase(
                    timer=timer,
                    name=phase["name"],
                    path=phase["path"],
                    seconds=phase["seconds"],
                    count=phase["count"],
                    is_subphase=("/" in phase["path"]),
                )
                for phase in entry["phases"]
            ]
        )

    # Bulk create phases
    TimerPhase.objects.bulk_create(phases)
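For reference, a sketch of the install_times.json entries this function iterates over; the field names mirror the lookups above, while the values are invented:

# One dict per package install in the real artifact; values are illustrative.
timings = [
    {
        "name": "example-package",
        "hash": "abcdef1234567890",  # looked up with .get(), so it may be absent
        "cache": False,
        "total": 12.3,
        "phases": [
            {"name": "configure", "path": "configure", "seconds": 4.5, "count": 1},
            # A "/" in path marks a subphase (is_subphase above)
            {"name": "generate", "path": "configure/generate", "seconds": 1.2, "count": 1},
        ],
    },
]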