From 144381cc4ac7fb8e693dedad2c6af46de7af7de3 Mon Sep 17 00:00:00 2001 From: Roman Donchenko Date: Fri, 8 Mar 2024 17:23:08 +0200 Subject: [PATCH] Make analytics report update job scheduling more efficient Currently, `schedule_analytics_report_autoupdate_job` attempts to debounce job scheduling by examining existing jobs before scheduling a new one. Unfortunately, the `scheduler.get_jobs` function, which it uses for this purpose, scales poorly. Not only does it fetch a list of all scheduled jobs (and not just ones related to the current object), but it then fetches information about every job, one by one. The current logic doesn't even need this information, but RQ Scheduler provides no method to get just the IDs. Replace the current logic with a new lightweight approach that uses a custom Redis key to block scheduling of additional jobs. --- ...5130_roman_efficient_analytics_debounce.md | 4 ++ cvat/apps/analytics_report/report/create.py | 68 ++++++------------- 2 files changed, 26 insertions(+), 46 deletions(-) create mode 100644 changelog.d/20240308_175130_roman_efficient_analytics_debounce.md diff --git a/changelog.d/20240308_175130_roman_efficient_analytics_debounce.md b/changelog.d/20240308_175130_roman_efficient_analytics_debounce.md new file mode 100644 index 000000000000..8ab33ea6089e --- /dev/null +++ b/changelog.d/20240308_175130_roman_efficient_analytics_debounce.md @@ -0,0 +1,4 @@ +### Fixed + +- Made analytics report update job scheduling more efficient + () diff --git a/cvat/apps/analytics_report/report/create.py b/cvat/apps/analytics_report/report/create.py index 3a8da9dca55a..fdf44e6d666d 100644 --- a/cvat/apps/analytics_report/report/create.py +++ b/cvat/apps/analytics_report/report/create.py @@ -66,20 +66,20 @@ def _get_scheduler(self): def _get_queue(self): return django_rq.get_queue(settings.CVAT_QUEUES.ANALYTICS_REPORTS.value) - def _make_queue_job_prefix(self, obj) -> str: + def _make_queue_job_id_base(self, obj) -> str: if isinstance(obj, Task): - return f"{self._QUEUE_JOB_PREFIX_TASK}{obj.id}-" + return f"{self._QUEUE_JOB_PREFIX_TASK}{obj.id}" else: - return f"{self._QUEUE_JOB_PREFIX_PROJECT}{obj.id}-" + return f"{self._QUEUE_JOB_PREFIX_PROJECT}{obj.id}" def _make_custom_analytics_check_job_id(self) -> str: return uuid4().hex - def _make_initial_queue_job_id(self, obj) -> str: - return f"{self._make_queue_job_prefix(obj)}initial" + def _make_queue_job_id(self, obj, start_time: timezone.datetime) -> str: + return f"{self._make_queue_job_id_base(obj)}-{start_time.timestamp()}" - def _make_regular_queue_job_id(self, obj, start_time: timezone.datetime) -> str: - return f"{self._make_queue_job_prefix(obj)}{start_time.timestamp()}" + def _make_autoupdate_blocker_key(self, obj) -> str: + return f"cvat:analytics:autoupdate-blocker:{self._make_queue_job_id_base(obj)}" @classmethod def _get_last_report_time(cls, obj): @@ -90,39 +90,6 @@ def _get_last_report_time(cls, obj): except ObjectDoesNotExist: return None - def _find_next_job_id(self, existing_job_ids, obj, *, now) -> str: - job_id_prefix = self._make_queue_job_prefix(obj) - - def _get_timestamp(job_id: str) -> timezone.datetime: - job_timestamp = job_id.split(job_id_prefix, maxsplit=1)[-1] - if job_timestamp == "initial": - return timezone.datetime.min.replace(tzinfo=timezone.utc) - else: - return timezone.datetime.fromtimestamp(float(job_timestamp), tz=timezone.utc) - - max_job_id = max( - (j for j in existing_job_ids if j.startswith(job_id_prefix)), - key=_get_timestamp, - default=None, - ) - max_timestamp = _get_timestamp(max_job_id) if max_job_id else None - - last_update_time = self._get_last_report_time(obj) - if last_update_time is None: - # Report has never been computed, is queued, or is being computed - queue_job_id = self._make_initial_queue_job_id(obj) - elif max_timestamp is not None and now < max_timestamp: - # Reuse the existing next job - queue_job_id = max_job_id - else: - # Add an updating job in the queue in the next time frame - delay = self._get_analytics_check_job_delay() - intervals = max(1, 1 + (now - last_update_time) // delay) - next_update_time = last_update_time + delay * intervals - queue_job_id = self._make_regular_queue_job_id(obj, next_update_time) - - return queue_job_id - class AnalyticsReportsNotAvailable(Exception): pass @@ -133,9 +100,6 @@ def schedule_analytics_report_autoupdate_job(self, *, job=None, task=None, proje delay = self._get_analytics_check_job_delay() next_job_time = now.utcnow() + delay - scheduler = self._get_scheduler() - existing_job_ids = set(j.id for j in scheduler.get_jobs(until=next_job_time)) - target_obj = None cvat_project_id = None cvat_task_id = None @@ -157,9 +121,19 @@ def schedule_analytics_report_autoupdate_job(self, *, job=None, task=None, proje target_obj = project cvat_project_id = project.id - queue_job_id = self._find_next_job_id(existing_job_ids, target_obj, now=now) - if queue_job_id not in existing_job_ids: - scheduler.enqueue_at( + with django_rq.get_connection(settings.CVAT_QUEUES.ANALYTICS_REPORTS.value) as connection: + # The blocker key is used to avoid scheduling a report update job + # for every single change. The first time this method is called + # for a given object, we schedule the job and create a blocker + # that expires at the same time as the job is supposed to start. + # Until the blocker expires, we don't schedule any more jobs. + blocker_key = self._make_autoupdate_blocker_key(target_obj) + if connection.exists(blocker_key): + return + + queue_job_id = self._make_queue_job_id(target_obj, next_job_time) + + self._get_scheduler().enqueue_at( next_job_time, self._check_analytics_report, cvat_task_id=cvat_task_id, @@ -167,6 +141,8 @@ def schedule_analytics_report_autoupdate_job(self, *, job=None, task=None, proje job_id=queue_job_id, ) + connection.set(blocker_key, queue_job_id, exat=next_job_time) + def schedule_analytics_check_job(self, *, job=None, task=None, project=None, user_id): rq_id = self._make_custom_analytics_check_job_id()