From 78334b95bc5ff69816fcaeafff8f67fadf0727c7 Mon Sep 17 00:00:00 2001 From: Ryan Skonnord Date: Fri, 14 Jun 2024 13:16:00 -0700 Subject: [PATCH] fix(reports): Use duplicate report detection to halt delivery (#72730) Upgrade the Redis-based weekly report duplicate delivery detection so that it halts the duplicate delivery instead of merely logging an error. Refactor the context manager to a small class, because a context manager can't actually interrupt the body of the `with`-block. --- src/sentry/tasks/summaries/weekly_reports.py | 101 ++++++++++++------- tests/sentry/tasks/test_weekly_reports.py | 2 +- 2 files changed, 65 insertions(+), 38 deletions(-) diff --git a/src/sentry/tasks/summaries/weekly_reports.py b/src/sentry/tasks/summaries/weekly_reports.py index c8c9f76ae42d04..b9795bc402c46f 100644 --- a/src/sentry/tasks/summaries/weekly_reports.py +++ b/src/sentry/tasks/summaries/weekly_reports.py @@ -3,8 +3,7 @@ import heapq import logging import uuid -from collections.abc import Generator, Mapping -from contextlib import contextmanager +from collections.abc import Mapping from dataclasses import dataclass from datetime import timedelta from functools import partial @@ -13,6 +12,7 @@ import sentry_sdk from django.db.models import F from django.utils import dateformat, timezone +from rb.clients import LocalClient from sentry_sdk import set_tag from sentry import analytics @@ -291,8 +291,10 @@ def _send_to_user(self, user_template_context: Mapping[str, Any]) -> None: template_context: Mapping[str, Any] | None = user_template_context.get("context") user_id: int | None = user_template_context.get("user_id") if template_context and user_id: - with self._check_for_duplicate_delivery(user_id): + dupe_check = _DuplicateDeliveryCheck(self, user_id) + if not dupe_check.check_for_duplicate_delivery(): self.send_email(template_ctx=template_context, user_id=user_id) + dupe_check.record_delivery() def send_email(self, template_ctx: Mapping[str, Any], user_id: int) -> None: message = MessageBuilder( @@ -331,41 +333,66 @@ def send_email(self, template_ctx: Mapping[str, Any], user_id: int) -> None: message.add_users((user_id,)) message.send_async() - @contextmanager - def _check_for_duplicate_delivery(self, user_id: int) -> Generator[None, None, None]: - """Attempt to prevent duplicate deliveries of the same report.""" - def log_error(msg: str) -> None: - extra = { - "batch_id": self.batch_id, - "organization": self.ctx.organization.id, - "user_id": user_id, - "has_email_override": bool(self.email_override), - } - logger.error(msg, extra=extra) - - cluster = redis.clusters.get("default").get_local_client_for_key("weekly_reports") - name_parts = (self.batch_id, self.ctx.organization.id, user_id) - name = ":".join(str(part) for part in name_parts) - - count_before = int(cluster.get(name) or 0) - if count_before > 0: - # When we have more confidence in this approach, we can upgrade this to - # an exception, thereby preventing the duplicate send. - log_error("weekly_report.delivery_record.duplicate_detected") - - # Dispatch the send operation. There is no lock for concurrency, which leaves - # open the possibility of a race condition, in case another thread or server - # node received a duplicate Celery task somehow. But we do not think this is - # a likely failure mode. - yield - - count_after = cluster.incr(name) - cluster.expire(name, timedelta(days=1)) - if count_after > count_before + 1: - # The `cluster.incr` operation is atomic, so if concurrent duplicates are - # happening, this should reliably detect them after the fact. - log_error("weekly_report.delivery_record.concurrent_detected") +class _DuplicateDeliveryCheck: + def __init__(self, batch: OrganizationReportBatch, user_id: int): + self.batch = batch + self.user_id = user_id + + # Tracks state from `check_for_duplicate_delivery` to `record_delivery` + self.count: int | None = None + + def _get_redis_cluster(self) -> LocalClient: + return redis.clusters.get("default").get_local_client_for_key("weekly_reports") + + @property + def _redis_name(self) -> str: + name_parts = (self.batch.batch_id, self.batch.ctx.organization.id, self.user_id) + return ":".join(str(part) for part in name_parts) + + def _get_log_extras(self) -> dict[str, Any]: + return { + "batch_id": self.batch.batch_id, + "organization": self.batch.ctx.organization.id, + "user_id": self.user_id, + "has_email_override": bool(self.batch.email_override), + } + + def check_for_duplicate_delivery(self) -> bool: + """Check whether this delivery has been recorded in Redis already.""" + if self.count is not None: + raise ValueError("This object has already checked a delivery") + cluster = self._get_redis_cluster() + self.count = int(cluster.get(self._redis_name) or 0) + + is_duplicate_detected = self.count > 0 + if is_duplicate_detected: + logger.error( + "weekly_report.delivery_record.duplicate_detected", extra=self._get_log_extras() + ) + return is_duplicate_detected + + def record_delivery(self) -> bool: + """Record in Redis that the delivery was completed successfully.""" + if self.count is None: + raise ValueError("This object has not had `check_for_duplicate_delivery` called yet") + cluster = self._get_redis_cluster() + count_after = cluster.incr(self._redis_name) + cluster.expire(self._redis_name, timedelta(days=1)) + + is_duplicate_detected = count_after > self.count + 1 + if is_duplicate_detected: + # There is no lock for concurrency, which leaves open the possibility of + # a race condition, in case another thread or server node received a + # duplicate Celery task somehow. But we do not think this is a likely + # failure mode. + # + # Nonetheless, the `cluster.incr` operation is atomic, so if concurrent + # duplicates are happening, this should reliably detect them after the fact. + logger.error( + "weekly_report.delivery_record.concurrent_detected", extra=self._get_log_extras() + ) + return is_duplicate_detected project_breakdown_colors = ["#422C6E", "#895289", "#D6567F", "#F38150", "#F2B713"] diff --git a/tests/sentry/tasks/test_weekly_reports.py b/tests/sentry/tasks/test_weekly_reports.py index bd4a9916c5c4ca..74052abcdb7ca8 100644 --- a/tests/sentry/tasks/test_weekly_reports.py +++ b/tests/sentry/tasks/test_weekly_reports.py @@ -1012,7 +1012,7 @@ def test_duplicate_detection(self, mock_send_email, mock_prepare_template_contex # Duplicate send OrganizationReportBatch(ctx, batch_id).deliver_reports() - assert mock_send_email.call_count == 2 # When we halt instead of logging, expect 1 instead + assert mock_send_email.call_count == 1 assert mock_logger.error.call_count == 1 mock_logger.error.assert_called_once_with( "weekly_report.delivery_record.duplicate_detected",