diff --git a/katka/constants.py b/katka/constants.py index d9d6dd1..b122033 100644 --- a/katka/constants.py +++ b/katka/constants.py @@ -1,14 +1,18 @@ # Status is not in the model to allow usage in different models PIPELINE_STATUS_INITIALIZING = 'initializing' +PIPELINE_STATUS_QUEUED = 'queued' PIPELINE_STATUS_IN_PROGRESS = 'in progress' PIPELINE_STATUS_FAILED = 'failed' PIPELINE_STATUS_SUCCESS = 'success' +PIPELINE_STATUS_SKIPPED = 'skipped' PIPELINE_STATUS_CHOICES = ( (PIPELINE_STATUS_INITIALIZING, PIPELINE_STATUS_INITIALIZING), + (PIPELINE_STATUS_QUEUED, PIPELINE_STATUS_QUEUED), (PIPELINE_STATUS_IN_PROGRESS, PIPELINE_STATUS_IN_PROGRESS), (PIPELINE_STATUS_FAILED, PIPELINE_STATUS_FAILED), (PIPELINE_STATUS_SUCCESS, PIPELINE_STATUS_SUCCESS), + (PIPELINE_STATUS_SKIPPED, PIPELINE_STATUS_SKIPPED), ) PIPELINE_FINAL_STATUSES = ( diff --git a/katka/migrations/0027_commit_ordering.py b/katka/migrations/0027_commit_ordering.py new file mode 100644 index 0000000..39346b6 --- /dev/null +++ b/katka/migrations/0027_commit_ordering.py @@ -0,0 +1,23 @@ +# Generated by Django 2.2.6 on 2019-11-13 13:38 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('katka', '0026_scmsteprun_step_type'), + ] + + operations = [ + migrations.AddField( + model_name='scmpipelinerun', + name='first_parent_hash', + field=models.CharField(blank=True, help_text='Commit hash of first parent commit, to determine order of commits. First commit has none.', max_length=64, null=True), + ), + migrations.AlterField( + model_name='scmpipelinerun', + name='status', + field=models.CharField(choices=[('initializing', 'initializing'), ('queued', 'queued'), ('in progress', 'in progress'), ('failed', 'failed'), ('success', 'success'), ('skipped', 'skipped')], default='initializing', max_length=30), + ), + ] diff --git a/katka/models.py b/katka/models.py index caa802f..223ec1a 100644 --- a/katka/models.py +++ b/katka/models.py @@ -107,6 +107,12 @@ class Meta: public_identifier = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) commit_hash = models.CharField(max_length=64) # A SHA-1 hash is 40 characters, SHA-256 is 64 characters + first_parent_hash = models.CharField( + max_length=64, + help_text='Commit hash of first parent commit, to determine order of commits. First commit has none.', + null=True, + blank=True, + ) status = models.CharField(max_length=30, choices=PIPELINE_STATUS_CHOICES, default=PIPELINE_STATUS_INITIALIZING) steps_total = models.PositiveSmallIntegerField(default=0) steps_completed = models.PositiveSmallIntegerField(default=0) diff --git a/katka/signals.py b/katka/signals.py index 940e808..975f4d4 100644 --- a/katka/signals.py +++ b/katka/signals.py @@ -4,7 +4,7 @@ from django.db.models.signals import post_save from django.dispatch import receiver -from katka.constants import PIPELINE_STATUS_INITIALIZING, STEP_FINAL_STATUSES +from katka.constants import PIPELINE_STATUS_INITIALIZING, PIPELINE_STATUS_QUEUED, STEP_FINAL_STATUSES from katka.fields import username_on_model from katka.models import SCMPipelineRun, SCMStepRun from katka.releases import close_release_if_pipeline_finished, create_release_if_necessary @@ -35,11 +35,6 @@ def update_pipeline_from_steps(sender, **kwargs): @receiver(post_save, sender=SCMPipelineRun) def send_pipeline_change_notification(sender, **kwargs): pipeline = kwargs['instance'] - if kwargs['created'] is True: - create_release_if_necessary(pipeline) - else: - close_release_if_pipeline_finished(pipeline) - if pipeline.status == PIPELINE_STATUS_INITIALIZING and kwargs['created'] is False: # Do not send notifications when the pipeline is initializing. While initializing, steps are created and # since this is done with several requests, several notifications would be sent, while the only one you @@ -48,6 +43,11 @@ def send_pipeline_change_notification(sender, **kwargs): # the notification will trigger the creation of the steps. return + if pipeline.status == PIPELINE_STATUS_QUEUED: + # When a pipeline is queued it means that other pipelines should be run first. To prevent them from + # being run, do not notify. + return + session = settings.PIPELINE_CHANGE_NOTIFICATION_SESSION response = session.post( settings.PIPELINE_CHANGE_NOTIFICATION_URL, json={'public_identifier': str(pipeline.public_identifier)} @@ -57,3 +57,12 @@ def send_pipeline_change_notification(sender, **kwargs): response.raise_for_status() except HTTPError: log.exception("Failed to notify pipeline runner") + + +@receiver(post_save, sender=SCMPipelineRun) +def create_close_releases(sender, **kwargs): + pipeline = kwargs['instance'] + if kwargs['created'] is True: + create_release_if_necessary(pipeline) + else: + close_release_if_pipeline_finished(pipeline) diff --git a/katka/views.py b/katka/views.py index e4eb8a6..243d777 100644 --- a/katka/views.py +++ b/katka/views.py @@ -1,9 +1,13 @@ +import logging from datetime import datetime from django.conf import settings import pytz -from katka.constants import STEP_FINAL_STATUSES +from katka.constants import ( + PIPELINE_FINAL_STATUSES, PIPELINE_STATUS_IN_PROGRESS, PIPELINE_STATUS_INITIALIZING, PIPELINE_STATUS_QUEUED, + STEP_FINAL_STATUSES, +) from katka.models import ( Application, ApplicationMetadata, Credential, CredentialSecret, Project, SCMPipelineRun, SCMRelease, SCMRepository, SCMService, SCMStepRun, Team, @@ -16,6 +20,8 @@ from katka.viewsets import AuditViewSet, FilterViewMixin, ReadOnlyAuditViewMixin, UpdateAuditMixin from rest_framework.permissions import IsAuthenticated +log = logging.getLogger(__name__) + class TeamViewSet(FilterViewMixin, AuditViewSet): model = Team @@ -95,6 +101,57 @@ class SCMPipelineRunViewSet(FilterViewMixin, AuditViewSet): 'release': 'scmrelease', } + def perform_update(self, serializer): + status = serializer.validated_data.get('status', None) + if status == PIPELINE_STATUS_IN_PROGRESS: + if not self._ready_to_run(serializer.instance.application, serializer.instance.first_parent_hash): + serializer.validated_data['status'] = PIPELINE_STATUS_QUEUED + + super().perform_update(serializer) + + if status in PIPELINE_FINAL_STATUSES: + self._run_next_pipeline_if_available(serializer.instance.application, serializer.instance.commit_hash) + + def _ready_to_run(self, application, parent_hash): + if parent_hash is None: + return True # this is probably the first commit + + try: + parent_instance = self.model.objects.get(application=application, commit_hash=parent_hash) + except self.model.DoesNotExist: + self._trigger_sync() + return False # parent commit does not exist, we need a sync before we can continue + + if parent_instance.status not in PIPELINE_FINAL_STATUSES: + return False # exists, but not ready to run this one yet + + return True + + def _run_next_pipeline_if_available(self, application, commit_hash): + try: + next_pipeline = self.model.objects.get(application=application, first_parent_hash=commit_hash) + except self.model.DoesNotExist: + return # does not exist (yet), so nothing to do + + if next_pipeline.status != PIPELINE_STATUS_QUEUED: + if next_pipeline.status != PIPELINE_STATUS_INITIALIZING: + log.warning( + f'Next pipeline {next_pipeline.pk} is not queued, it has status "{next_pipeline.status}", ' + 'not updating' + ) + return + + # No need to wrap inside a "with username_on_model():" context manager since at this point we already + # are in a context. Worried about infinite recursion because we keep setting statuses for the next + # pipelines? Since this method is only called when a pipeline moves from the "in progress" state to + # a final state and the next pipeline is set to the "in progress" state, it should not trigger recursion. + next_pipeline.status = PIPELINE_STATUS_IN_PROGRESS + next_pipeline.save() + + def _trigger_sync(self): + """We are missing commits, sync them so we can get the complete string of commits""" + log.warning("Need to sync commits because at least one is missing, but this is not implemented yet") + def get_queryset(self): user_groups = self.request.user.groups.all() return super().get_queryset().filter(application__project__team__group__in=user_groups) diff --git a/tests/conftest.py b/tests/conftest.py index c86b63f..aa97b7a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -318,6 +318,25 @@ def scm_pipeline_run(application): return scm_pipeline_run +@pytest.fixture +def next_scm_pipeline_run(application, scm_pipeline_run): + pipeline_yaml = '''stages: + - release + +do-release: + stage: release +''' + scm_pipeline_run = models.SCMPipelineRun(application=application, + pipeline_yaml=pipeline_yaml, + steps_total=5, + commit_hash='DD14567A143AEC5156FD1444A017A3213654EF1', + first_parent_hash=scm_pipeline_run.commit_hash) + with username_on_model(models.SCMPipelineRun, 'initial'): + scm_pipeline_run.save() + + return scm_pipeline_run + + @pytest.fixture def another_scm_pipeline_run(another_application): pipeline_yaml = '''stages: @@ -347,7 +366,9 @@ def another_another_scm_pipeline_run(another_application): scm_pipeline_run = models.SCMPipelineRun(application=another_application, pipeline_yaml=pipeline_yaml, steps_total=5, - commit_hash='9234567A143AEC5156FD1444A017A3213654329') + commit_hash='9234567A143AEC5156FD1444A017A3213654329', + # first_parent_hash does not link to existing hash + first_parent_hash='40000000000000000000000000000000000000F') with username_on_model(models.SCMPipelineRun, 'initial'): scm_pipeline_run.save() diff --git a/tests/integration/test_scmpipelinerun_view.py b/tests/integration/test_scmpipelinerun_view.py index 1746950..b8bc10f 100644 --- a/tests/integration/test_scmpipelinerun_view.py +++ b/tests/integration/test_scmpipelinerun_view.py @@ -2,6 +2,8 @@ import pytest from katka import models +from katka.constants import PIPELINE_STATUS_FAILED +from katka.fields import username_on_model @pytest.mark.django_db @@ -183,3 +185,112 @@ def test_create(self, client, logged_in_user, application, scm_pipeline_run): assert response.status_code == 201 assert models.SCMPipelineRun.objects.filter(commit_hash='4015B57A143AEC5156FD1444A017A32137A3FD0F').exists() assert models.SCMPipelineRun.objects.count() == initial_count + 1 + + +@pytest.mark.django_db +class TestSCMPipelineRunQueueOrRun: + """ + To make sure we process pipelines in order, we put pipeline runs in progress only when previous pipeline runs + have been completed (or skipped). These tests should verify that that functionality works correctly. + """ + + def test_first_commit(self, client, logged_in_user, application, scm_pipeline_run): + """There is no first parent commit hash, so we can change the status to in progress""" + + url = f'/scm-pipeline-runs/{scm_pipeline_run.public_identifier}/' + data = {'status': 'in progress'} + response = client.patch(url, data, content_type='application/json') + assert response.status_code == 200 + p = models.SCMPipelineRun.objects.get(pk=scm_pipeline_run.public_identifier) + assert p.status == 'in progress' + + def test_linked_to_non_final(self, client, logged_in_user, application, scm_pipeline_run, next_scm_pipeline_run): + """First parent is linked, but its status is not a final state""" + + url = f'/scm-pipeline-runs/{next_scm_pipeline_run.public_identifier}/' + data = {'status': 'in progress'} + response = client.patch(url, data, content_type='application/json') + assert response.status_code == 200 + p = models.SCMPipelineRun.objects.get(pk=next_scm_pipeline_run.public_identifier) + assert p.status == 'queued' + + def test_not_linked_at_all(self, client, logged_in_user, application, scm_pipeline_run, + another_another_scm_pipeline_run): + """ + The first parent points to a commit that is not present, so a sync is necessary. + This pipeline run should be queued + """ + + url = f'/scm-pipeline-runs/{another_another_scm_pipeline_run.public_identifier}/' + data = {'status': 'in progress'} + response = client.patch(url, data, content_type='application/json') + assert response.status_code == 200 + p = models.SCMPipelineRun.objects.get(pk=another_another_scm_pipeline_run.public_identifier) + assert p.status == 'queued' + + def test_linked_to_final_state(self, client, logged_in_user, application, scm_pipeline_run, next_scm_pipeline_run): + """First parent is linked, and its status is in a final state""" + + scm_pipeline_run.status = PIPELINE_STATUS_FAILED + with username_on_model(models.SCMPipelineRun, 'test'): + scm_pipeline_run.save() + + url = f'/scm-pipeline-runs/{next_scm_pipeline_run.public_identifier}/' + data = {'status': 'in progress'} + response = client.patch(url, data, content_type='application/json') + assert response.status_code == 200 + p = models.SCMPipelineRun.objects.get(pk=next_scm_pipeline_run.public_identifier) + assert p.status == 'in progress' + + +@pytest.mark.django_db +class TestSCMPipelineRunRunNextPipeline: + """ + To make sure we process pipelines in order, the next pipeline should be run if it's queued. + """ + + def test_still_initializing(self, client, logged_in_user, application, scm_pipeline_run, next_scm_pipeline_run, + caplog): + url = f'/scm-pipeline-runs/{scm_pipeline_run.public_identifier}/' + data = {'status': 'success'} + assert next_scm_pipeline_run.status == 'initializing' + response = client.patch(url, data, content_type='application/json') + assert response.status_code == 200 + p = models.SCMPipelineRun.objects.get(pk=next_scm_pipeline_run.public_identifier) + assert p.status == 'initializing' + assert caplog.messages == [] + + def test_non_queued(self, client, logged_in_user, application, scm_pipeline_run, next_scm_pipeline_run, caplog): + url = f'/scm-pipeline-runs/{scm_pipeline_run.public_identifier}/' + data = {'status': 'success'} + with username_on_model(models.SCMPipelineRun, 'test'): + next_scm_pipeline_run.status = 'in progress' + next_scm_pipeline_run.save() + + response = client.patch(url, data, content_type='application/json') + assert response.status_code == 200 + p = models.SCMPipelineRun.objects.get(pk=next_scm_pipeline_run.public_identifier) + assert p.status == 'in progress' + assert caplog.messages[0] == (f'Next pipeline {next_scm_pipeline_run.pk} is not queued, ' + 'it has status "in progress", not updating') + + def test_queued(self, client, logged_in_user, application, scm_pipeline_run, next_scm_pipeline_run, caplog): + url = f'/scm-pipeline-runs/{scm_pipeline_run.public_identifier}/' + data = {'status': 'success'} + with username_on_model(models.SCMPipelineRun, 'test'): + next_scm_pipeline_run.status = 'queued' + next_scm_pipeline_run.save() + + response = client.patch(url, data, content_type='application/json') + assert response.status_code == 200 + p = models.SCMPipelineRun.objects.get(pk=next_scm_pipeline_run.public_identifier) + assert p.status == 'in progress' + assert caplog.messages == [] + + def test_final_commit(self, client, logged_in_user, application, scm_pipeline_run, caplog): + url = f'/scm-pipeline-runs/{scm_pipeline_run.public_identifier}/' + data = {'status': 'success'} + + response = client.patch(url, data, content_type='application/json') + assert response.status_code == 200 + assert caplog.messages == []