Skip to content

Commit

Permalink
Make analytics report update job scheduling more efficient
Browse files Browse the repository at this point in the history
Currently, `schedule_analytics_report_autoupdate_job` attempts to debounce
job scheduling by examining existing jobs before scheduling a new one.
Unfortunately, the `scheduler.get_jobs` function, which it uses for this
purpose, scales poorly. Not only does it fetch a list of all scheduled jobs
(and not just ones related to the current object), but it then fetches
information about every job, one by one. The current logic doesn't even need
this information, but RQ Scheduler provides no method to get just the IDs.

Replace the current logic with a new lightweight approach that uses a custom
Redis key to block scheduling of additional jobs.
  • Loading branch information
SpecLad committed Mar 8, 2024
1 parent bfb902f commit 0f78ac8
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Fixed

- Made analytics report update job scheduling more efficient
(<https://github.com/opencv/cvat/pull/7576>)
66 changes: 21 additions & 45 deletions cvat/apps/analytics_report/report/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ def _get_queue(self):

def _make_queue_job_prefix(self, obj) -> str:
if isinstance(obj, Task):
return f"{self._QUEUE_JOB_PREFIX_TASK}{obj.id}-"
return f"{self._QUEUE_JOB_PREFIX_TASK}{obj.id}"
else:
return f"{self._QUEUE_JOB_PREFIX_PROJECT}{obj.id}-"
return f"{self._QUEUE_JOB_PREFIX_PROJECT}{obj.id}"

def _make_custom_analytics_check_job_id(self) -> str:
return uuid4().hex

def _make_initial_queue_job_id(self, obj) -> str:
return f"{self._make_queue_job_prefix(obj)}initial"
def _make_queue_job_id(self, obj, start_time: timezone.datetime) -> str:
return f"{self._make_queue_job_prefix(obj)}-{start_time.timestamp()}"

def _make_regular_queue_job_id(self, obj, start_time: timezone.datetime) -> str:
return f"{self._make_queue_job_prefix(obj)}{start_time.timestamp()}"
def _make_autoupdate_blocker_key(self, obj) -> str:
return f"cvat:analytics:autoupdate-blocker:{self._make_queue_job_prefix(obj)}"

@classmethod
def _get_last_report_time(cls, obj):
Expand All @@ -90,39 +90,6 @@ def _get_last_report_time(cls, obj):
except ObjectDoesNotExist:
return None

def _find_next_job_id(self, existing_job_ids, obj, *, now) -> str:
    """Pick the queue job id to use for the next report update of ``obj``.

    ``existing_job_ids`` is an iterable of already scheduled job ids; only
    the ones that start with the queue job prefix of ``obj`` are considered.
    ``now`` is the current time and is compared against the times encoded
    in those ids and against the last report time.

    The result is one of:
    * the "initial" job id, when ``_get_last_report_time`` returns ``None``;
    * the latest existing job id for ``obj``, when its encoded time is
      still in the future relative to ``now``;
    * a new regular job id for the next update slot, where slots are spaced
      ``_get_analytics_check_job_delay()`` apart, counted from the last
      report time.
    """
    prefix = self._make_queue_job_prefix(obj)

    def _scheduled_time(job_id: str) -> timezone.datetime:
        # Everything after the prefix is either "initial" or a float timestamp.
        suffix = job_id.partition(prefix)[2]
        if suffix == "initial":
            # Sort "initial" jobs before any timestamped job.
            return timezone.datetime.min.replace(tzinfo=timezone.utc)
        return timezone.datetime.fromtimestamp(float(suffix), tz=timezone.utc)

    own_job_ids = [job_id for job_id in existing_job_ids if job_id.startswith(prefix)]
    latest_job_id = max(own_job_ids, key=_scheduled_time, default=None)
    latest_time = _scheduled_time(latest_job_id) if latest_job_id else None

    last_report_time = self._get_last_report_time(obj)
    if last_report_time is None:
        # Report has never been computed, is queued, or is being computed
        return self._make_initial_queue_job_id(obj)

    if latest_time is not None and now < latest_time:
        # Reuse the existing next job
        return latest_job_id

    # Add an updating job in the queue in the next time frame
    delay = self._get_analytics_check_job_delay()
    elapsed_intervals = max(1, 1 + (now - last_report_time) // delay)
    next_update_time = last_report_time + delay * elapsed_intervals
    return self._make_regular_queue_job_id(obj, next_update_time)

class AnalyticsReportsNotAvailable(Exception):
    """Exception type for the "analytics reports not available" condition.

    It adds nothing to ``Exception``; the places that raise it are outside
    this excerpt.
    """

Expand All @@ -133,9 +100,6 @@ def schedule_analytics_report_autoupdate_job(self, *, job=None, task=None, proje
delay = self._get_analytics_check_job_delay()
next_job_time = now.utcnow() + delay

scheduler = self._get_scheduler()
existing_job_ids = set(j.id for j in scheduler.get_jobs(until=next_job_time))

target_obj = None
cvat_project_id = None
cvat_task_id = None
Expand All @@ -157,16 +121,28 @@ def schedule_analytics_report_autoupdate_job(self, *, job=None, task=None, proje
target_obj = project
cvat_project_id = project.id

queue_job_id = self._find_next_job_id(existing_job_ids, target_obj, now=now)
if queue_job_id not in existing_job_ids:
scheduler.enqueue_at(
with django_rq.get_connection(settings.CVAT_QUEUES.ANALYTICS_REPORTS.value) as connection:
# The blocker key is used to avoid scheduling a report update job
# for every single change. The first time this method is called
# for a given object, we schedule the job and create a blocker
# that expires at the same time as the job is supposed to start.
# Until the blocker expires, we don't schedule any more jobs.
blocker_key = self._make_autoupdate_blocker_key(target_obj)
if connection.exists(blocker_key):
return

queue_job_id = self._make_queue_job_id(target_obj, next_job_time)

self._get_scheduler().enqueue_at(
next_job_time,
self._check_analytics_report,
cvat_task_id=cvat_task_id,
cvat_project_id=cvat_project_id,
job_id=queue_job_id,
)

connection.set(blocker_key, queue_job_id, exat=next_job_time)

def schedule_analytics_check_job(self, *, job=None, task=None, project=None, user_id):
rq_id = self._make_custom_analytics_check_job_id()

Expand Down

0 comments on commit 0f78ac8

Please sign in to comment.