From 77378f1063359fb4499e5bfedd8b1b284c2df0ed Mon Sep 17 00:00:00 2001 From: Roman Donchenko Date: Fri, 29 Mar 2024 13:42:18 +0200 Subject: [PATCH] Use the same algorithm to throttle quality reports as analytics reports (#7596) ### Motivation and context See #7576 for more details. This patch extracts the high-level throttling functionality added in that patch and reuses it for quality reports. Note: in that patch I referred to this functionality as debouncing, but throttling seems like a more accurate description. It would be debouncing if the autoupdate job only ran after no updates occurred for a period, which is not how it actually works. ### How has this been tested? ### Checklist - [x] I submit my changes into the `develop` branch - [x] I have created a changelog fragment - ~~[ ] I have updated the documentation accordingly~~ - ~~[ ] I have added tests to cover my changes~~ - ~~[ ] I have linked related issues (see [GitHub docs]( https://help.github.com/en/github/managing-your-work-on-github/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword))~~ - ~~[ ] I have increased versions of npm packages if it is necessary ([cvat-canvas](https://github.com/opencv/cvat/tree/develop/cvat-canvas#versioning), [cvat-core](https://github.com/opencv/cvat/tree/develop/cvat-core#versioning), [cvat-data](https://github.com/opencv/cvat/tree/develop/cvat-data#versioning) and [cvat-ui](https://github.com/opencv/cvat/tree/develop/cvat-ui#versioning))~~ ### License - [x] I submit _my code changes_ under the same [MIT License]( https://github.com/opencv/cvat/blob/develop/LICENSE) that covers the project. Feel free to contact the maintainers if that's a concern. --- ...163919_roman_throttling_quality_reports.md | 4 ++ cvat/apps/analytics_report/report/create.py | 36 +++------- cvat/apps/quality_control/quality_reports.py | 69 +++---------------- cvat/utils/background_jobs.py | 48 +++++++++++++ 4 files changed, 71 insertions(+), 86 deletions(-) create mode 100644 changelog.d/20240312_163919_roman_throttling_quality_reports.md create mode 100644 cvat/utils/background_jobs.py diff --git a/changelog.d/20240312_163919_roman_throttling_quality_reports.md b/changelog.d/20240312_163919_roman_throttling_quality_reports.md new file mode 100644 index 000000000000..40667c80c17e --- /dev/null +++ b/changelog.d/20240312_163919_roman_throttling_quality_reports.md @@ -0,0 +1,4 @@ +### Fixed + +- Made quality report update job scheduling more efficient + () diff --git a/cvat/apps/analytics_report/report/create.py b/cvat/apps/analytics_report/report/create.py index fdf44e6d666d..738cd9cfc069 100644 --- a/cvat/apps/analytics_report/report/create.py +++ b/cvat/apps/analytics_report/report/create.py @@ -33,6 +33,7 @@ JobObjects, ) from cvat.apps.engine.models import Job, Project, Task +from cvat.utils.background_jobs import schedule_job_with_throttling def get_empty_report(): @@ -75,12 +76,6 @@ def _make_queue_job_id_base(self, obj) -> str: def _make_custom_analytics_check_job_id(self) -> str: return uuid4().hex - def _make_queue_job_id(self, obj, start_time: timezone.datetime) -> str: - return f"{self._make_queue_job_id_base(obj)}-{start_time.timestamp()}" - - def _make_autoupdate_blocker_key(self, obj) -> str: - return f"cvat:analytics:autoupdate-blocker:{self._make_queue_job_id_base(obj)}" - @classmethod def _get_last_report_time(cls, obj): try: @@ -121,27 +116,14 @@ def schedule_analytics_report_autoupdate_job(self, *, job=None, task=None, proje target_obj = project cvat_project_id = project.id - with django_rq.get_connection(settings.CVAT_QUEUES.ANALYTICS_REPORTS.value) as connection: - # The blocker key is used to avoid scheduling a report update job - # for every single change. The first time this method is called - # for a given object, we schedule the job and create a blocker - # that expires at the same time as the job is supposed to start. - # Until the blocker expires, we don't schedule any more jobs. - blocker_key = self._make_autoupdate_blocker_key(target_obj) - if connection.exists(blocker_key): - return - - queue_job_id = self._make_queue_job_id(target_obj, next_job_time) - - self._get_scheduler().enqueue_at( - next_job_time, - self._check_analytics_report, - cvat_task_id=cvat_task_id, - cvat_project_id=cvat_project_id, - job_id=queue_job_id, - ) - - connection.set(blocker_key, queue_job_id, exat=next_job_time) + schedule_job_with_throttling( + settings.CVAT_QUEUES.ANALYTICS_REPORTS.value, + self._make_queue_job_id_base(target_obj), + next_job_time, + self._check_analytics_report, + cvat_task_id=cvat_task_id, + cvat_project_id=cvat_project_id, + ) def schedule_analytics_check_job(self, *, job=None, task=None, project=None, user_id): rq_id = self._make_custom_analytics_check_job_id() diff --git a/cvat/apps/quality_control/quality_reports.py b/cvat/apps/quality_control/quality_reports.py index 562173f14410..f497ddf3febd 100644 --- a/cvat/apps/quality_control/quality_reports.py +++ b/cvat/apps/quality_control/quality_reports.py @@ -50,6 +50,7 @@ AnnotationConflictType, AnnotationType, ) +from cvat.utils.background_jobs import schedule_job_with_throttling class _Serializable: @@ -2065,18 +2066,12 @@ def _get_scheduler(self): def _get_queue(self): return django_rq.get_queue(settings.CVAT_QUEUES.QUALITY_REPORTS.value) - def _make_queue_job_prefix(self, task: Task) -> str: - return f"{self._QUEUE_JOB_PREFIX}{task.id}-" + def _make_queue_job_id_base(self, task: Task) -> str: + return f"{self._QUEUE_JOB_PREFIX}{task.id}" def _make_custom_quality_check_job_id(self) -> str: return uuid4().hex - def _make_initial_queue_job_id(self, task: Task) -> str: - return f"{self._make_queue_job_prefix(task)}initial" - - def _make_regular_queue_job_id(self, task: Task, start_time: timezone.datetime) -> str: - return f"{self._make_queue_job_prefix(task)}{start_time.timestamp()}" - @classmethod def _get_last_report_time(cls, task: Task) -> Optional[timezone.datetime]: report = models.QualityReport.objects.filter(task=task).order_by("-created_date").first() @@ -2084,41 +2079,6 @@ def _get_last_report_time(cls, task: Task) -> Optional[timezone.datetime]: return report.created_date return None - def _find_next_job_id( - self, existing_job_ids: Sequence[str], task: Task, *, now: timezone.datetime - ) -> str: - job_id_prefix = self._make_queue_job_prefix(task) - - def _get_timestamp(job_id: str) -> timezone.datetime: - job_timestamp = job_id.split(job_id_prefix, maxsplit=1)[-1] - if job_timestamp == "initial": - return timezone.datetime.min.replace(tzinfo=timezone.utc) - else: - return timezone.datetime.fromtimestamp(float(job_timestamp), tz=timezone.utc) - - max_job_id = max( - (j for j in existing_job_ids if j.startswith(job_id_prefix)), - key=_get_timestamp, - default=None, - ) - max_timestamp = _get_timestamp(max_job_id) if max_job_id else None - - last_update_time = self._get_last_report_time(task) - if last_update_time is None: - # Report has never been computed, is queued, or is being computed - queue_job_id = self._make_initial_queue_job_id(task) - elif max_timestamp is not None and now < max_timestamp: - # Reuse the existing next job - queue_job_id = max_job_id - else: - # Add an updating job in the queue in the next time frame - delay = self._get_quality_check_job_delay() - intervals = max(1, 1 + (now - last_update_time) // delay) - next_update_time = last_update_time + delay * intervals - queue_job_id = self._make_regular_queue_job_id(task, next_update_time) - - return queue_job_id - class QualityReportsNotAvailable(Exception): pass @@ -2148,11 +2108,6 @@ def schedule_quality_autoupdate_job(self, task: Task): This function schedules a quality report autoupdate job """ - # The algorithm is lock-free. It should keep the following properties: - # - job names are stable between potential writers - # - if multiple simultaneous writes can happen, the objects written must be the same - # - once a job is created, it can only be updated by the scheduler and the handling worker - if not self._should_update(task): return @@ -2160,17 +2115,13 @@ def schedule_quality_autoupdate_job(self, task: Task): delay = self._get_quality_check_job_delay() next_job_time = now.utcnow() + delay - scheduler = self._get_scheduler() - existing_job_ids = set(j.id for j in scheduler.get_jobs(until=next_job_time)) - - queue_job_id = self._find_next_job_id(existing_job_ids, task, now=now) - if queue_job_id not in existing_job_ids: - scheduler.enqueue_at( - next_job_time, - self._check_task_quality, - task_id=task.id, - job_id=queue_job_id, - ) + schedule_job_with_throttling( + settings.CVAT_QUEUES.QUALITY_REPORTS.value, + self._make_queue_job_id_base(task), + next_job_time, + self._check_task_quality, + task_id=task.id, + ) def schedule_quality_check_job(self, task: Task, *, user_id: int) -> str: """ diff --git a/cvat/utils/background_jobs.py b/cvat/utils/background_jobs.py new file mode 100644 index 000000000000..caf2e859a530 --- /dev/null +++ b/cvat/utils/background_jobs.py @@ -0,0 +1,48 @@ +# Copyright (C) 2024 CVAT.ai Corporation +# +# SPDX-License-Identifier: MIT + +from collections.abc import Callable +from datetime import datetime + +import django_rq + +def schedule_job_with_throttling( + queue_name: str, + job_id_base: str, + scheduled_time: datetime, + func: Callable, + **func_kwargs +) -> None: + """ + This function schedules an RQ job to run at `scheduled_time`, + unless it had already been used to schedule a job to run at some future time + with the same values of `queue_name` and `job_id_base`, + in which case it does nothing. + + The scheduled job will have an ID beginning with `job_id_base`, + and will execute `func(**func_kwargs)`. + """ + with django_rq.get_connection(queue_name) as connection: + # The blocker key is used to implement the throttling. + # The first time this function is called for a given tuple of + # (queue_name, job_id_base), we schedule the job and create a blocker + # that expires at the same time as the job is supposed to start. + # Until the blocker expires, we don't schedule any more jobs + # with the same tuple. + blocker_key = f"cvat:utils:scheduling-blocker:{queue_name}:{job_id_base}" + if connection.exists(blocker_key): + return + + queue_job_id = f"{job_id_base}-{scheduled_time.timestamp()}" + + # TODO: reuse the Redis connection if Django-RQ allows it. + # See . + django_rq.get_scheduler(queue_name).enqueue_at( + scheduled_time, + func, + **func_kwargs, + job_id=queue_job_id, + ) + + connection.set(blocker_key, queue_job_id, exat=scheduled_time)