Skip to content

Commit

Permalink
Optimize DB requests (#6340)
Browse files Browse the repository at this point in the history
Optimizes several DB request memory issues, introduced in #6204

- Optimized memory use in job annotation retrieval
- Optimized migration
  • Loading branch information
zhiltsov-max authored and azhavoro committed Jun 20, 2023
1 parent 429bf6e commit a82d336
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 14 deletions.
9 changes: 3 additions & 6 deletions cvat/apps/dataset_manager/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from cvat.apps.dataset_manager.annotation import AnnotationIR, AnnotationManager
from cvat.apps.dataset_manager.bindings import TaskData, JobData, CvatImportError
from cvat.apps.dataset_manager.formats.registry import make_exporter, make_importer
from cvat.apps.dataset_manager.util import add_prefetch_fields, bulk_create
from cvat.apps.dataset_manager.util import add_prefetch_fields, bulk_create, get_cached


class dotdict(OrderedDict):
Expand Down Expand Up @@ -105,17 +105,14 @@ def add_prefetch_info(cls, queryset):

def __init__(self, pk, *, is_prefetched=False, queryset=None):
if queryset is None:
queryset = self.add_prefetch_info(models.Job.objects).all()
queryset = self.add_prefetch_info(models.Job.objects)

if is_prefetched:
self.db_job: models.Job = queryset.select_related(
'segment__task'
).select_for_update().get(id=pk)
else:
try:
self.db_job: models.Job = next(job for job in queryset if job.pk == int(pk))
except StopIteration as ex:
raise models.Job.DoesNotExist from ex
self.db_job: models.Job = get_cached(queryset, pk=int(pk))

db_segment = self.db_job.segment
self.start_frame = db_segment.start_frame
Expand Down
25 changes: 22 additions & 3 deletions cvat/apps/dataset_manager/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import zipfile

from django.conf import settings
from django.db.models import QuerySet
from django.db import models


def current_function_name(depth=1):
Expand Down Expand Up @@ -40,16 +40,35 @@ def bulk_create(db_model, objects, flt_param):

return []

def is_prefetched(queryset: QuerySet, field: str) -> bool:
def is_prefetched(queryset: models.QuerySet, field: str) -> bool:
return field in queryset._prefetch_related_lookups

def add_prefetch_fields(queryset: QuerySet, fields: Sequence[str]) -> QuerySet:
def add_prefetch_fields(queryset: models.QuerySet, fields: Sequence[str]) -> models.QuerySet:
for field in fields:
if not is_prefetched(queryset, field):
queryset = queryset.prefetch_related(field)

return queryset

def get_cached(queryset: models.QuerySet, pk: int) -> models.Model:
"""
Like regular queryset.get(), but checks for the cached values first
instead of just making a request.
"""

# Read more about caching insights:
# https://www.mattduck.com/2021-01-django-orm-result-cache.html
# The field is initialized on accessing the query results, eg. on iteration
if getattr(queryset, '_result_cache'):
result = next((obj for obj in queryset if obj.pk == pk), None)
else:
result = None

if result is None:
result = queryset.get(id=pk)

return result

def deepcopy_simple(v):
# Default deepcopy is very slow

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def add_created_date_to_existing_jobs(apps, schema_editor):
task = job.segment.task
job.created_date = task.created_date

Job.objects.bulk_update(jobs, fields=['created_date'])
Job.objects.bulk_update(jobs, fields=['created_date'], batch_size=500)


class Migration(migrations.Migration):
Expand Down
15 changes: 11 additions & 4 deletions cvat/apps/quality_control/quality_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -2241,10 +2241,9 @@ def _compute_reports(self, task_id: int) -> int:
except Task.DoesNotExist:
return

# Try to use shared queryset to minimize DB requests
job_queryset = Job.objects.prefetch_related("segment")
job_queryset = JobDataProvider.add_prefetch_info(job_queryset)
job_queryset = job_queryset.filter(segment__task_id=task_id).all()
# Try to use a shared queryset to minimize DB requests
job_queryset = Job.objects.select_related("segment")
job_queryset = job_queryset.filter(segment__task_id=task_id)

# The GT job could have been removed during scheduling, so we need to check it exists
gt_job: Job = next(
Expand All @@ -2258,6 +2257,14 @@ def _compute_reports(self, task_id: int) -> int:
# - job updated -> job report is computed
# old reports can be reused in this case (need to add M-1 relationship in reports)

# Add prefetch data to the shared queryset
# All the jobs / segments share the same task, so we can load it just once.
# We reuse the same object for better memory use (OOM is possible otherwise).
# Perform manual "join", since django can't do this.
gt_job = JobDataProvider.add_prefetch_info(job_queryset).get(id=gt_job.id)
for job in job_queryset:
job.segment.task = gt_job.segment.task

# Preload all the data for the computations
# It must be done in a single transaction and before all the remaining computations
# because the task and jobs can be changed after the beginning,
Expand Down

0 comments on commit a82d336

Please sign in to comment.