Skip to content

Commit

Permalink
Use the same algorithm to throttle quality reports as analytics repor…
Browse files Browse the repository at this point in the history
…ts (cvat-ai#7596)

<!-- Raise an issue to propose your change
(https://github.com/opencv/cvat/issues).
It helps to avoid duplication of efforts from multiple independent
contributors.
Discuss your ideas with maintainers to be sure that changes will be
approved and merged.
Read the [Contribution
guide](https://opencv.github.io/cvat/docs/contributing/). -->

<!-- Provide a general summary of your changes in the Title above -->

### Motivation and context
<!-- Why is this change required? What problem does it solve? If it
fixes an open
issue, please link to the issue here. Describe your changes in detail,
add
screenshots. -->
See cvat-ai#7576 for more details. This patch extracts the high-level
throttling functionality added in that patch and reuses it for quality
reports.

Note: in that patch I referred to this functionality as debouncing, but
throttling seems like a more accurate description. It would be
debouncing if the autoupdate job only ran after no updates occurred for
a period, which is not how it actually works.

### How has this been tested?
<!-- Please describe in detail how you tested your changes.
Include details of your testing environment, and the tests you ran to
see how your change affects other areas of the code, etc. -->

### Checklist
<!-- Go over all the following points, and put an `x` in all the boxes
that apply.
If an item isn't applicable for some reason, then ~~explicitly
strikethrough~~ the whole
line. If you don't do that, GitHub will show incorrect progress for the
pull request.
If you're unsure about any of these, don't hesitate to ask. We're here
to help! -->
- [x] I submit my changes into the `develop` branch
- [x] I have created a changelog fragment <!-- see top comment in
CHANGELOG.md -->
- ~~[ ] I have updated the documentation accordingly~~
- ~~[ ] I have added tests to cover my changes~~
- ~~[ ] I have linked related issues (see [GitHub docs](

https://help.github.com/en/github/managing-your-work-on-github/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword))~~
- ~~[ ] I have increased versions of npm packages if it is necessary

([cvat-canvas](https://github.com/opencv/cvat/tree/develop/cvat-canvas#versioning),

[cvat-core](https://github.com/opencv/cvat/tree/develop/cvat-core#versioning),

[cvat-data](https://github.com/opencv/cvat/tree/develop/cvat-data#versioning)
and

[cvat-ui](https://github.com/opencv/cvat/tree/develop/cvat-ui#versioning))~~

### License

- [x] I submit _my code changes_ under the same [MIT License](
https://github.com/opencv/cvat/blob/develop/LICENSE) that covers the
project.
  Feel free to contact the maintainers if that's a concern.
  • Loading branch information
SpecLad authored Mar 29, 2024
1 parent 85f5376 commit 77378f1
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 86 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Fixed

- Made quality report update job scheduling more efficient
(<https://github.com/opencv/cvat/pull/7596>)
36 changes: 9 additions & 27 deletions cvat/apps/analytics_report/report/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
JobObjects,
)
from cvat.apps.engine.models import Job, Project, Task
from cvat.utils.background_jobs import schedule_job_with_throttling


def get_empty_report():
Expand Down Expand Up @@ -75,12 +76,6 @@ def _make_queue_job_id_base(self, obj) -> str:
def _make_custom_analytics_check_job_id(self) -> str:
return uuid4().hex

def _make_queue_job_id(self, obj, start_time: timezone.datetime) -> str:
return f"{self._make_queue_job_id_base(obj)}-{start_time.timestamp()}"

def _make_autoupdate_blocker_key(self, obj) -> str:
return f"cvat:analytics:autoupdate-blocker:{self._make_queue_job_id_base(obj)}"

@classmethod
def _get_last_report_time(cls, obj):
try:
Expand Down Expand Up @@ -121,27 +116,14 @@ def schedule_analytics_report_autoupdate_job(self, *, job=None, task=None, proje
target_obj = project
cvat_project_id = project.id

with django_rq.get_connection(settings.CVAT_QUEUES.ANALYTICS_REPORTS.value) as connection:
# The blocker key is used to avoid scheduling a report update job
# for every single change. The first time this method is called
# for a given object, we schedule the job and create a blocker
# that expires at the same time as the job is supposed to start.
# Until the blocker expires, we don't schedule any more jobs.
blocker_key = self._make_autoupdate_blocker_key(target_obj)
if connection.exists(blocker_key):
return

queue_job_id = self._make_queue_job_id(target_obj, next_job_time)

self._get_scheduler().enqueue_at(
next_job_time,
self._check_analytics_report,
cvat_task_id=cvat_task_id,
cvat_project_id=cvat_project_id,
job_id=queue_job_id,
)

connection.set(blocker_key, queue_job_id, exat=next_job_time)
schedule_job_with_throttling(
settings.CVAT_QUEUES.ANALYTICS_REPORTS.value,
self._make_queue_job_id_base(target_obj),
next_job_time,
self._check_analytics_report,
cvat_task_id=cvat_task_id,
cvat_project_id=cvat_project_id,
)

def schedule_analytics_check_job(self, *, job=None, task=None, project=None, user_id):
rq_id = self._make_custom_analytics_check_job_id()
Expand Down
69 changes: 10 additions & 59 deletions cvat/apps/quality_control/quality_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
AnnotationConflictType,
AnnotationType,
)
from cvat.utils.background_jobs import schedule_job_with_throttling


class _Serializable:
Expand Down Expand Up @@ -2065,60 +2066,19 @@ def _get_scheduler(self):
def _get_queue(self):
return django_rq.get_queue(settings.CVAT_QUEUES.QUALITY_REPORTS.value)

def _make_queue_job_prefix(self, task: Task) -> str:
return f"{self._QUEUE_JOB_PREFIX}{task.id}-"
def _make_queue_job_id_base(self, task: Task) -> str:
return f"{self._QUEUE_JOB_PREFIX}{task.id}"

def _make_custom_quality_check_job_id(self) -> str:
return uuid4().hex

def _make_initial_queue_job_id(self, task: Task) -> str:
return f"{self._make_queue_job_prefix(task)}initial"

def _make_regular_queue_job_id(self, task: Task, start_time: timezone.datetime) -> str:
return f"{self._make_queue_job_prefix(task)}{start_time.timestamp()}"

@classmethod
def _get_last_report_time(cls, task: Task) -> Optional[timezone.datetime]:
report = models.QualityReport.objects.filter(task=task).order_by("-created_date").first()
if report:
return report.created_date
return None

def _find_next_job_id(
self, existing_job_ids: Sequence[str], task: Task, *, now: timezone.datetime
) -> str:
job_id_prefix = self._make_queue_job_prefix(task)

def _get_timestamp(job_id: str) -> timezone.datetime:
job_timestamp = job_id.split(job_id_prefix, maxsplit=1)[-1]
if job_timestamp == "initial":
return timezone.datetime.min.replace(tzinfo=timezone.utc)
else:
return timezone.datetime.fromtimestamp(float(job_timestamp), tz=timezone.utc)

max_job_id = max(
(j for j in existing_job_ids if j.startswith(job_id_prefix)),
key=_get_timestamp,
default=None,
)
max_timestamp = _get_timestamp(max_job_id) if max_job_id else None

last_update_time = self._get_last_report_time(task)
if last_update_time is None:
# Report has never been computed, is queued, or is being computed
queue_job_id = self._make_initial_queue_job_id(task)
elif max_timestamp is not None and now < max_timestamp:
# Reuse the existing next job
queue_job_id = max_job_id
else:
# Add an updating job in the queue in the next time frame
delay = self._get_quality_check_job_delay()
intervals = max(1, 1 + (now - last_update_time) // delay)
next_update_time = last_update_time + delay * intervals
queue_job_id = self._make_regular_queue_job_id(task, next_update_time)

return queue_job_id

class QualityReportsNotAvailable(Exception):
pass

Expand Down Expand Up @@ -2148,29 +2108,20 @@ def schedule_quality_autoupdate_job(self, task: Task):
This function schedules a quality report autoupdate job
"""

# The algorithm is lock-free. It should keep the following properties:
# - job names are stable between potential writers
# - if multiple simultaneous writes can happen, the objects written must be the same
# - once a job is created, it can only be updated by the scheduler and the handling worker

if not self._should_update(task):
return

now = timezone.now()
delay = self._get_quality_check_job_delay()
next_job_time = now.utcnow() + delay

scheduler = self._get_scheduler()
existing_job_ids = set(j.id for j in scheduler.get_jobs(until=next_job_time))

queue_job_id = self._find_next_job_id(existing_job_ids, task, now=now)
if queue_job_id not in existing_job_ids:
scheduler.enqueue_at(
next_job_time,
self._check_task_quality,
task_id=task.id,
job_id=queue_job_id,
)
schedule_job_with_throttling(
settings.CVAT_QUEUES.QUALITY_REPORTS.value,
self._make_queue_job_id_base(task),
next_job_time,
self._check_task_quality,
task_id=task.id,
)

def schedule_quality_check_job(self, task: Task, *, user_id: int) -> str:
"""
Expand Down
48 changes: 48 additions & 0 deletions cvat/utils/background_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (C) 2024 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

from collections.abc import Callable
from datetime import datetime

import django_rq

def schedule_job_with_throttling(
queue_name: str,
job_id_base: str,
scheduled_time: datetime,
func: Callable,
**func_kwargs
) -> None:
"""
This function schedules an RQ job to run at `scheduled_time`,
unless it had already been used to schedule a job to run at some future time
with the same values of `queue_name` and `job_id_base`,
in which case it does nothing.
The scheduled job will have an ID beginning with `job_id_base`,
and will execute `func(**func_kwargs)`.
"""
with django_rq.get_connection(queue_name) as connection:
# The blocker key is used to implement the throttling.
# The first time this function is called for a given tuple of
# (queue_name, job_id_base), we schedule the job and create a blocker
# that expires at the same time as the job is supposed to start.
# Until the blocker expires, we don't schedule any more jobs
# with the same tuple.
blocker_key = f"cvat:utils:scheduling-blocker:{queue_name}:{job_id_base}"
if connection.exists(blocker_key):
return

queue_job_id = f"{job_id_base}-{scheduled_time.timestamp()}"

# TODO: reuse the Redis connection if Django-RQ allows it.
# See <https://github.com/rq/django-rq/issues/652>.
django_rq.get_scheduler(queue_name).enqueue_at(
scheduled_time,
func,
**func_kwargs,
job_id=queue_job_id,
)

connection.set(blocker_key, queue_job_id, exat=scheduled_time)

0 comments on commit 77378f1

Please sign in to comment.