From 636c4d86573bc5a752aa07cfaddea9c70f6a5511 Mon Sep 17 00:00:00 2001 From: Roman Donchenko Date: Thu, 8 Feb 2024 18:50:12 +0200 Subject: [PATCH] Defer most signal handling code until after the end of the current transaction Non-database actions like deleting directories, sending webhooks, scheduling reports should only be done after the current transaction is committed. If we do it immediately, and the transaction is later aborted, then we will have (for example) sent a webhook about an event that didn't actually happen. There's also a secondary benefit to moving this action outside of the transaction; the less time we spend inside a transaction, the better, since a transaction may lock out other clients from working on the affected DB rows. In addition, prevent the affected actions from crashing the view handler with an exception (using the `robust=True` option). I don't think it's reasonable to (for example) return a 500 response to a `PATCH` request just because we failed to send the corresponding webhook. There is one more type of action that should be modified in this way (sending events), but it would be easier to do that after a refactoring that I did in another patch, so I'll do it later. --- ...9_171518_roman_handle_after_transaction.md | 6 ++++++ cvat/apps/analytics_report/signals.py | 18 ++++++++++++---- cvat/apps/engine/signals.py | 21 +++++++++++++------ cvat/apps/engine/tests/test_rest_api.py | 8 +++++-- cvat/apps/quality_control/signals.py | 12 +++++++++-- cvat/apps/webhooks/signals.py | 13 +++++++++--- 6 files changed, 61 insertions(+), 17 deletions(-) create mode 100644 changelog.d/20240209_171518_roman_handle_after_transaction.md diff --git a/changelog.d/20240209_171518_roman_handle_after_transaction.md b/changelog.d/20240209_171518_roman_handle_after_transaction.md new file mode 100644 index 000000000000..8461dd14f670 --- /dev/null +++ b/changelog.d/20240209_171518_roman_handle_after_transaction.md @@ -0,0 +1,6 @@ +### Fixed + +- Side effects of data changes, such as the sending of webhooks, + are no longer triggered until after the changes have been committed + to the database + () diff --git a/cvat/apps/analytics_report/signals.py b/cvat/apps/analytics_report/signals.py index 0b7f86a02e0f..5de53675a7c7 100644 --- a/cvat/apps/analytics_report/signals.py +++ b/cvat/apps/analytics_report/signals.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: MIT +from django.db import transaction from django.db.models.signals import post_save from django.dispatch import receiver @@ -19,12 +20,21 @@ ) def __save_job__update_analytics_report(instance, created, **kwargs): if isinstance(instance, Project): - AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(project=instance) + kwargs = {"project": instance} elif isinstance(instance, Task): - AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(task=instance) + kwargs = {"task": instance} elif isinstance(instance, Job): - AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(job=instance) + kwargs = {"job": instance} elif isinstance(instance, Annotation): - AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(job=instance.job) + kwargs = {"job": instance.job} else: assert False + + def schedule_autoupdate_job(): + if any(v.id is None for v in kwargs.values()): + # The object may have been deleted after the on_commit call. + return + + AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(**kwargs) + + transaction.on_commit(schedule_autoupdate_job, robust=True) diff --git a/cvat/apps/engine/signals.py b/cvat/apps/engine/signals.py index bc69dcf2a7e5..570325c5a58d 100644 --- a/cvat/apps/engine/signals.py +++ b/cvat/apps/engine/signals.py @@ -2,9 +2,11 @@ # Copyright (C) 2023 CVAT.ai Corporation # # SPDX-License-Identifier: MIT +import functools import shutil from django.contrib.auth.models import User +from django.db import transaction from django.db.models.signals import post_delete, post_save from django.dispatch import receiver @@ -45,17 +47,21 @@ def __save_user_handler(instance, **kwargs): @receiver(post_delete, sender=Project, dispatch_uid=__name__ + ".delete_project_handler") def __delete_project_handler(instance, **kwargs): - shutil.rmtree(instance.get_dirname(), ignore_errors=True) + transaction.on_commit( + functools.partial(shutil.rmtree, instance.get_dirname(), ignore_errors=True)) @receiver(post_delete, sender=Asset, dispatch_uid=__name__ + ".__delete_asset_handler") def __delete_asset_handler(instance, **kwargs): - shutil.rmtree(instance.get_asset_dir(), ignore_errors=True) + transaction.on_commit( + functools.partial(shutil.rmtree, instance.get_asset_dir(), ignore_errors=True)) @receiver(post_delete, sender=Task, dispatch_uid=__name__ + ".delete_task_handler") def __delete_task_handler(instance, **kwargs): - shutil.rmtree(instance.get_dirname(), ignore_errors=True) + transaction.on_commit( + functools.partial(shutil.rmtree, instance.get_dirname(), ignore_errors=True)) + if instance.data and not instance.data.tasks.exists(): instance.data.delete() @@ -69,14 +75,17 @@ def __delete_task_handler(instance, **kwargs): @receiver(post_delete, sender=Job, dispatch_uid=__name__ + ".delete_job_handler") def __delete_job_handler(instance, **kwargs): - shutil.rmtree(instance.get_dirname(), ignore_errors=True) + transaction.on_commit( + functools.partial(shutil.rmtree, instance.get_dirname(), ignore_errors=True)) @receiver(post_delete, sender=Data, dispatch_uid=__name__ + ".delete_data_handler") def __delete_data_handler(instance, **kwargs): - shutil.rmtree(instance.get_data_dirname(), ignore_errors=True) + transaction.on_commit( + functools.partial(shutil.rmtree, instance.get_data_dirname(), ignore_errors=True)) @receiver(post_delete, sender=CloudStorage, dispatch_uid=__name__ + ".delete_cloudstorage_handler") def __delete_cloudstorage_handler(instance, **kwargs): - shutil.rmtree(instance.get_storage_dirname(), ignore_errors=True) + transaction.on_commit( + functools.partial(shutil.rmtree, instance.get_storage_dirname(), ignore_errors=True)) diff --git a/cvat/apps/engine/tests/test_rest_api.py b/cvat/apps/engine/tests/test_rest_api.py index b83303071c16..60c2a3fdb4b3 100644 --- a/cvat/apps/engine/tests/test_rest_api.py +++ b/cvat/apps/engine/tests/test_rest_api.py @@ -933,7 +933,8 @@ def test_api_v2_projects_delete_project_data_after_delete_project(self): task_dir = task.get_dirname() self.assertTrue(os.path.exists(task_dir)) - self._check_api_v2_projects_id(self.admin) + with self.captureOnCommitCallbacks(execute=True): + self._check_api_v2_projects_id(self.admin) for project in self.projects: project_dir = project.get_dirname() @@ -2019,7 +2020,10 @@ def test_api_v2_tasks_delete_task_data_after_delete_task(self): for task in self.tasks: task_dir = task.get_dirname() self.assertTrue(os.path.exists(task_dir)) - self._check_api_v2_tasks_id(self.admin) + + with self.captureOnCommitCallbacks(execute=True): + self._check_api_v2_tasks_id(self.admin) + for task in self.tasks: task_dir = task.get_dirname() self.assertFalse(os.path.exists(task_dir)) diff --git a/cvat/apps/quality_control/signals.py b/cvat/apps/quality_control/signals.py index 609e1abe88f2..7371608c3ce9 100644 --- a/cvat/apps/quality_control/signals.py +++ b/cvat/apps/quality_control/signals.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: MIT +from django.db import transaction from django.db.models.signals import post_save from django.dispatch import receiver @@ -37,8 +38,15 @@ def __save_job__update_quality_metrics(instance, created, **kwargs): else: assert False - for task in tasks: - qc.QualityReportUpdateManager().schedule_quality_autoupdate_job(task) + def schedule_autoupdate_jobs(): + for task in tasks: + if task.id is None: + # The task may have been deleted after the on_commit call. + continue + + qc.QualityReportUpdateManager().schedule_quality_autoupdate_job(task) + + transaction.on_commit(schedule_autoupdate_jobs, robust=True) @receiver(post_save, sender=Task, dispatch_uid=__name__ + ".save_task-initialize_quality_settings") diff --git a/cvat/apps/webhooks/signals.py b/cvat/apps/webhooks/signals.py index 9e381dd22b89..0d34950cf6ff 100644 --- a/cvat/apps/webhooks/signals.py +++ b/cvat/apps/webhooks/signals.py @@ -12,6 +12,7 @@ import requests from django.conf import settings from django.core.exceptions import ObjectDoesNotExist +from django.db import transaction from django.db.models.signals import (post_delete, post_save, pre_delete, pre_save) from django.dispatch import Signal, receiver @@ -186,7 +187,10 @@ def post_save_resource_event(sender, instance, created, **kwargs): else: return - batch_add_to_queue(filtered_webhooks, data) + transaction.on_commit( + lambda: batch_add_to_queue(filtered_webhooks, data), + robust=True, + ) @receiver(pre_delete, sender=Project, dispatch_uid=__name__ + ":project:pre_delete") @@ -232,9 +236,12 @@ def post_delete_resource_event(sender, instance, **kwargs): "sender": get_sender(instance), } - batch_add_to_queue(filtered_webhooks, data) related_webhooks = [webhook for webhook in getattr(instance, "_related_webhooks", []) if webhook.id not in map(lambda a: a.id, filtered_webhooks)] - batch_add_to_queue(related_webhooks, data) + + transaction.on_commit( + lambda: batch_add_to_queue(filtered_webhooks + related_webhooks, data), + robust=True, + ) @receiver(signal_redelivery)