Skip to content

Commit

Permalink
add test helpers for mocking state
Browse files Browse the repository at this point in the history
  • Loading branch information
lobsterkatie committed Jul 23, 2024
1 parent 5ffd306 commit d5c9975
Showing 1 changed file with 187 additions and 2 deletions.
189 changes: 187 additions & 2 deletions tests/sentry/utils/test_circuit_breaker2.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import time
from typing import Any
from unittest import TestCase
from unittest.mock import ANY, MagicMock, patch

from django.conf import settings
from redis.client import Pipeline

from sentry.ratelimits.sliding_windows import Quota, RedisSlidingWindowRateLimiter
from sentry.ratelimits.sliding_windows import (
GrantedQuota,
Quota,
RedisSlidingWindowRateLimiter,
RequestedQuota,
)
from sentry.testutils.helpers.datetime import freeze_time
from sentry.utils.circuit_breaker2 import CircuitBreaker, CircuitBreakerConfig
from sentry.utils.circuit_breaker2 import CircuitBreaker, CircuitBreakerConfig, CircuitBreakerState

# Note: These need to be relatively big. If the limit is too low, the RECOVERY quota isn't big
# enough to be useful, and if the window is too short, redis (which doesn't seem to listen to the
Expand All @@ -31,6 +38,184 @@ class MockCircuitBreaker(CircuitBreaker):
level.
"""

def _set_breaker_state(
self, state: CircuitBreakerState, seconds_left: int | None = None
) -> None:
"""
Adjust redis keys to force the breaker into the given state. If no remaining seconds are
given, puts the breaker at the beginning of its time in the given state.
"""
now = int(time.time())

if state == CircuitBreakerState.OK:
self._delete_from_redis([self.broken_state_key, self.recovery_state_key])

elif state == CircuitBreakerState.BROKEN:
broken_state_timeout = seconds_left or self.broken_state_duration
broken_state_end = now + broken_state_timeout
recovery_timeout = broken_state_timeout + self.recovery_duration
recovery_end = now + recovery_timeout

self._set_in_redis(
[
(self.broken_state_key, broken_state_end, broken_state_timeout),
(self.recovery_state_key, recovery_end, recovery_timeout),
]
)

elif state == CircuitBreakerState.RECOVERY:
recovery_timeout = seconds_left or self.recovery_duration
recovery_end = now + recovery_timeout

self._delete_from_redis([self.broken_state_key])
self._set_in_redis([(self.recovery_state_key, recovery_end, recovery_timeout)])

assert self._get_state_and_remaining_time() == (
state,
(
None
if state == CircuitBreakerState.OK
else (
broken_state_timeout
if state == CircuitBreakerState.BROKEN
else recovery_timeout
)
),
)

def _add_quota_usage(
self,
quota: Quota,
amount_used: int,
granule_or_window_end: int | None = None,
) -> None:
"""
Add to the usage total of the given quota, in the granule or window ending at the given
time. If a window (rather than a granule) end time is given, usage will be added to the
final granule.
If no end time is given, the current time will be used.
"""
now = int(time.time())
window_end_time = granule_or_window_end or now

self.limiter.use_quotas(
[RequestedQuota(self.key, amount_used, [quota])],
[GrantedQuota(self.key, amount_used, [])],
window_end_time,
)

def _clear_quota(self, quota: Quota, window_end: int | None = None) -> list[int]:
"""
Clear usage of the given quota up until the end of the given time window. If no window end
is given, clear the quota up to the present.
Returns the list of granule values which were cleared.
"""
now = int(time.time())
window_end_time = window_end or now
granule_end_times = self._get_granule_end_times(quota, window_end_time)
num_granules = len(granule_end_times)
previous_granule_values = [0] * num_granules

current_total_quota_used = quota.limit - self._get_remaining_error_quota(
quota, window_end_time
)
if current_total_quota_used != 0:
# Empty the granules one by one, starting with the oldest.
#
# To empty each granule, we need to add negative quota usage, which means we need to
# know how much usage is currently in each granule. Unfortunately, the limiter will only
# report quota usage at the window level, not the granule level. To get around this, we
# start with a window ending with the oldest granule. Any granules before it will have
# expired, so the window usage will equal the granule usage.ending in that granule will
# have a total usage equal to that of the granule.
#
# Once we zero-out the granule, we can move the window one granule forward. It will now
# consist of expired granules, the granule we just set to 0, and the granule we care
# about. Thus the window usage will again match the granule usage, which we can use to
# empty the granule. We then just repeat the pattern until we've reached the end of the
# window we want to clear.
for i, granule_end_time in enumerate(granule_end_times):
granule_quota_used = quota.limit - self._get_remaining_error_quota(
quota, granule_end_time
)
previous_granule_values[i] = granule_quota_used
self._add_quota_usage(quota, -granule_quota_used, granule_end_time)

new_total_quota_used = quota.limit - self._get_remaining_error_quota(
quota, window_end_time
)
assert new_total_quota_used == 0

return previous_granule_values

def _get_granule_end_times(
self, quota: Quota, window_end: int, newest_first: bool = False
) -> list[int]:
"""
Given a quota and the end of the time window it's covering, return the timestamps
corresponding to the end of each granule.
"""
window_duration = quota.window_seconds
granule_duration = quota.granularity_seconds
num_granules = window_duration // granule_duration

# Walk backwards through the granules
end_times_newest_first = [
window_end - num_granules_ago * granule_duration
for num_granules_ago in range(num_granules)
]

return end_times_newest_first if newest_first else list(reversed(end_times_newest_first))

def _set_granule_values(
self,
quota: Quota,
values: list[int | None],
window_end: int | None = None,
) -> None:
"""
Set the usage in each granule of the given quota, for the time window ending at the given
time.
If no ending time is given, the current time is used.
The list of values should be ordered from oldest to newest and must contain the same number
of elements as the window has granules. To only change some of the values, pass `None` in
the spot of any value which should remain unchanged. (For example, in a two-granule window,
to only change the older granule, pass `[3, None]`.)
"""
window_duration = quota.window_seconds
granule_duration = quota.granularity_seconds
num_granules = window_duration // granule_duration

if len(values) != num_granules:
raise Exception(
f"Exactly {num_granules} granule values must be provided. "
+ "To leave an existing value as is, include `None` in its spot."
)

now = int(time.time())
window_end_time = window_end or now

previous_values = self._clear_quota(quota, window_end_time)

for i, granule_end_time, value in zip(
range(num_granules), self._get_granule_end_times(quota, window_end_time), values
):
# When we cleared the quota above, we set each granule's value to 0, so here "adding"
# usage is actually setting usage
if value is not None:
self._add_quota_usage(quota, value, granule_end_time)
else:
self._add_quota_usage(quota, previous_values[i], granule_end_time)

def _delete_from_redis(self, keys: list[str]) -> Any:
for key in keys:
self.redis_pipeline.delete(key)
return self.redis_pipeline.execute()


@freeze_time()
class CircuitBreakerTest(TestCase):
Expand Down

0 comments on commit d5c9975

Please sign in to comment.