diff --git a/changelog.d/20241213_192726_mzhiltso_optimize_validation_layout_updates.md b/changelog.d/20241213_192726_mzhiltso_optimize_validation_layout_updates.md new file mode 100644 index 000000000000..413b8b9746e6 --- /dev/null +++ b/changelog.d/20241213_192726_mzhiltso_optimize_validation_layout_updates.md @@ -0,0 +1,8 @@ +### Fixed + +- \[Server API\] Significantly improved performance of honeypot changes in tasks + () +- \[Server API\] `PATCH tasks/id/validation_layout` responses now include correct + `disabled_frames` and handle simultaneous updates of + `disabled_frames` and honeypot frames correctly + () diff --git a/cvat/apps/dataset_manager/bindings.py b/cvat/apps/dataset_manager/bindings.py index 3b2ccd782a88..8b759f7b6316 100644 --- a/cvat/apps/dataset_manager/bindings.py +++ b/cvat/apps/dataset_manager/bindings.py @@ -50,7 +50,7 @@ class Attribute(NamedTuple): value: Any @classmethod - def add_prefetch_info(cls, queryset: QuerySet): + def add_prefetch_info(cls, queryset: QuerySet[Label]) -> QuerySet[Label]: assert issubclass(queryset.model, Label) return add_prefetch_fields(queryset, [ diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 45f1eaff4e8e..83886d7e9cf1 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -20,7 +20,7 @@ from cvat.apps.engine import models, serializers from cvat.apps.engine.plugins import plugin_decorator from cvat.apps.engine.log import DatasetLogManager -from cvat.apps.engine.utils import chunked_list +from cvat.apps.engine.utils import take_by from cvat.apps.events.handlers import handle_annotations_change from cvat.apps.profiler import silk_profile @@ -84,7 +84,7 @@ def merge_table_rows(rows, keys_for_merge, field_id): class JobAnnotation: @classmethod - def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool = True): + def add_prefetch_info(cls, queryset: QuerySet[models.Job], prefetch_images: bool = True) -> QuerySet[models.Job]: assert issubclass(queryset.model, models.Job) label_qs = add_prefetch_fields(models.Label.objects.all(), [ @@ -530,13 +530,13 @@ def _delete(self, data=None): self.ir_data.shapes = data['shapes'] self.ir_data.tracks = data['tracks'] - for labeledimage_ids_chunk in chunked_list(labeledimage_ids, chunk_size=1000): + for labeledimage_ids_chunk in take_by(labeledimage_ids, chunk_size=1000): self._delete_job_labeledimages(labeledimage_ids_chunk) - for labeledshape_ids_chunk in chunked_list(labeledshape_ids, chunk_size=1000): + for labeledshape_ids_chunk in take_by(labeledshape_ids, chunk_size=1000): self._delete_job_labeledshapes(labeledshape_ids_chunk) - for labeledtrack_ids_chunk in chunked_list(labeledtrack_ids, chunk_size=1000): + for labeledtrack_ids_chunk in take_by(labeledtrack_ids, chunk_size=1000): self._delete_job_labeledtracks(labeledtrack_ids_chunk) deleted_data = { diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index e27b697e41c4..f89ca741501e 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -37,13 +37,13 @@ import PIL.Image import PIL.ImageOps import rq -from rq.job import JobStatus as RQJobStatus from django.conf import settings from django.core.cache import caches from django.db import models as django_models from django.utils import timezone as django_tz from redis.exceptions import LockError from rest_framework.exceptions import NotFound, ValidationError +from rq.job import JobStatus as RQJobStatus from cvat.apps.engine import models from cvat.apps.engine.cloud_provider
import ( @@ -65,7 +65,12 @@ load_image, ) from cvat.apps.engine.rq_job_handler import RQJobMetaField -from cvat.apps.engine.utils import CvatChunkTimestampMismatchError, get_rq_lock_for_job, md5_hash +from cvat.apps.engine.utils import ( + CvatChunkTimestampMismatchError, + format_list, + get_rq_lock_for_job, + md5_hash, +) from utils.dataset_manifest import ImageManifestManager slogger = ServerLogManager(__name__) @@ -91,7 +96,8 @@ def enqueue_create_chunk_job( # Enqueue the job if the chunk was deleted but the RQ job still exists. # This can happen in cases involving jobs with honeypots and # if the job wasn't collected by the requesting process for any reason. - rq_job.get_status(refresh=False) in {RQJobStatus.FINISHED, RQJobStatus.FAILED, RQJobStatus.CANCELED} + rq_job.get_status(refresh=False) + in {RQJobStatus.FINISHED, RQJobStatus.FAILED, RQJobStatus.CANCELED} ): rq_job = queue.enqueue( create_callback, @@ -275,11 +281,12 @@ def _create_cache_item( return item def _delete_cache_item(self, key: str): - try: - self._cache().delete(key) - slogger.glob.info(f"Removed chunk from the cache: key {key}") - except pickle.UnpicklingError: - slogger.glob.error(f"Failed to remove item from the cache: key {key}", exc_info=True) + self._cache().delete(key) + slogger.glob.info(f"Removed the cache key {key}") + + def _bulk_delete_cache_items(self, keys: Sequence[str]): + self._cache().delete_many(keys) + slogger.glob.info(f"Removed the cache keys {format_list(keys)}") def _get_cache_item(self, key: str) -> Optional[_CacheItem]: try: @@ -350,8 +357,8 @@ def _make_segment_task_chunk_key( ) -> str: return f"{self._make_cache_key_prefix(db_obj)}_task_chunk_{chunk_number}_{quality}" - def _make_context_image_preview_key(self, db_data: models.Data, frame_number: int) -> str: - return f"context_image_{db_data.id}_{frame_number}_preview" + def _make_frame_context_images_chunk_key(self, db_data: models.Data, frame_number: int) -> str: + return f"context_images_{db_data.id}_{frame_number}" @overload def _to_data_with_mime(self, cache_item: _CacheItem) -> DataWithMime: ... @@ -474,6 +481,45 @@ def remove_segment_chunk( self._make_chunk_key(db_segment, chunk_number=chunk_number, quality=quality) ) + def remove_context_images_chunk(self, db_data: models.Data, frame_number: int) -> None: + self._delete_cache_item( + self._make_frame_context_images_chunk_key(db_data, frame_number=frame_number) + ) + + def remove_segments_chunks(self, params: Sequence[dict[str, Any]]) -> None: + """ + Removes several segment chunks from the cache. + + The function expects a sequence of remove_segment_chunk() parameters as dicts. + """ + # TODO: add a version of this function + # that removes related cache elements as well (context images, previews, ...) + # to provide encapsulation + + # TODO: add a generic bulk cleanup function for different objects, including related ones + # (likely a bulk key aggregator should be used inside to reduce requests count) + + keys_to_remove = [] + for item_params in params: + db_obj = item_params.pop("db_segment") + keys_to_remove.append(self._make_chunk_key(db_obj, **item_params)) + + self._bulk_delete_cache_items(keys_to_remove) + + def remove_context_images_chunks(self, params: Sequence[dict[str, Any]]) -> None: + """ + Removes several context image chunks from the cache. + + The function expects a sequence of remove_context_images_chunk() parameters as dicts.
+ """ + + keys_to_remove = [] + for item_params in params: + db_obj = item_params.pop("db_data") + keys_to_remove.append(self._make_frame_context_images_chunk_key(db_obj, **item_params)) + + self._bulk_delete_cache_items(keys_to_remove) + def get_cloud_preview(self, db_storage: models.CloudStorage) -> Optional[DataWithMime]: return self._to_data_with_mime(self._get_cache_item(self._make_preview_key(db_storage))) @@ -494,7 +540,7 @@ def get_or_set_frame_context_images_chunk( ) -> DataWithMime: return self._to_data_with_mime( self._get_or_set_cache_item( - self._make_context_image_preview_key(db_data, frame_number), + self._make_frame_context_images_chunk_key(db_data, frame_number), Callback( callable=self.prepare_context_images_chunk, args=[db_data, frame_number], diff --git a/cvat/apps/engine/cloud_provider.py b/cvat/apps/engine/cloud_provider.py index 80f907bd2e19..f3fe3e6a28e1 100644 --- a/cvat/apps/engine/cloud_provider.py +++ b/cvat/apps/engine/cloud_provider.py @@ -21,7 +21,6 @@ from botocore.client import Config from botocore.exceptions import ClientError from botocore.handlers import disable_signing -from datumaro.util import take_by # can be changed to itertools.batched after migration to python3.12 from django.conf import settings from google.cloud import storage from google.cloud.exceptions import Forbidden as GoogleCloudForbidden @@ -32,7 +31,7 @@ from cvat.apps.engine.log import ServerLogManager from cvat.apps.engine.models import CloudProviderChoice, CredentialsTypeChoice -from cvat.apps.engine.utils import get_cpu_number +from cvat.apps.engine.utils import get_cpu_number, take_by from cvat.utils.http import PROXIES_FOR_UNTRUSTED_URLS class NamedBytesIO(BytesIO): @@ -242,7 +241,7 @@ def bulk_download_to_memory( threads_number = normalize_threads_number(threads_number, len(files)) with ThreadPoolExecutor(max_workers=threads_number) as executor: - for batch_links in take_by(files, count=threads_number): + for batch_links in take_by(files, chunk_size=threads_number): yield from executor.map(func, batch_links) def bulk_download_to_dir( diff --git a/cvat/apps/engine/frame_provider.py b/cvat/apps/engine/frame_provider.py index 2da1741b5bc7..a004256320aa 100644 --- a/cvat/apps/engine/frame_provider.py +++ b/cvat/apps/engine/frame_provider.py @@ -31,7 +31,6 @@ import av import cv2 import numpy as np -from datumaro.util import take_by from django.conf import settings from PIL import Image from rest_framework.exceptions import ValidationError @@ -46,6 +45,7 @@ ZipReader, ) from cvat.apps.engine.mime_types import mimetypes +from cvat.apps.engine.utils import take_by _T = TypeVar("_T") diff --git a/cvat/apps/engine/models.py b/cvat/apps/engine/models.py index 6212ce3a8bc0..527741497531 100644 --- a/cvat/apps/engine/models.py +++ b/cvat/apps/engine/models.py @@ -12,7 +12,7 @@ import uuid from enum import Enum from functools import cached_property -from typing import Any, ClassVar, Collection, Dict, Optional +from typing import Any, ClassVar, Collection, Dict, Optional, Sequence from django.conf import settings from django.contrib.auth.models import User @@ -27,7 +27,7 @@ from cvat.apps.engine.lazy_list import LazyList from cvat.apps.engine.model_utils import MaybeUndefined -from cvat.apps.engine.utils import chunked_list, parse_specific_attributes +from cvat.apps.engine.utils import parse_specific_attributes, take_by from cvat.apps.events.utils import cache_deleted @@ -276,6 +276,11 @@ class ValidationLayout(models.Model): disabled_frames = IntArrayField(store_sorted=True, 
unique_values=True) "Stores task frame numbers of the disabled (deleted) validation frames" + @property + def active_frames(self) -> Sequence[int]: + "An ordered sequence of active (non-disabled) validation frames" + return sorted(set(self.frames).difference(self.disabled_frames)) + class Data(models.Model): MANIFEST_FILENAME: ClassVar[str] = 'manifest.jsonl' @@ -426,7 +431,7 @@ def touch(self) -> None: @transaction.atomic(savepoint=False) def clear_annotations_in_jobs(job_ids): - for job_ids_chunk in chunked_list(job_ids, chunk_size=1000): + for job_ids_chunk in take_by(job_ids, chunk_size=1000): TrackedShapeAttributeVal.objects.filter(shape__track__job_id__in=job_ids_chunk).delete() TrackedShape.objects.filter(track__job_id__in=job_ids_chunk).delete() LabeledTrackAttributeVal.objects.filter(track__job_id__in=job_ids_chunk).delete() @@ -436,6 +441,30 @@ def clear_annotations_in_jobs(job_ids): LabeledImageAttributeVal.objects.filter(image__job_id__in=job_ids_chunk).delete() LabeledImage.objects.filter(job_id__in=job_ids_chunk).delete() +@transaction.atomic(savepoint=False) +def clear_annotations_on_frames_in_honeypot_task(db_task: Task, frames: Sequence[int]): + if db_task.data.validation_mode != ValidationMode.GT_POOL: + # Tracks are prohibited in honeypot tasks + raise AssertionError + + for frames_batch in take_by(frames, chunk_size=1000): + LabeledShapeAttributeVal.objects.filter( + shape__job_id__segment__task_id=db_task.id, + shape__frame__in=frames_batch, + ).delete() + LabeledShape.objects.filter( + job_id__segment__task_id=db_task.id, + frame__in=frames_batch, + ).delete() + LabeledImageAttributeVal.objects.filter( + image__job_id__segment__task_id=db_task.id, + image__frame__in=frames_batch, + ).delete() + LabeledImage.objects.filter( + job_id__segment__task_id=db_task.id, + frame__in=frames_batch, + ).delete() + class Project(TimestampedModel): name = SafeCharField(max_length=256) owner = models.ForeignKey(User, null=True, blank=True, diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py index 3d605d6c8a2d..805ffcb15972 100644 --- a/cvat/apps/engine/serializers.py +++ b/cvat/apps/engine/serializers.py @@ -3,42 +3,45 @@ # # SPDX-License-Identifier: MIT +from __future__ import annotations + from contextlib import closing import warnings from copy import copy +from datetime import timedelta +from decimal import Decimal from inspect import isclass import os import re import shutil import string -import django_rq -import rq.defaults as rq_defaults - from tempfile import NamedTemporaryFile import textwrap -from typing import Any, Dict, Iterable, Optional, OrderedDict, Union +from typing import Any, Dict, Iterable, Optional, OrderedDict, Sequence, Union -from rq.job import Job as RQJob, JobStatus as RQJobStatus -from datetime import timedelta -from decimal import Decimal - -from rest_framework import serializers, exceptions +import django_rq from django.conf import settings from django.contrib.auth.models import User, Group from django.db import transaction +from django.db.models import prefetch_related_objects, Prefetch from django.utils import timezone from numpy import random +from rest_framework import serializers, exceptions +import rq.defaults as rq_defaults +from rq.job import Job as RQJob, JobStatus as RQJobStatus from cvat.apps.dataset_manager.formats.utils import get_label_color -from cvat.apps.engine.frame_provider import TaskFrameProvider, FrameQuality -from cvat.apps.engine.utils import format_list, parse_exception_message, CvatChunkTimestampMismatchError
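Both dataset_manager/task.py and models.py above swap the ad-hoc `chunked_list()` helper for `take_by()` (defined later in this diff, in cvat/apps/engine/utils.py), and cache.py similarly replaces per-key `delete()` calls with a single `delete_many()` in `_bulk_delete_cache_items()`. A minimal, runnable sketch of the batching pattern; the `delete_in_batches()` helper is illustrative, not part of the patch:

```python
from itertools import islice

def take_by(iterable, chunk_size):
    # Same contract as the take_by() utility added in cvat/apps/engine/utils.py
    # below: yields lists of up to chunk_size consecutive items.
    it = iter(iterable)
    while batch := list(islice(it, chunk_size)):
        yield batch

print(list(take_by("abcdefg", 3)))  # [['a', 'b', 'c'], ['d', 'e', 'f'], ['g']]

# The deletion code above consumes it roughly like this (sketch, not CVAT code):
def delete_in_batches(model, ids):
    # Batching bounds the size of each generated DELETE ... WHERE id IN (...)
    # statement while still needing only a handful of queries.
    for ids_chunk in take_by(ids, chunk_size=1000):
        model.objects.filter(id__in=ids_chunk).delete()
```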
from cvat.apps.engine import field_validation, models +from cvat.apps.engine.frame_provider import TaskFrameProvider, FrameQuality from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status from cvat.apps.engine.log import ServerLogManager from cvat.apps.engine.permissions import TaskPermission from cvat.apps.engine.task_validation import HoneypotFrameSelector -from cvat.apps.engine.utils import parse_specific_attributes, build_field_filter_params, get_list_view_name, reverse from cvat.apps.engine.rq_job_handler import RQJobMetaField, RQId +from cvat.apps.engine.utils import ( + format_list, grouped, parse_exception_message, CvatChunkTimestampMismatchError, + parse_specific_attributes, build_field_filter_params, get_list_view_name, reverse, take_by +) from drf_spectacular.utils import OpenApiExample, extend_schema_field, extend_schema_serializer @@ -961,6 +964,13 @@ class JobValidationLayoutWriteSerializer(serializers.Serializer): """.format(models.JobFrameSelectionMethod.MANUAL)) ) + def __init__( + self, *args, bulk_context: _TaskValidationLayoutBulkUpdateContext | None = None, **kwargs + ): + super().__init__(*args, **kwargs) + + self._bulk_context = bulk_context + def validate(self, attrs): frame_selection_method = attrs["frame_selection_method"] if frame_selection_method == models.JobFrameSelectionMethod.MANUAL: @@ -983,9 +993,10 @@ def validate(self, attrs): @transaction.atomic def update(self, instance: models.Job, validated_data: dict[str, Any]) -> models.Job: - from cvat.apps.engine.cache import MediaCache, Callback, enqueue_create_chunk_job, wait_for_rq_job + from cvat.apps.engine.cache import ( + MediaCache, Callback, enqueue_create_chunk_job, wait_for_rq_job + ) from cvat.apps.engine.frame_provider import JobFrameProvider - from cvat.apps.dataset_manager.task import JobAnnotation, AnnotationManager db_job = instance db_segment = db_job.segment @@ -1011,22 +1022,30 @@ def update(self, instance: models.Job, validated_data: dict[str, Any]) -> models def _to_rel_frame(abs_frame: int) -> int: return (abs_frame - db_data.start_frame) // frame_step - all_task_frames: dict[int, models.Image] = { - _to_rel_frame(frame.frame): frame - for frame in db_data.images.all() - } - task_honeypot_frames = set( - _to_rel_frame(frame_id) - for frame_id, frame in all_task_frames.items() - if frame.is_placeholder - ) - segment_frame_set = set(map(_to_rel_frame, db_segment.frame_set)) - segment_honeypots = sorted(segment_frame_set & task_honeypot_frames) + def _to_abs_frame(rel_frame: int) -> int: + return rel_frame * frame_step + db_data.start_frame - deleted_task_frames = db_data.deleted_frames - task_all_validation_frames = set(map(_to_rel_frame, db_task.gt_job.segment.frame_set)) - task_active_validation_frames = task_all_validation_frames.difference(deleted_task_frames) + bulk_context = self._bulk_context + if bulk_context: + db_frames = bulk_context.all_db_frames + task_honeypot_frames = set(bulk_context.honeypot_frames) + task_all_validation_frames = set(bulk_context.all_validation_frames) + task_active_validation_frames = set(bulk_context.active_validation_frames) + else: + db_frames: dict[int, models.Image] = { + _to_rel_frame(frame.frame): frame + for frame in db_data.images.all() + } + task_honeypot_frames = set( + _to_rel_frame(frame_id) + for frame_id, frame in db_frames.items() + if frame.is_placeholder + ) + task_all_validation_frames = set(db_data.validation_layout.frames) + task_active_validation_frames = set(db_data.validation_layout.active_frames) + 
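The `bulk_context` argument added to `JobValidationLayoutWriteSerializer` above lets the same update code run in two modes: applied immediately for a single-job PATCH, or accumulated into a shared context when the task-level serializer updates many jobs and flushes all side effects once. A condensed, runnable sketch of this collect-then-flush idea, with illustrative names rather than the serializer's real interface:

```python
from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass
class BulkUpdateContext:
    # Side effects deferred by per-job updates, flushed once by the caller.
    updated_honeypots: dict[int, Any] = field(default_factory=dict)
    chunks_to_be_removed: list[dict[str, Any]] = field(default_factory=list)

def update_job(changes: dict[int, Any], chunk_params: list[dict[str, Any]],
               bulk_context: Optional[BulkUpdateContext] = None) -> None:
    if bulk_context is not None:
        # Task-level PATCH: defer the work for a single flush.
        bulk_context.updated_honeypots.update(changes)
        bulk_context.chunks_to_be_removed.extend(chunk_params)
    else:
        # Single-job PATCH: apply right away (stubbed here).
        print("applying", changes, "removing", chunk_params)

ctx = BulkUpdateContext()
update_job({5: "img_a"}, [{"chunk_number": 0}], bulk_context=ctx)
update_job({9: "img_b"}, [{"chunk_number": 1}], bulk_context=ctx)
# The task-level caller now performs one bulk DB write and one bulk cache purge:
print(ctx.updated_honeypots, ctx.chunks_to_be_removed)
```

This is also why the bulk branch above only reads prefetched frame data from the context instead of re-querying images and validation frames for every job.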
segment_frame_set = set(map(_to_rel_frame, db_segment.frame_set)) + segment_honeypots = sorted(segment_frame_set & task_honeypot_frames) segment_honeypots_count = len(segment_honeypots) frame_selection_method = validated_data['frame_selection_method'] @@ -1079,70 +1098,72 @@ def _to_rel_frame(abs_frame: int) -> int: ) ) - validation_frame_counts = { - validation_frame: 0 for validation_frame in task_active_validation_frames - } - for task_honeypot_frame in task_honeypot_frames: - real_frame = _to_rel_frame(all_task_frames[task_honeypot_frame].real_frame) - if real_frame in task_active_validation_frames: - validation_frame_counts[real_frame] += 1 + if bulk_context: + active_validation_frame_counts = bulk_context.active_validation_frame_counts + else: + active_validation_frame_counts = { + validation_frame: 0 for validation_frame in task_active_validation_frames + } + for task_honeypot_frame in task_honeypot_frames: + real_frame = _to_rel_frame(db_frames[task_honeypot_frame].real_frame) + if real_frame in task_active_validation_frames: + active_validation_frame_counts[real_frame] += 1 - frame_selector = HoneypotFrameSelector(validation_frame_counts) + frame_selector = HoneypotFrameSelector(active_validation_frame_counts) requested_frames = frame_selector.select_next_frames(segment_honeypots_count) + requested_frames = list(map(_to_abs_frame, requested_frames)) else: assert False # Replace validation frames in the job - old_honeypot_real_ids = [] - updated_db_frames = [] + updated_honeypots = {} for frame, requested_validation_frame in zip(segment_honeypots, requested_frames): - db_requested_frame = all_task_frames[requested_validation_frame] - db_segment_frame = all_task_frames[frame] + db_requested_frame = db_frames[requested_validation_frame] + db_segment_frame = db_frames[frame] assert db_segment_frame.is_placeholder - old_honeypot_real_ids.append(_to_rel_frame(db_segment_frame.real_frame)) + if db_segment_frame.real_frame == db_requested_frame.frame: + continue # Change image in the current segment honeypot frame + db_segment_frame.real_frame = db_requested_frame.frame + db_segment_frame.path = db_requested_frame.path db_segment_frame.width = db_requested_frame.width db_segment_frame.height = db_requested_frame.height - db_segment_frame.real_frame = db_requested_frame.frame - db_segment_frame.related_files.set(db_requested_frame.related_files.all()) - updated_db_frames.append(db_segment_frame) + updated_honeypots[frame] = db_segment_frame - updated_validation_frames = [ - frame - for new_validation_frame, old_validation_frame, frame in zip( - requested_frames, old_honeypot_real_ids, segment_honeypots - ) - if new_validation_frame != old_validation_frame - ] - if updated_validation_frames: - models.Image.objects.bulk_update( - updated_db_frames, fields=['path', 'width', 'height', 'real_frame'] - ) + if updated_honeypots: + if bulk_context: + bulk_context.updated_honeypots.update(updated_honeypots) + else: + # Update image infos + models.Image.objects.bulk_update( + updated_honeypots.values(), fields=['path', 'width', 'height', 'real_frame'] + ) - # Remove annotations on changed validation frames - job_annotation = JobAnnotation(db_job.id) - job_annotation.init_from_db() - job_annotation_manager = AnnotationManager( - job_annotation.ir_data, dimension=db_task.dimension - ) - job_annotation_manager.clear_frames( - segment_frame_set.difference(updated_validation_frames) - ) - job_annotation.delete(job_annotation_manager.data) + models.RelatedFile.images.through.objects.filter( + 
image_id__in=(db_honeypot.id for db_honeypot in updated_honeypots.values()) + ).delete() + + for updated_honeypot in updated_honeypots.values(): + validation_frame = db_frames[_to_rel_frame(updated_honeypot.real_frame)] + updated_honeypot.related_files.set(validation_frame.related_files.all()) + + # Remove annotations on changed validation frames + self._clear_annotations_on_frames(db_segment, updated_honeypots) # Update chunks job_frame_provider = JobFrameProvider(db_job) updated_segment_chunk_ids = set( job_frame_provider.get_chunk_number(updated_segment_frame_id) - for updated_segment_frame_id in updated_validation_frames + for updated_segment_frame_id in updated_honeypots ) segment_frames = sorted(segment_frame_set) segment_frame_map = dict(zip(segment_honeypots, requested_frames)) + chunks_to_be_removed = [] queue = django_rq.get_queue(settings.CVAT_QUEUES.CHUNKS.value) for chunk_id in sorted(updated_segment_chunk_ids): chunk_frames = segment_frames[ @@ -1163,37 +1184,67 @@ def _to_rel_frame(abs_frame: int) -> int: chunk_id, chunk_frames, quality, - {chunk_frame: all_task_frames[chunk_frame].path for chunk_frame in chunk_frames}, + { + chunk_frame: db_frames[chunk_frame].path + for chunk_frame in chunk_frames + }, segment_frame_map, ], ), ) wait_for_rq_job(rq_job) - MediaCache().remove_segment_chunk(db_segment, chunk_id, quality=quality) + chunks_to_be_removed.append( + {'db_segment': db_segment, 'chunk_number': chunk_id, 'quality': quality} + ) + + context_image_chunks_to_be_removed = [ + {"db_data": db_data, "frame_number": f} for f in updated_honeypots + ] + + if bulk_context: + bulk_context.chunks_to_be_removed.extend(chunks_to_be_removed) + bulk_context.context_image_chunks_to_be_removed.extend( + context_image_chunks_to_be_removed + ) + bulk_context.segments_with_updated_chunks.append(db_segment.id) + else: + media_cache = MediaCache() + media_cache.remove_segments_chunks(chunks_to_be_removed) + media_cache.remove_context_images_chunks(context_image_chunks_to_be_removed) - db_segment.chunks_updated_date = timezone.now() - db_segment.save(update_fields=['chunks_updated_date']) + db_segment.chunks_updated_date = timezone.now() + db_segment.save(update_fields=['chunks_updated_date']) - if updated_validation_frames or ( + if updated_honeypots or ( # even if the randomly selected frames were the same as before, we should still # consider it an update to the validation frames and restore them, if they were deleted frame_selection_method == models.JobFrameSelectionMethod.RANDOM_UNIFORM ): - if set(deleted_task_frames).intersection(updated_validation_frames): + # deleted frames that were updated in the job should be restored, as they are new now + if set(db_data.deleted_frames).intersection(updated_honeypots): db_data.deleted_frames = sorted( - set(deleted_task_frames).difference(updated_validation_frames) + set(db_data.deleted_frames).difference(updated_honeypots) ) db_data.save(update_fields=['deleted_frames']) - db_job.touch() - db_segment.job_set.exclude(id=db_job.id).update(updated_date=timezone.now()) - db_task.touch() - if db_task.project: - db_task.project.touch() + new_updated_date = timezone.now() + db_job.updated_date = new_updated_date + + if bulk_context: + bulk_context.updated_segments.append(db_segment.id) + else: + db_segment.job_set.update(updated_date=new_updated_date) + + db_task.touch() + if db_task.project: + db_task.project.touch() return instance + def _clear_annotations_on_frames(self, segment: models.Segment, frames: Sequence[int]): + models.clear_annotations_on_frames_in_honeypot_task(segment.task, 
frames=frames) + @staticmethod def _write_updated_static_chunk( db_segment_id: int, @@ -1237,7 +1288,8 @@ def _iterate_chunk_frames(): if db_segment.chunks_updated_date > initial_chunks_updated_date: raise CvatChunkTimestampMismatchError( "Attempting to write an out of date static chunk, " - f"segment.chunks_updated_date: {db_segment.chunks_updated_date}, expected_ts: {initial_chunks_updated_date}" + f"segment.chunks_updated_date: {db_segment.chunks_updated_date}, " + f"expected_ts: {initial_chunks_updated_date}" ) with open(get_chunk_path(chunk_id, db_segment_id), 'wb') as f: f.write(chunk.getvalue()) @@ -1296,6 +1348,28 @@ def _to_rel_frame(abs_frame: int) -> int: return super().to_representation(data) +class _TaskValidationLayoutBulkUpdateContext: + def __init__( + self, + *, + all_db_frames: dict[int, models.Image], + honeypot_frames: list[int], + all_validation_frames: list[int], + active_validation_frames: list[int], + validation_frame_counts: dict[int, int] | None = None + ): + self.updated_honeypots: dict[int, models.Image] = {} + self.updated_segments: list[int] = [] + self.chunks_to_be_removed: list[dict[str, Any]] = [] + self.context_image_chunks_to_be_removed: list[dict[str, Any]] = [] + self.segments_with_updated_chunks: list[int] = [] + + self.all_db_frames = all_db_frames + self.honeypot_frames = honeypot_frames + self.all_validation_frames = all_validation_frames + self.active_validation_frames = active_validation_frames + self.active_validation_frame_counts = validation_frame_counts + class TaskValidationLayoutWriteSerializer(serializers.Serializer): disabled_frames = serializers.ListField( child=serializers.IntegerField(min_value=0), required=False, @@ -1336,20 +1410,20 @@ def validate(self, attrs): @transaction.atomic def update(self, instance: models.Task, validated_data: dict[str, Any]) -> models.Task: - validation_layout: models.ValidationLayout | None = ( + db_validation_layout: models.ValidationLayout | None = ( getattr(instance.data, 'validation_layout', None) ) - if not validation_layout: + if not db_validation_layout: raise serializers.ValidationError("Validation is not configured in the task") if 'disabled_frames' in validated_data: requested_disabled_frames = validated_data['disabled_frames'] unknown_requested_disabled_frames = ( - set(requested_disabled_frames).difference(validation_layout.frames) + set(requested_disabled_frames).difference(db_validation_layout.frames) ) if unknown_requested_disabled_frames: raise serializers.ValidationError( - "Unknown frames requested for exclusion from the validation set {}".format( + "Unknown frames requested for exclusion from the validation set: {}".format( format_list(tuple(map(str, sorted(unknown_requested_disabled_frames)))) ) ) @@ -1360,11 +1434,12 @@ def update(self, instance: models.Task, validated_data: dict[str, Any]) -> model gt_job_meta_serializer.is_valid(raise_exception=True) gt_job_meta_serializer.save() - validation_layout.refresh_from_db() + db_validation_layout.refresh_from_db() + instance.data.refresh_from_db() frame_selection_method = validated_data.get('frame_selection_method') if frame_selection_method and not ( - validation_layout and + db_validation_layout and instance.data.validation_layout.mode == models.ValidationMode.GT_POOL ): raise serializers.ValidationError( @@ -1372,64 +1447,190 @@ def update(self, instance: models.Task, validated_data: dict[str, Any]) -> model f"validation mode is '{models.ValidationMode.GT_POOL}'" ) - if frame_selection_method == models.JobFrameSelectionMethod.MANUAL: - 
requested_honeypot_real_frames = validated_data['honeypot_real_frames'] + if not frame_selection_method: + return instance - task_honeypot_abs_frames = ( - instance.data.images - .filter(is_placeholder=True) - .order_by('frame') - .values_list('frame', flat=True) - ) + # Populate the prefetch cache for required objects + prefetch_related_objects([instance], + Prefetch('data__images', queryset=models.Image.objects.order_by('frame')), + 'segment_set', + 'segment_set__job_set', + ) + + frame_provider = TaskFrameProvider(instance) + db_frames = { + frame_provider.get_rel_frame_number(db_image.frame): db_image + for db_image in instance.data.images.all() + } + honeypot_frames = sorted(f for f, v in db_frames.items() if v.is_placeholder) + all_validation_frames = db_validation_layout.frames + active_validation_frames = db_validation_layout.active_frames + + bulk_context = _TaskValidationLayoutBulkUpdateContext( + all_db_frames=db_frames, + honeypot_frames=honeypot_frames, + all_validation_frames=all_validation_frames, + active_validation_frames=active_validation_frames, + ) - task_honeypot_frames_count = len(task_honeypot_abs_frames) + if frame_selection_method == models.JobFrameSelectionMethod.MANUAL: + requested_honeypot_real_frames = validated_data['honeypot_real_frames'] + task_honeypot_frames_count = len(honeypot_frames) if task_honeypot_frames_count != len(requested_honeypot_real_frames): raise serializers.ValidationError( "Invalid size of 'honeypot_real_frames' array, " f"expected {task_honeypot_frames_count}" ) elif frame_selection_method == models.JobFrameSelectionMethod.RANDOM_UNIFORM: - # Reset active honeypots in the task to produce a uniform distribution in the end - task_frame_provider = TaskFrameProvider(instance) - task_abs_disabled_validation_frames = [ - task_frame_provider.get_abs_frame_number(v) - for v in validation_layout.disabled_frames - ] - instance.data.images.filter(is_placeholder=True).exclude( - real_frame__in=task_abs_disabled_validation_frames - ).update(real_frame=0) - - if frame_selection_method: - for db_job in ( - models.Job.objects.select_related("segment") - .filter(segment__task_id=instance.id, type=models.JobType.ANNOTATION) - .order_by("segment__start_frame") - .all() - ): - db_job.segment.task = instance + # Reset distribution for active validation frames + bulk_context.active_validation_frame_counts = { f: 0 for f in active_validation_frames } + + # Could be done using Django ORM, but using order_by() and filter() + # would result in an extra DB request + db_jobs = sorted( + ( + db_job + for db_segment in instance.segment_set.all() + for db_job in db_segment.job_set.all() + if db_job.type == models.JobType.ANNOTATION + ), + key=lambda j: j.segment.start_frame + ) + for db_job in db_jobs: + job_serializer_params = { + 'frame_selection_method': frame_selection_method + } - job_serializer_params = { - 'frame_selection_method': frame_selection_method - } + if frame_selection_method == models.JobFrameSelectionMethod.MANUAL: + segment_frame_set = db_job.segment.frame_set + job_serializer_params['honeypot_real_frames'] = [ + requested_frame + for rel_frame, requested_frame in zip( + honeypot_frames, requested_honeypot_real_frames + ) + if frame_provider.get_abs_frame_number(rel_frame) in segment_frame_set + ] - if frame_selection_method == models.JobFrameSelectionMethod.MANUAL: - segment_frame_set = db_job.segment.frame_set - job_serializer_params['honeypot_real_frames'] = [ - requested_frame - for abs_frame, requested_frame in zip( - task_honeypot_abs_frames, 
requested_honeypot_real_frames - ) - if abs_frame in segment_frame_set - ] + job_validation_layout_serializer = JobValidationLayoutWriteSerializer( + db_job, job_serializer_params, bulk_context=bulk_context + ) + job_validation_layout_serializer.is_valid(raise_exception=True) + job_validation_layout_serializer.save() - job_validation_layout_serializer = JobValidationLayoutWriteSerializer( - db_job, job_serializer_params - ) - job_validation_layout_serializer.is_valid(raise_exception=True) - job_validation_layout_serializer.save() + self._perform_bulk_updates(instance, bulk_context=bulk_context) return instance + def _perform_bulk_updates( + self, + db_task: models.Task, + *, + bulk_context: _TaskValidationLayoutBulkUpdateContext, + ): + updated_segments = bulk_context.updated_segments + if not updated_segments: + return + + self._update_frames_in_bulk(db_task, bulk_context=bulk_context) + + # Import it here to avoid circular import + from cvat.apps.engine.cache import MediaCache + media_cache = MediaCache() + media_cache.remove_segments_chunks(bulk_context.chunks_to_be_removed) + media_cache.remove_context_images_chunks(bulk_context.context_image_chunks_to_be_removed) + + # Update segments + updated_date = timezone.now() + for updated_segments_batch in take_by(updated_segments, chunk_size=1000): + models.Job.objects.filter( + segment_id__in=updated_segments_batch + ).update(updated_date=updated_date) + + for updated_segment_chunks_batch in take_by( + bulk_context.segments_with_updated_chunks, chunk_size=1000 + ): + models.Segment.objects.filter( + id__in=updated_segment_chunks_batch + ).update(chunks_updated_date=updated_date) + + # Update parent objects + db_task.touch() + if db_task.project: + db_task.project.touch() + + def _update_frames_in_bulk( + self, + db_task: models.Task, + *, + bulk_context: _TaskValidationLayoutBulkUpdateContext, + ): + self._clear_annotations_on_frames(db_task, bulk_context.updated_honeypots) + + # The django generated bulk_update() query is too slow, so we use bulk_create() instead + # NOTE: Silk doesn't show these queries in the list of queries + # for some reason, but they can be seen in the profile + models.Image.objects.bulk_create( + list(bulk_context.updated_honeypots.values()), + update_conflicts=True, + update_fields=['path', 'width', 'height', 'real_frame'], + unique_fields=[ + # required for Postgres + # https://docs.djangoproject.com/en/4.2/ref/models/querysets/#bulk-create + 'id' + ], + batch_size=1000, + ) + + # Update related images in 2 steps: remove all m2m for honeypots, then add (copy) new ones + # 1. remove + for updated_honeypots_batch in take_by( + bulk_context.updated_honeypots.values(), chunk_size=1000 + ): + models.RelatedFile.images.through.objects.filter( + image_id__in=(db_honeypot.id for db_honeypot in updated_honeypots_batch) + ).delete() + + # 2. batched add (copy): collect all the new records and insert + frame_provider = TaskFrameProvider(db_task) + honeypots_by_validation_frame = grouped( + bulk_context.updated_honeypots, + key=lambda honeypot_frame: frame_provider.get_rel_frame_number( + bulk_context.updated_honeypots[honeypot_frame].real_frame + ) + ) # validation frame -> [honeypot_frame, ...] 
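The m2m handling in `_update_frames_in_bulk()` above works in two bulk steps: delete all `RelatedFile.images.through` rows for the updated honeypots, then bulk-insert copies of each source validation frame's rows. A toy model of the copy step, using tuples in place of Django through objects (data is illustrative, not CVAT code):

```python
def grouped(items, *, key):
    # Same contract as the grouped() helper added in cvat/apps/engine/utils.py.
    groups = {}
    for item in items:
        groups.setdefault(key(item), []).append(item)
    return groups

# honeypot frame -> validation frame it now mirrors
honeypot_sources = {10: 2, 11: 2, 12: 3}
# existing through rows for validation frames: (image_frame, related_file_id)
validation_m2m = [(2, 100), (2, 101), (3, 102)]

rows_by_frame = grouped(validation_m2m, key=lambda row: row[0])
new_rows = [
    (honeypot, related_file_id)
    for honeypot, source in honeypot_sources.items()
    for _, related_file_id in rows_by_frame.get(source, [])
]
print(new_rows)  # [(10, 100), (10, 101), (11, 100), (11, 101), (12, 102)]
```

In the real code the `new_rows` equivalent becomes through-model instances handed to `bulk_create(..., batch_size=1000)`, replacing one `related_files.set()` query pair per honeypot with a few batched statements.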
+ + new_m2m_objects = [] + m2m_objects_by_validation_image_id = grouped( + models.RelatedFile.images.through.objects + .filter(image_id__in=( + bulk_context.all_db_frames[validation_frame].id + for validation_frame in honeypots_by_validation_frame + )) + .all(), + key=lambda m2m_obj: m2m_obj.image_id + ) + for validation_frame, validation_frame_honeypots in honeypots_by_validation_frame.items(): + validation_frame_m2m_objects = m2m_objects_by_validation_image_id.get( + bulk_context.all_db_frames[validation_frame].id + ) + if not validation_frame_m2m_objects: + continue + + # Copy validation frame m2m objects to corresponding honeypots + for honeypot_frame in validation_frame_honeypots: + new_m2m_objects.extend( + models.RelatedFile.images.through( + image_id=bulk_context.all_db_frames[honeypot_frame].id, + relatedfile_id=m2m_obj.relatedfile_id + ) + for m2m_obj in validation_frame_m2m_objects + ) + + models.RelatedFile.images.through.objects.bulk_create(new_m2m_objects, batch_size=1000) + + def _clear_annotations_on_frames(self, db_task: models.Task, frames: Sequence[int]): + models.clear_annotations_on_frames_in_honeypot_task(db_task, frames=frames) + class TaskValidationLayoutReadSerializer(serializers.ModelSerializer): validation_frames = serializers.ListField( child=serializers.IntegerField(min_value=0), source='frames', required=False, diff --git a/cvat/apps/engine/task.py b/cvat/apps/engine/task.py index 41cde26762b7..3fac8f03fe65 100644 --- a/cvat/apps/engine/task.py +++ b/cvat/apps/engine/task.py @@ -21,7 +21,6 @@ import av import attrs import django_rq -from datumaro.util import take_by from django.conf import settings from django.db import transaction from django.forms.models import model_to_dict @@ -39,8 +38,8 @@ ) from cvat.apps.engine.models import RequestAction, RequestTarget from cvat.apps.engine.utils import ( - av_scan_paths, format_list,get_rq_job_meta, - define_dependent_job, get_rq_lock_by_user + av_scan_paths, format_list, get_rq_job_meta, + define_dependent_job, get_rq_lock_by_user, take_by ) from cvat.apps.engine.rq_job_handler import RQId from cvat.apps.engine.task_validation import HoneypotFrameSelector @@ -1269,7 +1268,7 @@ def _update_status(msg: str) -> None: new_db_images: list[models.Image] = [] validation_frames: list[int] = [] frame_idx_map: dict[int, int] = {} # new to original id - for job_frames in take_by(non_pool_frames, count=db_task.segment_size or db_data.size): + for job_frames in take_by(non_pool_frames, chunk_size=db_task.segment_size or db_data.size): job_validation_frames = list(frame_selector.select_next_frames(frames_per_job_count)) job_frames += job_validation_frames diff --git a/cvat/apps/engine/task_validation.py b/cvat/apps/engine/task_validation.py index af5ebfbb7b72..3f15b7d79716 100644 --- a/cvat/apps/engine/task_validation.py +++ b/cvat/apps/engine/task_validation.py @@ -16,8 +16,7 @@ def __init__( self.validation_frame_counts = validation_frame_counts if not rng: - # Use a known uniform distribution - rng = np.random.Generator(np.random.MT19937()) + rng = np.random.default_rng() self.rng = rng @@ -31,7 +30,7 @@ def select_next_frames(self, count: int) -> Sequence[_T]: # if possible (if the job and GT counts allow this). 
pick = [] - for _ in range(count): + for random_number in self.rng.random(count): least_count = min(c for f, c in self.validation_frame_counts.items() if f not in pick) least_used_frames = tuple( f @@ -40,7 +39,7 @@ def select_next_frames(self, count: int) -> Sequence[_T]: if c == least_count ) - selected_item = self.rng.choice(range(len(least_used_frames)), 1).item() + selected_item = int(random_number * len(least_used_frames)) selected_frame = least_used_frames[selected_item] pick.append(selected_frame) self.validation_frame_counts[selected_frame] += 1 diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 5d383df3465e..13d1d354dd3d 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: MIT import ast +from itertools import islice import cv2 as cv from collections import namedtuple import hashlib @@ -11,7 +12,9 @@ import sys import traceback from contextlib import suppress, nullcontext -from typing import Any, Dict, Optional, Callable, Sequence, Union +from typing import ( + Any, Callable, Dict, Generator, Iterable, Iterator, Optional, Mapping, Sequence, TypeVar, Union +) import subprocess import os import urllib.parse @@ -424,10 +427,22 @@ def directory_tree(path, max_depth=None) -> str: def is_dataset_export(request: HttpRequest) -> bool: return to_bool(request.query_params.get('save_images', False)) +_T = TypeVar('_T') -def chunked_list(lst, chunk_size): - for i in range(0, len(lst), chunk_size): - yield lst[i:i + chunk_size] +def take_by(iterable: Iterable[_T], chunk_size: int) -> Generator[list[_T], None, None]: + """ + Returns elements from the input iterable by batches of N items. + ('abcdefg', 3) -> ['a', 'b', 'c'], ['d', 'e', 'f'], ['g'] + """ + # can be changed to itertools.batched after migration to python3.12 + + it = iter(iterable) + while True: + batch = list(islice(it, chunk_size)) + if len(batch) == 0: + break + + yield batch FORMATTED_LIST_DISPLAY_THRESHOLD = 10 @@ -446,3 +461,34 @@ def format_list( separator.join(items[:max_items]), f" (and {remainder_count} more)" if 0 < remainder_count else "", ) + + +_K = TypeVar("_K") +_V = TypeVar("_V") + + +def grouped( + items: Iterator[_V] | Iterable[_V], *, key: Callable[[_V], _K] +) -> Mapping[_K, Sequence[_V]]: + """ + Returns a mapping with input iterable elements grouped by key, for example: + + grouped( + [("apple1", "red"), ("apple2", "green"), ("apple3", "red")], + key=lambda v: v[1] + ) + -> + { + "red": [("apple1", "red"), ("apple3", "red")], + "green": [("apple2", "green")] + } + + Similar to itertools.groupby, but allows reiteration on resulting groups. 
+ """ + + # Can be implemented with itertools.groupby, but it requires extra sorting for input elements + grouped_items = {} + for item in items: + grouped_items.setdefault(key(item), []).append(item) + + return grouped_items diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index a73cf9449a60..9692cfc2f750 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -1772,7 +1772,7 @@ def preview(self, request, pk): @action(detail=True, methods=["GET", "PATCH"], url_path='validation_layout') @transaction.atomic def validation_layout(self, request, pk): - db_task = self.get_object() # call check_object_permissions as well + db_task = cast(models.Task, self.get_object()) # call check_object_permissions as well validation_layout = getattr(db_task.data, 'validation_layout', None) diff --git a/tests/python/rest_api/test_tasks.py b/tests/python/rest_api/test_tasks.py index ea81db5354bb..585d7dde4d9b 100644 --- a/tests/python/rest_api/test_tasks.py +++ b/tests/python/rest_api/test_tasks.py @@ -84,6 +84,15 @@ def get_cloud_storage_content(username: str, cloud_storage_id: int, manifest: Op return [f"{f['name']}{'/' if str(f['type']) == 'DIR' else ''}" for f in data["content"]] +def count_frame_uses(data: Sequence[int], *, included_frames: Sequence[int]) -> dict[int, int]: + use_counts = {f: 0 for f in included_frames} + for f in data: + if f in included_frames: + use_counts[f] += 1 + + return use_counts + + @pytest.mark.usefixtures("restore_db_per_class") class TestGetTasks: def _test_task_list_200(self, user, project_id, data, exclude_paths="", **kwargs): @@ -4451,6 +4460,15 @@ def test_can_change_honeypot_frames_in_task( api_client.tasks_api.retrieve_validation_layout(task["id"])[1].data ) + api_client.tasks_api.partial_update_validation_layout( + task["id"], + patched_task_validation_layout_write_request=models.PatchedTaskValidationLayoutWriteRequest( + frame_selection_method="manual", + honeypot_real_frames=old_validation_layout["honeypot_count"] + * [gt_frame_set[0]], + ), + ) + params = {"frame_selection_method": frame_selection_method} if frame_selection_method == "manual": @@ -4479,10 +4497,10 @@ def test_can_change_honeypot_frames_in_task( assert new_honeypot_real_frames == requested_honeypot_real_frames elif frame_selection_method == "random_uniform": # Test distribution - validation_frame_counts = {f: 0 for f in new_validation_layout["validation_frames"]} - for f in new_honeypot_real_frames: - validation_frame_counts[f] += 1 - + validation_frame_counts = count_frame_uses( + new_honeypot_real_frames, + included_frames=new_validation_layout["validation_frames"], + ) assert max(validation_frame_counts.values()) <= 1 + min( validation_frame_counts.values() ) @@ -4513,10 +4531,13 @@ def test_can_change_honeypot_frames_in_task_can_only_select_from_active_validati gt_frame_set = range(gt_job["start_frame"], gt_job["stop_frame"] + 1) active_gt_set = gt_frame_set[:honeypots_per_job] - api_client.jobs_api.partial_update_data_meta( - gt_job["id"], - patched_job_data_meta_write_request=models.PatchedJobDataMetaWriteRequest( - deleted_frames=[f for f in gt_frame_set if f not in active_gt_set] + api_client.tasks_api.partial_update_validation_layout( + task["id"], + patched_task_validation_layout_write_request=models.PatchedTaskValidationLayoutWriteRequest( + disabled_frames=[f for f in gt_frame_set if f not in active_gt_set], + frame_selection_method="manual", + honeypot_real_frames=old_validation_layout["honeypot_count"] + * [active_gt_set[0]], ), ) @@ -4559,7 +4580,7 
@@ def test_can_change_honeypot_frames_in_task_can_only_select_from_active_validati new_honeypot_real_frames = new_validation_layout["honeypot_real_frames"] assert old_validation_layout["honeypot_count"] == len(new_honeypot_real_frames) - assert all(f in active_gt_set for f in new_honeypot_real_frames) + assert all([f in active_gt_set for f in new_honeypot_real_frames]) if frame_selection_method == "manual": assert new_honeypot_real_frames == requested_honeypot_real_frames @@ -4578,6 +4599,97 @@ def test_can_change_honeypot_frames_in_task_can_only_select_from_active_validati ] ), new_honeypot_real_frames + # Test distribution + validation_frame_counts = count_frame_uses( + new_honeypot_real_frames, included_frames=active_gt_set + ) + assert max(validation_frame_counts.values()) <= 1 + min( + validation_frame_counts.values() + ) + + @parametrize("task, gt_job, annotation_jobs", [fixture_ref(fxt_task_with_honeypots)]) + @parametrize("frame_selection_method", ["manual", "random_uniform"]) + def test_can_restore_and_change_honeypot_frames_in_task_in_the_same_request( + self, admin_user, task, gt_job, annotation_jobs, frame_selection_method: str + ): + assert gt_job["stop_frame"] - gt_job["start_frame"] + 1 >= 2 + + with make_api_client(admin_user) as api_client: + old_validation_layout = json.loads( + api_client.tasks_api.retrieve_validation_layout(task["id"])[1].data + ) + + honeypots_per_job = old_validation_layout["frames_per_job_count"] + + gt_frame_set = range(gt_job["start_frame"], gt_job["stop_frame"] + 1) + active_gt_set = gt_frame_set[:honeypots_per_job] + + api_client.tasks_api.partial_update_validation_layout( + task["id"], + patched_task_validation_layout_write_request=models.PatchedTaskValidationLayoutWriteRequest( + disabled_frames=[f for f in gt_frame_set if f not in active_gt_set], + frame_selection_method="manual", + honeypot_real_frames=old_validation_layout["honeypot_count"] + * [active_gt_set[0]], + ), + ) + + active_gt_set = gt_frame_set + + params = { + "frame_selection_method": frame_selection_method, + "disabled_frames": [], # restore all validation frames + } + + if frame_selection_method == "manual": + requested_honeypot_real_frames = [ + active_gt_set[(old_real_frame + 1) % len(active_gt_set)] + for old_real_frame in old_validation_layout["honeypot_real_frames"] + ] + + params["honeypot_real_frames"] = requested_honeypot_real_frames + + new_validation_layout = json.loads( + api_client.tasks_api.partial_update_validation_layout( + task["id"], + patched_task_validation_layout_write_request=( + models.PatchedTaskValidationLayoutWriteRequest(**params) + ), + )[1].data + ) + + new_honeypot_real_frames = new_validation_layout["honeypot_real_frames"] + + assert old_validation_layout["honeypot_count"] == len(new_honeypot_real_frames) + assert sorted(new_validation_layout["disabled_frames"]) == sorted( + params["disabled_frames"] + ) + + if frame_selection_method == "manual": + assert new_honeypot_real_frames == requested_honeypot_real_frames + else: + assert all( + [ + honeypots_per_job + == len( + set( + new_honeypot_real_frames[ + j * honeypots_per_job : (j + 1) * honeypots_per_job + ] + ) + ) + for j in range(len(annotation_jobs)) + ] + ), new_honeypot_real_frames + + # Test distribution + validation_frame_counts = count_frame_uses( + new_honeypot_real_frames, included_frames=active_gt_set + ) + assert max(validation_frame_counts.values()) <= 1 + min( + validation_frame_counts.values() + ) + @parametrize("task, gt_job, annotation_jobs", 
[fixture_ref(fxt_task_with_honeypots)]) @parametrize("frame_selection_method", ["manual", "random_uniform"]) def test_can_change_honeypot_frames_in_annotation_jobs(
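One closing note on the `HoneypotFrameSelector` change earlier in this patch: instead of one `rng.choice()` call per selected frame, it draws all random numbers up front with `self.rng.random(count)` and maps each float to a list index. A minimal sketch of that mapping, assuming NumPy:

```python
import numpy as np

rng = np.random.default_rng(seed=42)
least_used_frames = ["f3", "f7", "f9"]

# One vectorized draw instead of one rng.choice() call per iteration:
for random_number in rng.random(3):
    # random_number is uniform in [0, 1), so int(random_number * n) is a
    # uniform index in [0, n - 1] and can never reach n.
    selected_item = int(random_number * len(least_used_frames))
    print(least_used_frames[selected_item])
```

The switch from an explicit `MT19937` generator to `np.random.default_rng()` (PCG64) only changes the bit generator; both produce uniform streams, so the distribution properties the tests check are unaffected.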