This repository has been archived by the owner on May 15, 2020. It is now read-only.

Commit
Merge pull request #52 from kpn/feature/commit-order
Feature/commit order
mjholtkamp authored Nov 13, 2019
2 parents d8d26b6 + f3b79e0 commit 880dbee
Showing 7 changed files with 239 additions and 8 deletions.
4 changes: 4 additions & 0 deletions katka/constants.py
@@ -1,14 +1,18 @@
# Status is not in the model to allow usage in different models
PIPELINE_STATUS_INITIALIZING = 'initializing'
PIPELINE_STATUS_QUEUED = 'queued'
PIPELINE_STATUS_IN_PROGRESS = 'in progress'
PIPELINE_STATUS_FAILED = 'failed'
PIPELINE_STATUS_SUCCESS = 'success'
PIPELINE_STATUS_SKIPPED = 'skipped'

PIPELINE_STATUS_CHOICES = (
(PIPELINE_STATUS_INITIALIZING, PIPELINE_STATUS_INITIALIZING),
(PIPELINE_STATUS_QUEUED, PIPELINE_STATUS_QUEUED),
(PIPELINE_STATUS_IN_PROGRESS, PIPELINE_STATUS_IN_PROGRESS),
(PIPELINE_STATUS_FAILED, PIPELINE_STATUS_FAILED),
(PIPELINE_STATUS_SUCCESS, PIPELINE_STATUS_SUCCESS),
(PIPELINE_STATUS_SKIPPED, PIPELINE_STATUS_SKIPPED),
)

PIPELINE_FINAL_STATUSES = (
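Together with the views change below, the new 'queued' status slots into the run lifecycle between 'initializing' and 'in progress'. A minimal sketch of the transitions this PR implies (the transition table is an assumption for illustration, not code from this repository):

# Assumed status lifecycle (illustration only): a run asked to go
# 'in progress' while its parent run is unfinished is parked as 'queued'
# and promoted later by the view logic.
ALLOWED_TRANSITIONS = {
    'initializing': {'in progress', 'queued'},
    'queued': {'in progress'},
    'in progress': {'failed', 'success', 'skipped'},
}

def can_transition(current: str, new: str) -> bool:
    return new in ALLOWED_TRANSITIONS.get(current, set())

assert can_transition('queued', 'in progress')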
23 changes: 23 additions & 0 deletions katka/migrations/0027_commit_ordering.py
@@ -0,0 +1,23 @@
# Generated by Django 2.2.6 on 2019-11-13 13:38

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('katka', '0026_scmsteprun_step_type'),
]

operations = [
migrations.AddField(
model_name='scmpipelinerun',
name='first_parent_hash',
field=models.CharField(blank=True, help_text='Commit hash of first parent commit, to determine order of commits. First commit has none.', max_length=64, null=True),
),
migrations.AlterField(
model_name='scmpipelinerun',
name='status',
field=models.CharField(choices=[('initializing', 'initializing'), ('queued', 'queued'), ('in progress', 'in progress'), ('failed', 'failed'), ('success', 'success'), ('skipped', 'skipped')], default='initializing', max_length=30),
),
]
6 changes: 6 additions & 0 deletions katka/models.py
@@ -107,6 +107,12 @@ class Meta:

public_identifier = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
commit_hash = models.CharField(max_length=64) # A SHA-1 hash is 40 characters, SHA-256 is 64 characters
first_parent_hash = models.CharField(
max_length=64,
help_text='Commit hash of first parent commit, to determine order of commits. First commit has none.',
null=True,
blank=True,
)
status = models.CharField(max_length=30, choices=PIPELINE_STATUS_CHOICES, default=PIPELINE_STATUS_INITIALIZING)
steps_total = models.PositiveSmallIntegerField(default=0)
steps_completed = models.PositiveSmallIntegerField(default=0)
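With commit_hash and the new first_parent_hash, the pipeline runs of an application form a singly linked chain: each run points at the run of its first parent commit. A minimal sketch of reconstructing the ordered history from these two fields (plain Python, illustrative only and not repository code; it assumes a single unbroken first-parent chain, per the field's help text):

def order_commits(runs):
    """Return runs oldest-first by following first_parent_hash links.

    runs: objects with .commit_hash and .first_parent_hash attributes.
    Assumes one unbroken first-parent chain per application.
    """
    by_parent = {run.first_parent_hash: run for run in runs}
    ordered = []
    current = by_parent.get(None)  # the first commit has no parent hash
    while current is not None:
        ordered.append(current)
        current = by_parent.get(current.commit_hash)
    return ordered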
21 changes: 15 additions & 6 deletions katka/signals.py
@@ -4,7 +4,7 @@
from django.db.models.signals import post_save
from django.dispatch import receiver

-from katka.constants import PIPELINE_STATUS_INITIALIZING, STEP_FINAL_STATUSES
+from katka.constants import PIPELINE_STATUS_INITIALIZING, PIPELINE_STATUS_QUEUED, STEP_FINAL_STATUSES
from katka.fields import username_on_model
from katka.models import SCMPipelineRun, SCMStepRun
from katka.releases import close_release_if_pipeline_finished, create_release_if_necessary
@@ -35,11 +35,6 @@ def update_pipeline_from_steps(sender, **kwargs):
@receiver(post_save, sender=SCMPipelineRun)
def send_pipeline_change_notification(sender, **kwargs):
pipeline = kwargs['instance']
-    if kwargs['created'] is True:
-        create_release_if_necessary(pipeline)
-    else:
-        close_release_if_pipeline_finished(pipeline)

if pipeline.status == PIPELINE_STATUS_INITIALIZING and kwargs['created'] is False:
# Do not send notifications when the pipeline is initializing. While initializing, steps are created and
# since this is done with several requests, several notifications would be sent, while the only one you
@@ -48,6 +43,11 @@ def send_pipeline_change_notification(sender, **kwargs):
# the notification will trigger the creation of the steps.
return

if pipeline.status == PIPELINE_STATUS_QUEUED:
        # When a pipeline is queued, other pipelines should be run first. To prevent this pipeline from
        # being run, do not notify.
return

session = settings.PIPELINE_CHANGE_NOTIFICATION_SESSION
response = session.post(
settings.PIPELINE_CHANGE_NOTIFICATION_URL, json={'public_identifier': str(pipeline.public_identifier)}
@@ -57,3 +57,12 @@ def send_pipeline_change_notification(sender, **kwargs):
response.raise_for_status()
except HTTPError:
log.exception("Failed to notify pipeline runner")


@receiver(post_save, sender=SCMPipelineRun)
def create_close_releases(sender, **kwargs):
pipeline = kwargs['instance']
if kwargs['created'] is True:
create_release_if_necessary(pipeline)
else:
close_release_if_pipeline_finished(pipeline)
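The design choice here: the release bookkeeping moves out of send_pipeline_change_notification into its own create_close_releases receiver. Both handlers are connected to post_save for SCMPipelineRun, so both fire on every save, and the notification handler's early returns for 'initializing' and 'queued' runs no longer suppress release creation or closing. A minimal demonstration of that signal behaviour (plain django.dispatch, no models or settings needed; all names below are illustrative):

from django.dispatch import Signal, receiver

pipeline_saved = Signal()  # stand-in for post_save on SCMPipelineRun

@receiver(pipeline_saved)
def notify(sender, **kwargs):
    if kwargs.get('status') == 'queued':
        return  # notification suppressed for queued runs...
    print('notified')

@receiver(pipeline_saved)
def releases(sender, **kwargs):
    print('release bookkeeping runs regardless')  # ...but this still fires

pipeline_saved.send(sender=None, status='queued')  # prints only the second message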
59 changes: 58 additions & 1 deletion katka/views.py
@@ -1,9 +1,13 @@
import logging
from datetime import datetime

from django.conf import settings

import pytz
-from katka.constants import STEP_FINAL_STATUSES
+from katka.constants import (
+    PIPELINE_FINAL_STATUSES, PIPELINE_STATUS_IN_PROGRESS, PIPELINE_STATUS_INITIALIZING, PIPELINE_STATUS_QUEUED,
+    STEP_FINAL_STATUSES,
+)
from katka.models import (
Application, ApplicationMetadata, Credential, CredentialSecret, Project, SCMPipelineRun, SCMRelease, SCMRepository,
SCMService, SCMStepRun, Team,
@@ -16,6 +20,8 @@
from katka.viewsets import AuditViewSet, FilterViewMixin, ReadOnlyAuditViewMixin, UpdateAuditMixin
from rest_framework.permissions import IsAuthenticated

log = logging.getLogger(__name__)


class TeamViewSet(FilterViewMixin, AuditViewSet):
model = Team
@@ -95,6 +101,57 @@ class SCMPipelineRunViewSet(FilterViewMixin, AuditViewSet):
'release': 'scmrelease',
}

def perform_update(self, serializer):
status = serializer.validated_data.get('status', None)
if status == PIPELINE_STATUS_IN_PROGRESS:
if not self._ready_to_run(serializer.instance.application, serializer.instance.first_parent_hash):
serializer.validated_data['status'] = PIPELINE_STATUS_QUEUED

super().perform_update(serializer)

if status in PIPELINE_FINAL_STATUSES:
self._run_next_pipeline_if_available(serializer.instance.application, serializer.instance.commit_hash)

def _ready_to_run(self, application, parent_hash):
if parent_hash is None:
return True # this is probably the first commit

try:
parent_instance = self.model.objects.get(application=application, commit_hash=parent_hash)
except self.model.DoesNotExist:
self._trigger_sync()
return False # parent commit does not exist, we need a sync before we can continue

if parent_instance.status not in PIPELINE_FINAL_STATUSES:
return False # exists, but not ready to run this one yet

return True

def _run_next_pipeline_if_available(self, application, commit_hash):
try:
next_pipeline = self.model.objects.get(application=application, first_parent_hash=commit_hash)
except self.model.DoesNotExist:
return # does not exist (yet), so nothing to do

if next_pipeline.status != PIPELINE_STATUS_QUEUED:
if next_pipeline.status != PIPELINE_STATUS_INITIALIZING:
log.warning(
f'Next pipeline {next_pipeline.pk} is not queued, it has status "{next_pipeline.status}", '
'not updating'
)
return

# No need to wrap inside a "with username_on_model():" context manager since at this point we already
# are in a context. Worried about infinite recursion because we keep setting statuses for the next
# pipelines? Since this method is only called when a pipeline moves from the "in progress" state to
# a final state and the next pipeline is set to the "in progress" state, it should not trigger recursion.
next_pipeline.status = PIPELINE_STATUS_IN_PROGRESS
next_pipeline.save()

def _trigger_sync(self):
"""We are missing commits, sync them so we can get the complete string of commits"""
log.warning("Need to sync commits because at least one is missing, but this is not implemented yet")

def get_queryset(self):
user_groups = self.request.user.groups.all()
return super().get_queryset().filter(application__project__team__group__in=user_groups)
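Taken together, perform_update parks a run as 'queued' when its first parent has not reached a final status (or is missing, pending a sync), and _run_next_pipeline_if_available promotes the queued child once the parent finishes. A dependency-free sketch of that hand-off (illustrative only, not repository code; the FINAL set is an assumption about what PIPELINE_FINAL_STATUSES contains):

FINAL = {'failed', 'success', 'skipped'}  # assumed final statuses

class Run:
    def __init__(self, commit, parent=None):
        self.commit, self.parent, self.status = commit, parent, 'initializing'

def set_in_progress(run, runs):
    # Mirrors perform_update/_ready_to_run: queue unless the parent is done.
    parent = next((r for r in runs if r.commit == run.parent), None)
    if run.parent is not None and (parent is None or parent.status not in FINAL):
        run.status = 'queued'
    else:
        run.status = 'in progress'

def finish(run, runs, status):
    # Mirrors _run_next_pipeline_if_available: promote the queued child.
    run.status = status
    child = next((r for r in runs if r.parent == run.commit), None)
    if child is not None and child.status == 'queued':
        set_in_progress(child, runs)

a, b = Run('aaa'), Run('bbb', parent='aaa')
runs = [a, b]
set_in_progress(a, runs)    # a: 'in progress' (first commit, no parent)
set_in_progress(b, runs)    # b: 'queued' (a has not finished yet)
finish(a, runs, 'success')  # a: 'success'; b is promoted to 'in progress'
assert b.status == 'in progress'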
23 changes: 22 additions & 1 deletion tests/conftest.py
@@ -318,6 +318,25 @@ def scm_pipeline_run(application):
return scm_pipeline_run


@pytest.fixture
def next_scm_pipeline_run(application, scm_pipeline_run):
pipeline_yaml = '''stages:
- release
do-release:
stage: release
'''
scm_pipeline_run = models.SCMPipelineRun(application=application,
pipeline_yaml=pipeline_yaml,
steps_total=5,
commit_hash='DD14567A143AEC5156FD1444A017A3213654EF1',
first_parent_hash=scm_pipeline_run.commit_hash)
with username_on_model(models.SCMPipelineRun, 'initial'):
scm_pipeline_run.save()

return scm_pipeline_run


@pytest.fixture
def another_scm_pipeline_run(another_application):
pipeline_yaml = '''stages:
@@ -347,7 +366,9 @@ def another_another_scm_pipeline_run(another_application):
scm_pipeline_run = models.SCMPipelineRun(application=another_application,
pipeline_yaml=pipeline_yaml,
steps_total=5,
commit_hash='9234567A143AEC5156FD1444A017A3213654329')
commit_hash='9234567A143AEC5156FD1444A017A3213654329',
# first_parent_hash does not link to existing hash
first_parent_hash='40000000000000000000000000000000000000F')
with username_on_model(models.SCMPipelineRun, 'initial'):
scm_pipeline_run.save()

111 changes: 111 additions & 0 deletions tests/integration/test_scmpipelinerun_view.py
@@ -2,6 +2,8 @@

import pytest
from katka import models
from katka.constants import PIPELINE_STATUS_FAILED
from katka.fields import username_on_model


@pytest.mark.django_db
@@ -183,3 +185,112 @@ def test_create(self, client, logged_in_user, application, scm_pipeline_run):
assert response.status_code == 201
assert models.SCMPipelineRun.objects.filter(commit_hash='4015B57A143AEC5156FD1444A017A32137A3FD0F').exists()
assert models.SCMPipelineRun.objects.count() == initial_count + 1


@pytest.mark.django_db
class TestSCMPipelineRunQueueOrRun:
"""
To make sure we process pipelines in order, we put pipeline runs in progress only when previous pipeline runs
    have been completed (or skipped). These tests verify that this functionality works correctly.
"""

def test_first_commit(self, client, logged_in_user, application, scm_pipeline_run):
"""There is no first parent commit hash, so we can change the status to in progress"""

url = f'/scm-pipeline-runs/{scm_pipeline_run.public_identifier}/'
data = {'status': 'in progress'}
response = client.patch(url, data, content_type='application/json')
assert response.status_code == 200
p = models.SCMPipelineRun.objects.get(pk=scm_pipeline_run.public_identifier)
assert p.status == 'in progress'

def test_linked_to_non_final(self, client, logged_in_user, application, scm_pipeline_run, next_scm_pipeline_run):
"""First parent is linked, but its status is not a final state"""

url = f'/scm-pipeline-runs/{next_scm_pipeline_run.public_identifier}/'
data = {'status': 'in progress'}
response = client.patch(url, data, content_type='application/json')
assert response.status_code == 200
p = models.SCMPipelineRun.objects.get(pk=next_scm_pipeline_run.public_identifier)
assert p.status == 'queued'

def test_not_linked_at_all(self, client, logged_in_user, application, scm_pipeline_run,
another_another_scm_pipeline_run):
"""
The first parent points to a commit that is not present, so a sync is necessary.
        This pipeline run should be queued.
"""

url = f'/scm-pipeline-runs/{another_another_scm_pipeline_run.public_identifier}/'
data = {'status': 'in progress'}
response = client.patch(url, data, content_type='application/json')
assert response.status_code == 200
p = models.SCMPipelineRun.objects.get(pk=another_another_scm_pipeline_run.public_identifier)
assert p.status == 'queued'

def test_linked_to_final_state(self, client, logged_in_user, application, scm_pipeline_run, next_scm_pipeline_run):
"""First parent is linked, and its status is in a final state"""

scm_pipeline_run.status = PIPELINE_STATUS_FAILED
with username_on_model(models.SCMPipelineRun, 'test'):
scm_pipeline_run.save()

url = f'/scm-pipeline-runs/{next_scm_pipeline_run.public_identifier}/'
data = {'status': 'in progress'}
response = client.patch(url, data, content_type='application/json')
assert response.status_code == 200
p = models.SCMPipelineRun.objects.get(pk=next_scm_pipeline_run.public_identifier)
assert p.status == 'in progress'


@pytest.mark.django_db
class TestSCMPipelineRunRunNextPipeline:
"""
To make sure we process pipelines in order, the next pipeline should be run if it's queued.
"""

def test_still_initializing(self, client, logged_in_user, application, scm_pipeline_run, next_scm_pipeline_run,
caplog):
url = f'/scm-pipeline-runs/{scm_pipeline_run.public_identifier}/'
data = {'status': 'success'}
assert next_scm_pipeline_run.status == 'initializing'
response = client.patch(url, data, content_type='application/json')
assert response.status_code == 200
p = models.SCMPipelineRun.objects.get(pk=next_scm_pipeline_run.public_identifier)
assert p.status == 'initializing'
assert caplog.messages == []

def test_non_queued(self, client, logged_in_user, application, scm_pipeline_run, next_scm_pipeline_run, caplog):
url = f'/scm-pipeline-runs/{scm_pipeline_run.public_identifier}/'
data = {'status': 'success'}
with username_on_model(models.SCMPipelineRun, 'test'):
next_scm_pipeline_run.status = 'in progress'
next_scm_pipeline_run.save()

response = client.patch(url, data, content_type='application/json')
assert response.status_code == 200
p = models.SCMPipelineRun.objects.get(pk=next_scm_pipeline_run.public_identifier)
assert p.status == 'in progress'
assert caplog.messages[0] == (f'Next pipeline {next_scm_pipeline_run.pk} is not queued, '
'it has status "in progress", not updating')

def test_queued(self, client, logged_in_user, application, scm_pipeline_run, next_scm_pipeline_run, caplog):
url = f'/scm-pipeline-runs/{scm_pipeline_run.public_identifier}/'
data = {'status': 'success'}
with username_on_model(models.SCMPipelineRun, 'test'):
next_scm_pipeline_run.status = 'queued'
next_scm_pipeline_run.save()

response = client.patch(url, data, content_type='application/json')
assert response.status_code == 200
p = models.SCMPipelineRun.objects.get(pk=next_scm_pipeline_run.public_identifier)
assert p.status == 'in progress'
assert caplog.messages == []

def test_final_commit(self, client, logged_in_user, application, scm_pipeline_run, caplog):
url = f'/scm-pipeline-runs/{scm_pipeline_run.public_identifier}/'
data = {'status': 'success'}

response = client.patch(url, data, content_type='application/json')
assert response.status_code == 200
assert caplog.messages == []
