Skip to content

Commit

Permalink
Refactor job creation code (#3465)
Browse files Browse the repository at this point in the history
This PR refactors the job creation code. Jobs, archive items and display
sets now all use the same creation and validation logic.

Errors from asynchronous input validation now get caught and are
reported back to the user as error message on the Job object. The job is
then cancelled and not executed.

This PR also enables selecting existing images and files for jobs
through the UI and enables re-using CIVs for existing values if
possible. The same holds partially for the API view as well. It is not
yet possible to create a job through the API with an existing non-image
file. This was out of scope for this PR
(DIAGNijmegen/rse-roadmap#335 (comment))

Part of DIAGNijmegen/rse-roadmap#335 

Closes #3139
Closes #3325
Closes #3565
Partially addresses
#3368

---------

Co-authored-by: James Meakin <[email protected]>
  • Loading branch information
amickan and jmsmkn authored Oct 7, 2024
1 parent ce7092c commit 2e92eeb
Show file tree
Hide file tree
Showing 49 changed files with 3,679 additions and 1,349 deletions.
68 changes: 65 additions & 3 deletions app/grandchallenge/algorithms/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@
AlgorithmSerializer,
)
from grandchallenge.algorithms.tasks import import_remote_algorithm_image
from grandchallenge.components.form_fields import InterfaceFormField
from grandchallenge.components.form_fields import (
INTERFACE_FORM_FIELD_PREFIX,
InterfaceFormField,
)
from grandchallenge.components.forms import ContainerImageForm
from grandchallenge.components.models import (
CIVData,
ComponentInterface,
ComponentJob,
ImportStatusChoices,
Expand Down Expand Up @@ -93,16 +97,27 @@ class JobCreateForm(SaveFormInitMixin, Form):
algorithm_model = ModelChoiceField(
queryset=None, disabled=True, required=False, widget=HiddenInput
)
creator = ModelChoiceField(
queryset=None, disabled=True, required=False, widget=HiddenInput
)
time_limit = IntegerField(
disabled=True, required=False, widget=HiddenInput
)

def __init__(self, *args, algorithm, user, **kwargs):
super().__init__(*args, **kwargs)

self.helper = FormHelper()

self._user = user
self.fields["creator"].queryset = get_user_model().objects.filter(
pk=self._user.pk
)
self.fields["creator"].initial = self._user

self._algorithm = algorithm
self._algorithm_image = self._algorithm.active_image
self.fields["time_limit"].initial = self._algorithm.time_limit

active_model = self._algorithm.active_model

Expand All @@ -119,12 +134,27 @@ def __init__(self, *args, algorithm, user, **kwargs):
self.fields["algorithm_model"].initial = active_model

for inp in self._algorithm.inputs.all():
self.fields[inp.slug] = InterfaceFormField(
prefixed_interface_slug = (
f"{INTERFACE_FORM_FIELD_PREFIX}{inp.slug}"
)

if prefixed_interface_slug in self.data:
if inp.kind == ComponentInterface.Kind.ANY:
# interfaces for which the data can be a list need
# to be retrieved with getlist() from the QueryDict
initial = self.data.getlist(prefixed_interface_slug)
else:
initial = self.data[prefixed_interface_slug]
else:
initial = None

self.fields[prefixed_interface_slug] = InterfaceFormField(
instance=inp,
initial=inp.default_value,
initial=initial if initial else inp.default_value,
user=self._user,
required=inp.value_required,
help_text=clean(inp.description) if inp.description else "",
form_data=self.data,
).field

@cached_property
Expand All @@ -143,6 +173,38 @@ def clean(self):
if self.jobs_limit < 1:
raise ValidationError("You have run out of algorithm credits")

cleaned_data = self.reformat_inputs(cleaned_data=cleaned_data)

if Job.objects.get_jobs_with_same_inputs(
inputs=cleaned_data["inputs"],
algorithm_image=cleaned_data["algorithm_image"],
algorithm_model=cleaned_data["algorithm_model"],
):
raise ValidationError(
"A result for these inputs with the current image "
"and model already exists."
)

return cleaned_data

def reformat_inputs(self, *, cleaned_data):
keys_to_remove = []
inputs = []
for k, v in cleaned_data.items():
if k.startswith(INTERFACE_FORM_FIELD_PREFIX):
keys_to_remove.append(k)
inputs.append(
CIVData(
interface_slug=k[len(INTERFACE_FORM_FIELD_PREFIX) :],
value=v,
)
)

for key in keys_to_remove:
cleaned_data.pop(key)

cleaned_data["inputs"] = inputs

return cleaned_data


Expand Down
37 changes: 37 additions & 0 deletions app/grandchallenge/algorithms/migrations/0055_alter_job_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Generated by Django 4.2.16 on 2024-09-20 08:40

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("algorithms", "0054_alter_algorithmimage_options_and_more"),
]

operations = [
migrations.AlterField(
model_name="job",
name="status",
field=models.PositiveSmallIntegerField(
choices=[
(0, "Queued"),
(1, "Started"),
(2, "Re-Queued"),
(3, "Failed"),
(4, "Succeeded"),
(5, "Cancelled"),
(6, "Provisioning"),
(7, "Provisioned"),
(8, "Executing"),
(9, "Executed"),
(10, "Parsing Outputs"),
(11, "Executing Algorithm"),
(12, "External Execution In Progress"),
(13, "Validating inputs"),
],
db_index=True,
default=0,
),
),
]
139 changes: 123 additions & 16 deletions app/grandchallenge/algorithms/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
from grandchallenge.algorithms.tasks import update_algorithm_average_duration
from grandchallenge.anatomy.models import BodyStructure
from grandchallenge.charts.specs import stacked_bar
from grandchallenge.components.models import (
from grandchallenge.components.models import ( # noqa: F401
CIVForObjectMixin,
ComponentImage,
ComponentInterface,
ComponentInterfaceValue,
ComponentJob,
ComponentJobManager,
ImportStatusChoices,
Expand Down Expand Up @@ -626,6 +628,78 @@ def create(

return obj

@staticmethod
def retrieve_existing_civs(*, civ_data):
"""
Checks if there are existing CIVs for the provided data and returns those.
Parameters
----------
civ_data
A list of CIVData objects.
Returns
-------
A list of ComponentInterfaceValues
"""
existing_civs = []
for civ in civ_data:
if (
civ.user_upload
or civ.upload_session
or civ.user_upload_queryset
):
# uploads will create new CIVs, so ignore these
continue
elif civ.image:
try:
civ = ComponentInterfaceValue.objects.filter(
interface__slug=civ.interface_slug, image=civ.image
).get()
existing_civs.append(civ)
except ObjectDoesNotExist:
continue
elif civ.file_civ:
existing_civs.append(civ.file_civ)
else:
# values can be of different types, including None and False
try:
civ = ComponentInterfaceValue.objects.filter(
interface__slug=civ.interface_slug, value=civ.value
).get()
existing_civs.append(civ)
except ObjectDoesNotExist:
continue

return existing_civs

def get_jobs_with_same_inputs(
self, *, inputs, algorithm_image, algorithm_model
):
existing_civs = self.retrieve_existing_civs(civ_data=inputs)
unique_kwargs = {
"algorithm_image": algorithm_image,
}
input_interface_count = algorithm_image.algorithm.inputs.count()

if algorithm_model:
unique_kwargs["algorithm_model"] = algorithm_model
else:
unique_kwargs["algorithm_model__isnull"] = True

existing_jobs = (
Job.objects.filter(**unique_kwargs)
.annotate(
inputs_match_count=Count(
"inputs", filter=Q(inputs__in=existing_civs)
)
)
.filter(inputs_match_count=input_interface_count)
)

return existing_jobs

def spent_credits(self, user):
now = timezone.now()
period = timedelta(days=30)
Expand Down Expand Up @@ -714,7 +788,7 @@ class AlgorithmModelGroupObjectPermission(GroupObjectPermissionBase):
)


class Job(UUIDModel, ComponentJob):
class Job(UUIDModel, CIVForObjectMixin, ComponentJob):
objects = JobManager.as_manager()

algorithm_image = models.ForeignKey(
Expand Down Expand Up @@ -772,6 +846,13 @@ def container(self):
def output_interfaces(self):
return self.algorithm_image.algorithm.outputs

@cached_property
def inputs_complete(self):
# check if all inputs are present and if they all have a value
return {
civ.interface for civ in self.inputs.all() if civ.has_value
} == {*self.algorithm_image.algorithm.inputs.all()}

@cached_property
def rendered_result_text(self) -> str:
try:
Expand Down Expand Up @@ -889,25 +970,51 @@ def add_viewer(self, user):
def remove_viewer(self, user):
return user.groups.remove(self.viewers)

def sort_inputs_and_execute(
self, upload_session_pks=None, user_upload_pks=None
):
# Local import to avoid circular dependency
def add_civ(self, *, civ):
super().add_civ(civ=civ)
return self.inputs.add(civ)

def remove_civ(self, *, civ):
super().remove_civ(civ=civ)
return self.inputs.remove(civ)

def get_civ_for_interface(self, interface):
return self.inputs.get(interface=interface)

def validate_inputs_and_execute(self, *, inputs):
from grandchallenge.algorithms.tasks import (
run_algorithm_job_for_inputs,
execute_algorithm_job_for_inputs,
)

on_commit(
run_algorithm_job_for_inputs.signature(
kwargs={
"job_pk": self.pk,
"upload_session_pks": upload_session_pks,
"user_upload_pks": user_upload_pks,
},
immutable=True,
).apply_async
linked_task = execute_algorithm_job_for_inputs.signature(
kwargs={"job_pk": self.pk}, immutable=True
)

if not self.is_editable:
raise RuntimeError(
"Job is not editable. No CIVs can be added or removed from it."
)
else:
for civ_data in inputs:
self.create_civ(
civ_data=civ_data,
user=self.creator,
linked_task=linked_task,
)

@property
def is_editable(self):
# staying with display set and archive item terminology here
# since this property is checked in create_civ()
if self.status == self.VALIDATING_INPUTS:
return True
else:
return False

@property
def base_object(self):
return self.algorithm_image.algorithm

@property
def executor_kwargs(self):
executor_kwargs = super().executor_kwargs
Expand Down
Loading

0 comments on commit 2e92eeb

Please sign in to comment.