From d5c9975f95cc9d53ba5c4677459c46e728985e69 Mon Sep 17 00:00:00 2001 From: Katie Byers Date: Mon, 22 Jul 2024 23:12:15 -0700 Subject: [PATCH] add test helpers for mocking state --- tests/sentry/utils/test_circuit_breaker2.py | 189 +++++++++++++++++++- 1 file changed, 187 insertions(+), 2 deletions(-) diff --git a/tests/sentry/utils/test_circuit_breaker2.py b/tests/sentry/utils/test_circuit_breaker2.py index 50de3806a3b962..5cb4370b09e304 100644 --- a/tests/sentry/utils/test_circuit_breaker2.py +++ b/tests/sentry/utils/test_circuit_breaker2.py @@ -1,12 +1,19 @@ +import time +from typing import Any from unittest import TestCase from unittest.mock import ANY, MagicMock, patch from django.conf import settings from redis.client import Pipeline -from sentry.ratelimits.sliding_windows import Quota, RedisSlidingWindowRateLimiter +from sentry.ratelimits.sliding_windows import ( + GrantedQuota, + Quota, + RedisSlidingWindowRateLimiter, + RequestedQuota, +) from sentry.testutils.helpers.datetime import freeze_time -from sentry.utils.circuit_breaker2 import CircuitBreaker, CircuitBreakerConfig +from sentry.utils.circuit_breaker2 import CircuitBreaker, CircuitBreakerConfig, CircuitBreakerState # Note: These need to be relatively big. If the limit is too low, the RECOVERY quota isn't big # enough to be useful, and if the window is too short, redis (which doesn't seem to listen to the @@ -31,6 +38,184 @@ class MockCircuitBreaker(CircuitBreaker): level. """ + def _set_breaker_state( + self, state: CircuitBreakerState, seconds_left: int | None = None + ) -> None: + """ + Adjust redis keys to force the breaker into the given state. If no remaining seconds are + given, puts the breaker at the beginning of its time in the given state. + """ + now = int(time.time()) + + if state == CircuitBreakerState.OK: + self._delete_from_redis([self.broken_state_key, self.recovery_state_key]) + + elif state == CircuitBreakerState.BROKEN: + broken_state_timeout = seconds_left or self.broken_state_duration + broken_state_end = now + broken_state_timeout + recovery_timeout = broken_state_timeout + self.recovery_duration + recovery_end = now + recovery_timeout + + self._set_in_redis( + [ + (self.broken_state_key, broken_state_end, broken_state_timeout), + (self.recovery_state_key, recovery_end, recovery_timeout), + ] + ) + + elif state == CircuitBreakerState.RECOVERY: + recovery_timeout = seconds_left or self.recovery_duration + recovery_end = now + recovery_timeout + + self._delete_from_redis([self.broken_state_key]) + self._set_in_redis([(self.recovery_state_key, recovery_end, recovery_timeout)]) + + assert self._get_state_and_remaining_time() == ( + state, + ( + None + if state == CircuitBreakerState.OK + else ( + broken_state_timeout + if state == CircuitBreakerState.BROKEN + else recovery_timeout + ) + ), + ) + + def _add_quota_usage( + self, + quota: Quota, + amount_used: int, + granule_or_window_end: int | None = None, + ) -> None: + """ + Add to the usage total of the given quota, in the granule or window ending at the given + time. If a window (rather than a granule) end time is given, usage will be added to the + final granule. + + If no end time is given, the current time will be used. + """ + now = int(time.time()) + window_end_time = granule_or_window_end or now + + self.limiter.use_quotas( + [RequestedQuota(self.key, amount_used, [quota])], + [GrantedQuota(self.key, amount_used, [])], + window_end_time, + ) + + def _clear_quota(self, quota: Quota, window_end: int | None = None) -> list[int]: + """ + Clear usage of the given quota up until the end of the given time window. If no window end + is given, clear the quota up to the present. + + Returns the list of granule values which were cleared. + """ + now = int(time.time()) + window_end_time = window_end or now + granule_end_times = self._get_granule_end_times(quota, window_end_time) + num_granules = len(granule_end_times) + previous_granule_values = [0] * num_granules + + current_total_quota_used = quota.limit - self._get_remaining_error_quota( + quota, window_end_time + ) + if current_total_quota_used != 0: + # Empty the granules one by one, starting with the oldest. + # + # To empty each granule, we need to add negative quota usage, which means we need to + # know how much usage is currently in each granule. Unfortunately, the limiter will only + # report quota usage at the window level, not the granule level. To get around this, we + # start with a window ending with the oldest granule. Any granules before it will have + # expired, so the window usage will equal the granule usage.ending in that granule will + # have a total usage equal to that of the granule. + # + # Once we zero-out the granule, we can move the window one granule forward. It will now + # consist of expired granules, the granule we just set to 0, and the granule we care + # about. Thus the window usage will again match the granule usage, which we can use to + # empty the granule. We then just repeat the pattern until we've reached the end of the + # window we want to clear. + for i, granule_end_time in enumerate(granule_end_times): + granule_quota_used = quota.limit - self._get_remaining_error_quota( + quota, granule_end_time + ) + previous_granule_values[i] = granule_quota_used + self._add_quota_usage(quota, -granule_quota_used, granule_end_time) + + new_total_quota_used = quota.limit - self._get_remaining_error_quota( + quota, window_end_time + ) + assert new_total_quota_used == 0 + + return previous_granule_values + + def _get_granule_end_times( + self, quota: Quota, window_end: int, newest_first: bool = False + ) -> list[int]: + """ + Given a quota and the end of the time window it's covering, return the timestamps + corresponding to the end of each granule. + """ + window_duration = quota.window_seconds + granule_duration = quota.granularity_seconds + num_granules = window_duration // granule_duration + + # Walk backwards through the granules + end_times_newest_first = [ + window_end - num_granules_ago * granule_duration + for num_granules_ago in range(num_granules) + ] + + return end_times_newest_first if newest_first else list(reversed(end_times_newest_first)) + + def _set_granule_values( + self, + quota: Quota, + values: list[int | None], + window_end: int | None = None, + ) -> None: + """ + Set the usage in each granule of the given quota, for the time window ending at the given + time. + + If no ending time is given, the current time is used. + + The list of values should be ordered from oldest to newest and must contain the same number + of elements as the window has granules. To only change some of the values, pass `None` in + the spot of any value which should remain unchanged. (For example, in a two-granule window, + to only change the older granule, pass `[3, None]`.) + """ + window_duration = quota.window_seconds + granule_duration = quota.granularity_seconds + num_granules = window_duration // granule_duration + + if len(values) != num_granules: + raise Exception( + f"Exactly {num_granules} granule values must be provided. " + + "To leave an existing value as is, include `None` in its spot." + ) + + now = int(time.time()) + window_end_time = window_end or now + + previous_values = self._clear_quota(quota, window_end_time) + + for i, granule_end_time, value in zip( + range(num_granules), self._get_granule_end_times(quota, window_end_time), values + ): + # When we cleared the quota above, we set each granule's value to 0, so here "adding" + # usage is actually setting usage + if value is not None: + self._add_quota_usage(quota, value, granule_end_time) + else: + self._add_quota_usage(quota, previous_values[i], granule_end_time) + + def _delete_from_redis(self, keys: list[str]) -> Any: + for key in keys: + self.redis_pipeline.delete(key) + return self.redis_pipeline.execute() + @freeze_time() class CircuitBreakerTest(TestCase):