Merge pull request #942 from spack/store-failed-jobs
Store failed and non-build jobs in the analytics database
jjnesbitt authored Aug 21, 2024
2 parents e721012 + 3c243e3 commit 22c7c9a
Showing 15 changed files with 512 additions and 214 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/custom_docker_builds.yml
@@ -42,7 +42,7 @@ jobs:
- docker-image: ./images/cache-indexer
image-tags: ghcr.io/spack/cache-indexer:0.0.3
- docker-image: ./analytics
image-tags: ghcr.io/spack/django:0.3.14
image-tags: ghcr.io/spack/django:0.3.15
- docker-image: ./images/ci-prune-buildcache
image-tags: ghcr.io/spack/ci-prune-buildcache:0.0.4
- docker-image: ./images/protected-publish
60 changes: 1 addition & 59 deletions analytics/analytics/core/job_log_uploader/__init__.py
@@ -6,20 +6,14 @@

import gitlab
from celery import shared_task
from dateutil.parser import isoparse
from django.conf import settings
from gitlab.v4.objects import Project, ProjectJob
from opensearch_dsl import Date, Document, connections
from opensearchpy import ConnectionTimeout
from requests.exceptions import ReadTimeout
from urllib3.exceptions import ReadTimeoutError

from analytics import setup_gitlab_job_sentry_tags
from analytics.core.job_failure_classifier import (
_assign_error_taxonomy,
_job_retry_data,
)
from analytics.core.models.legacy import LegacyJobAttempt
from analytics.core.job_failure_classifier import _job_retry_data


class JobLog(Document):
@@ -38,56 +38,6 @@ def save(self, **kwargs):
return super().save(**kwargs)


def _get_section_timers(job_trace: str) -> dict[str, int]:
timers: dict[str, int] = {}

# See https://docs.gitlab.com/ee/ci/jobs/index.html#custom-collapsible-sections for the format
# of section names.
r = re.findall(r"section_(start|end):(\d+):([A-Za-z0-9_\-\.]+)", job_trace)
for start, end in zip(r[::2], r[1::2]):
timers[start[2]] = int(end[1]) - int(start[1])

return timers


def _create_job_attempt(
project: Project,
gl_job: ProjectJob,
webhook_payload: dict[str, Any],
job_trace: str,
) -> LegacyJobAttempt:
retry_info = _job_retry_data(
job_id=gl_job.get_id(),
job_name=gl_job.name,
job_commit_id=webhook_payload["commit"]["id"],
job_failure_reason=webhook_payload["build_failure_reason"],
)

section_timers = _get_section_timers(job_trace)
error_taxonomy = (
_assign_error_taxonomy(webhook_payload, job_trace)[0]
if webhook_payload["build_status"] == "failed"
else None
)

return LegacyJobAttempt.objects.create(
job_id=gl_job.get_id(),
project_id=project.get_id(),
commit_id=webhook_payload["commit"]["id"],
name=gl_job.name,
started_at=isoparse(gl_job.started_at),
finished_at=isoparse(gl_job.finished_at),
ref=gl_job.ref,
is_retry=retry_info.is_retry,
is_manual_retry=retry_info.is_manual_retry,
attempt_number=retry_info.attempt_number,
final_attempt=retry_info.final_attempt,
status=webhook_payload["build_status"],
error_taxonomy=error_taxonomy,
section_timers=section_timers,
)


@shared_task(
name="store_job_data",
soft_time_limit=60,
@@ -141,5 +85,3 @@ def store_job_data(job_input_data_json: str) -> None:
job_trace=job_trace,
)
doc.save()

_create_job_attempt(project, job, job_input_data, job_trace)
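
For context on the deleted helper: _get_section_timers parsed GitLab's collapsible-section markers, which bracket each trace section with Unix timestamps (see the GitLab docs link in the removed comment). Below is a minimal sketch of the same parsing against a made-up trace, with the regex copied from the removed code:

import re

# Made-up trace excerpt; real job traces interleave these markers with log output.
trace = (
    "section_start:1700000000:install_deps\r"
    "...log output...\r"
    "section_end:1700000042:install_deps\r"
)

matches = re.findall(r"section_(start|end):(\d+):([A-Za-z0-9_\-\.]+)", trace)
timers = {
    start[2]: int(end[1]) - int(start[1])  # elapsed seconds per section
    for start, end in zip(matches[::2], matches[1::2])
}
print(timers)  # {'install_deps': 42}

The same data now lands in the new gitlab_section_timers JSON field on JobDataDimension (see the migration and model changes below).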
139 changes: 139 additions & 0 deletions analytics/analytics/core/management/commands/migrate_job_attempt.py
@@ -0,0 +1,139 @@
import json
import re
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta

import djclick as click
from django.db import connection
from gitlab.v4.objects import ProjectJob
from tqdm import tqdm

from analytics.core.models.dimensions import (
JobDataDimension,
NodeDimension,
PackageDimension,
PackageSpecDimension,
)
from analytics.core.models.facts import JobFact
from analytics.core.models.legacy import LegacyJobAttempt
from analytics.job_processor import process_job
from analytics.job_processor.dimensions import (
create_date_time_dimensions,
create_job_data_dimension,
create_runner_dimension,
)
from analytics.job_processor.utils import (
get_gitlab_handle,
get_gitlab_job,
get_gitlab_project,
)

# Setup gitlab connection
gl = get_gitlab_handle()
gl_project = get_gitlab_project(2)

BUILD_NAME_REGEX = re.compile(r"^[^@]+@\S+ \/[a-z0-9]{7} %")
ALT_BUILD_NAME_REGEX = re.compile(r"^\(specs\) [^/]+\/[a-z0-9]{7}")


def augment_job_data(job_attempt: LegacyJobAttempt):
job_id = job_attempt.job_id
job_data = JobDataDimension.objects.get(job_id=job_id)

job_data.error_taxonomy = job_attempt.error_taxonomy
job_data.gitlab_section_timers = job_attempt.section_timers

# Copy retry data
job_data.is_retry = job_attempt.is_retry
job_data.is_manual_retry = job_attempt.is_manual_retry
job_data.attempt_number = job_attempt.attempt_number
job_data.final_attempt = job_attempt.final_attempt

job_data.save()


def create_basic_job_fact(gljob: ProjectJob, job_input_data: dict):
date_dim, time_dim = create_date_time_dimensions(gljob=gljob)
runner_dim = create_runner_dimension(gl=gl, gljob=gljob)
job_dim = create_job_data_dimension(
job_input_data=job_input_data,
pod_info=None,
misc_info=None,
gljob=gljob,
job_trace="",
)

JobFact.objects.create(
# Foreign Keys
start_date=date_dim,
start_time=time_dim,
node=NodeDimension.get_empty_row(),
runner=runner_dim,
package=PackageDimension.get_empty_row(),
spec=PackageSpecDimension.get_empty_row(),
job=job_dim,
# Numeric
duration=timedelta(seconds=gljob.duration),
duration_seconds=gljob.duration,
)


def migrate_job_attempt(job_id: int):
job_attempt = LegacyJobAttempt.objects.get(job_id=job_id)
gl_job = get_gitlab_job(gl_project, job_attempt.job_id)

# It seems that even if a job has a status of "success", it always at
# least has a "build_failure_reason" of "unknown_failure"
failure_reason = getattr(gl_job, "job_failure_reason", "unknown_failure")

# Determine whether this is a build job or not from the name, and mock the stage field to match that
is_build = (
BUILD_NAME_REGEX.match(job_attempt.name) is not None
or ALT_BUILD_NAME_REGEX.match(job_attempt.name) is not None
)
build_stage = "stage-1" if is_build else ""

# Reconstruct the job_input_data dict, to pass to create_job_fact
job_input_data = {
"project_id": 2,
"build_id": job_attempt.job_id,
"build_name": job_attempt.name,
"commit": {"id": job_attempt.commit_id},
"build_failure_reason": failure_reason,
"build_status": job_attempt.status,
"build_stage": build_stage,
"ref": job_attempt.ref,
}

try:
process_job(json.dumps(job_input_data))
except Exception:
# Default to this if errored
create_basic_job_fact(gljob=gl_job, job_input_data=job_input_data)

# Augment remaining data from existing job_attempt record
augment_job_data(job_attempt=job_attempt)


@click.command()
def migrate_all_job_attempts():
# Get all job attempts that don't already have a record in the fact table
# Use raw SQL query as otherwise an inefficient subquery is required
cursor = connection.cursor()
cursor.execute("""
SELECT lja.job_id
FROM core_legacyjobattempt lja
LEFT JOIN core_jobfact on core_jobfact.job_id = lja.job_id
WHERE core_jobfact.id IS NULL
""")
job_ids = cursor.fetchall()

with tqdm(total=len(job_ids)) as pbar:
with ThreadPoolExecutor(max_workers=10) as e:
futures = [e.submit(migrate_job_attempt, job_id) for (job_id,) in job_ids]
for future in as_completed(futures):
pbar.update(1)
pbar.set_description()
if future.exception() is not None:
traceback.print_exception(future.exception())
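
A backfill run of this command might look like the sketch below; the command name is an assumption derived from the module file name (migrate_job_attempt.py), which is how djclick-based management commands are named:

# Hypothetical one-off invocation from a Django shell or deployment script.
from django.core.management import call_command

call_command("migrate_job_attempt")

Equivalently, python manage.py migrate_job_attempt from the analytics project, assuming the same name mapping.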
42 changes: 42 additions & 0 deletions analytics/analytics/core/migrations/0020_…
@@ -0,0 +1,42 @@
# Generated by Django 4.2.13 on 2024-08-08 16:23

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("core", "0019_alter_runnerdimension_name"),
]

operations = [
migrations.RemoveField(
model_name="jobdatadimension",
name="is_build",
),
migrations.AddField(
model_name="jobdatadimension",
name="gitlab_section_timers",
field=models.JSONField(
db_comment="The GitLab CI section timers for this job.", default=dict
),
),
migrations.AddField(
model_name="jobdatadimension",
name="job_type",
field=models.CharField(
choices=[
("build", "Build"),
("generate", "Generate"),
("no-specs-to-rebuild", "No Specs to Rebuild"),
("rebuild-index", "Rebuild Index"),
("copy", "Copy"),
("unsupported-copy", "Unsupported Copy"),
("sign-pkgs", "Sign Packages"),
("protected-publish", "Protected Publish"),
],
default="build",
max_length=19,
),
preserve_default=False,
),
]
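
Because preserve_default=False is paired with default="build", Django backfills every existing JobDataDimension row with job_type="build" during this migration and then drops the default from the column. The real job_type classification for new jobs happens in job_processor, which this diff does not show; a purely illustrative name-based check, reusing the regexes from the migrate_job_attempt command above, might look like:

import re

# Regexes copied from migrate_job_attempt.py; treating everything else as
# non-build is a placeholder assumption, not the actual classifier.
BUILD_NAME_REGEX = re.compile(r"^[^@]+@\S+ \/[a-z0-9]{7} %")
ALT_BUILD_NAME_REGEX = re.compile(r"^\(specs\) [^/]+\/[a-z0-9]{7}")

def looks_like_build_job(job_name: str) -> bool:
    # Spack build jobs are named like "pkg@version /abcdefg %..." or
    # "(specs) pkg/abcdefg".
    return bool(
        BUILD_NAME_REGEX.match(job_name) or ALT_BUILD_NAME_REGEX.match(job_name)
    )

print(looks_like_build_job("gcc@13.2.0 /abcdefg %"))  # True
print(looks_like_build_job("rebuild-index"))          # False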
33 changes: 32 additions & 1 deletion analytics/analytics/core/models/dimensions.py
@@ -134,6 +134,16 @@ def ensure_exists(cls, d: datetime.datetime | datetime.time | str):


class JobDataDimension(models.Model):
class JobType(models.TextChoices):
BUILD = "build", "Build"
GENERATE = "generate", "Generate"
NO_SPECS = "no-specs-to-rebuild", "No Specs to Rebuild"
REBUILD_INDEX = "rebuild-index", "Rebuild Index"
COPY = "copy", "Copy"
UNSUPPORTED_COPY = "unsupported-copy", "Unsupported Copy"
SIGN_PKGS = "sign-pkgs", "Sign Packages"
PROTECTED_PUBLISH = "protected-publish", "Protected Publish"

class Meta:
constraints = [
models.CheckConstraint(
@@ -165,7 +175,12 @@ class Meta:

pod_name = models.CharField(max_length=128, null=True, blank=True)
gitlab_runner_version = models.CharField(max_length=16)
is_build = models.BooleanField()
job_type = models.CharField(
max_length=max(len(c) for c, _ in JobType.choices), choices=JobType.choices
)
gitlab_section_timers = models.JSONField(
default=dict, db_comment="The GitLab CI section timers for this job."
) # type: ignore


class NodeCapacityType(models.TextChoices):
@@ -188,6 +203,10 @@ class Meta:
),
]

@classmethod
def get_empty_row(cls):
return cls.objects.get(name="")


class RunnerDimension(models.Model):
runner_id = models.PositiveIntegerField(primary_key=True)
@@ -198,6 +217,10 @@ class RunnerDimension(models.Model):
tags = ArrayField(base_field=models.CharField(max_length=32), default=list)
in_cluster = models.BooleanField()

@classmethod
def get_empty_row(cls):
return cls.objects.get(name="")


# TODO: Split up variants into its own dimension
# Query to get variants (without patches) from packages
@@ -224,6 +247,10 @@ class PackageSpecDimension(models.Model):
arch = models.CharField(max_length=64)
variants = models.TextField(default="", blank=True)

@classmethod
def get_empty_row(cls):
return cls.objects.get(hash="")

class Meta:
constraints = [
models.CheckConstraint(
@@ -258,6 +285,10 @@ class PackageDimension(models.Model):

name = models.CharField(max_length=128, primary_key=True)

@classmethod
def get_empty_row(cls):
return cls.objects.get(name="")


class TimerPhaseDimension(models.Model):
path = models.CharField(max_length=64, unique=True)
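
The new get_empty_row() classmethods follow the star-schema convention of pointing facts at a sentinel "empty" dimension row instead of a NULL foreign key, so that non-build jobs (which have no node, package, or spec) still join cleanly in fact-table queries. They assume the sentinel rows already exist; a hypothetical seeding step (the diff does not show where these rows are actually created) could be as simple as:

# Hypothetical seeding, e.g. in a data migration; get_empty_row() would raise
# DoesNotExist if the sentinel row were missing.
from analytics.core.models.dimensions import PackageDimension

PackageDimension.objects.get_or_create(name="")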
18 changes: 7 additions & 11 deletions analytics/analytics/core/views.py
@@ -1,18 +1,15 @@
import json
import re
from typing import Any

import sentry_sdk
from django.http import HttpRequest, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
import sentry_sdk

from analytics.core.job_failure_classifier import upload_job_failure_classification
from analytics.core.job_log_uploader import store_job_data
from analytics.job_processor import process_job

BUILD_STAGE_REGEX = r"^stage-\d+$"


@require_http_methods(["POST"])
@csrf_exempt
@@ -23,16 +20,15 @@ def webhook_handler(request: HttpRequest) -> HttpResponse:
sentry_sdk.capture_message("Not a build event")
return HttpResponse("Not a build event", status=400)

if job_input_data["build_status"] in ["success", "failed"]:
store_job_data.delay(request.body)
if job_input_data["build_status"] not in ["success", "failed"]:
return HttpResponse("Build job not finished. Skipping.", status=200)

# Store gitlab job log and failure data in opensearch
store_job_data.delay(request.body)
if job_input_data["build_status"] == "failed":
upload_job_failure_classification.delay(request.body)

if (
re.match(BUILD_STAGE_REGEX, job_input_data["build_stage"])
and job_input_data["build_status"] == "success"
):
process_job.delay(request.body)
# Store job data in postgres DB
process_job.delay(request.body)

return HttpResponse("OK", status=200)
