diff --git a/retrying.py b/retrying.py index bcb7a9d..5e9d67e 100644 --- a/retrying.py +++ b/retrying.py @@ -23,6 +23,12 @@ MAX_WAIT = 1073741823 +def _val_or(val, default): + if val is None: + return default + return val + + def _retry_if_exception_of_type(retryable_types): def _retry_if_exception_these_types(exception): return isinstance(exception, retryable_types) @@ -59,161 +65,288 @@ def wrapped_f(*args, **kw): return wrap -class Retrying(object): - - def __init__(self, - stop=None, wait=None, - stop_max_attempt_number=None, - stop_max_delay=None, - wait_fixed=None, - wait_random_min=None, wait_random_max=None, - wait_incrementing_start=None, wait_incrementing_increment=None, - wait_incrementing_max=None, - wait_exponential_multiplier=None, wait_exponential_max=None, - retry_on_exception=None, - retry_on_result=None, - wrap_exception=False, - stop_func=None, - wait_func=None, - wait_jitter_max=None, - before_attempts=None, - after_attempts=None): +class _stop_after_attempt(object): + """Strategy that stops when the previous attempt >= max_attempt.""" - self._stop_max_attempt_number = 5 if stop_max_attempt_number is None else stop_max_attempt_number - self._stop_max_delay = 100 if stop_max_delay is None else stop_max_delay - self._wait_fixed = 1000 if wait_fixed is None else wait_fixed - self._wait_random_min = 0 if wait_random_min is None else wait_random_min - self._wait_random_max = 1000 if wait_random_max is None else wait_random_max - self._wait_incrementing_start = 0 if wait_incrementing_start is None else wait_incrementing_start - self._wait_incrementing_increment = 100 if wait_incrementing_increment is None else wait_incrementing_increment - self._wait_exponential_multiplier = 1 if wait_exponential_multiplier is None else wait_exponential_multiplier - self._wait_exponential_max = MAX_WAIT if wait_exponential_max is None else wait_exponential_max - self._wait_incrementing_max = MAX_WAIT if wait_incrementing_max is None else wait_incrementing_max - self._wait_jitter_max = 0 if wait_jitter_max is None else wait_jitter_max - self._before_attempts = before_attempts - self._after_attempts = after_attempts + def __init__(self, stop_max_attempt_number): + self.stop_max_attempt_number = stop_max_attempt_number - # TODO add chaining of stop behaviors - # stop behavior - stop_funcs = [] - if stop_max_attempt_number is not None: - stop_funcs.append(self.stop_after_attempt) + def __call__(self, previous_attempt_number, delay_since_first_attempt_ms): + return previous_attempt_number >= self.stop_max_attempt_number - if stop_max_delay is not None: - stop_funcs.append(self.stop_after_delay) - if stop_func is not None: - self.stop = stop_func +class _stop_after_delay(object): + """Strategy that stops when the time from the first attempt >= limit.""" - elif stop is None: - self.stop = lambda attempts, delay: any(f(attempts, delay) for f in stop_funcs) + def __init__(self, stop_max_delay): + self.stop_max_delay = stop_max_delay - else: - self.stop = getattr(self, stop) + def __call__(self, previous_attempt_number, delay_since_first_attempt_ms): + return delay_since_first_attempt_ms >= self.stop_max_delay - # TODO add chaining of wait behaviors - # wait behavior - wait_funcs = [lambda *args, **kwargs: 0] - if wait_fixed is not None: - wait_funcs.append(self.fixed_sleep) - if wait_random_min is not None or wait_random_max is not None: - wait_funcs.append(self.random_sleep) +class _fixed_sleep(object): + """Wait strategy that waits a fixed amount of time between each retry.""" - if wait_incrementing_start is not None or wait_incrementing_increment is not None: - wait_funcs.append(self.incrementing_sleep) + def __init__(self, wait_fixed): + self.wait_fixed = wait_fixed - if wait_exponential_multiplier is not None or wait_exponential_max is not None: - wait_funcs.append(self.exponential_sleep) + def __call__(self, previous_attempt_number, delay_since_first_attempt_ms): + return self.wait_fixed - if wait_func is not None: - self.wait = wait_func - elif wait is None: - self.wait = lambda attempts, delay: max(f(attempts, delay) for f in wait_funcs) +class _no_sleep(_fixed_sleep): + """Wait strategy that doesn't wait at all before retrying.""" - else: - self.wait = getattr(self, wait) + def __init__(self): + super(_no_sleep, self).__init__(0) - # retry on exception filter - if retry_on_exception is None: - self._retry_on_exception = self.always_reject - else: - # this allows for providing a tuple of exception types that - # should be allowed to retry on, and avoids having to create - # a callback that does the same thing - if isinstance(retry_on_exception, (tuple)): - retry_on_exception = _retry_if_exception_of_type( - retry_on_exception) - self._retry_on_exception = retry_on_exception - - # retry on result filter - if retry_on_result is None: - self._retry_on_result = self.never_reject - else: - self._retry_on_result = retry_on_result - self._wrap_exception = wrap_exception +class _random_sleep(object): + """Wait strategy that waits a random amount of time between min/max.""" - def stop_after_attempt(self, previous_attempt_number, delay_since_first_attempt_ms): - """Stop after the previous attempt >= stop_max_attempt_number.""" - return previous_attempt_number >= self._stop_max_attempt_number + def __init__(self, wait_random_min, wait_random_max): + self.wait_random_min = wait_random_min + self.wait_random_max = wait_random_max - def stop_after_delay(self, previous_attempt_number, delay_since_first_attempt_ms): - """Stop after the time from the first attempt >= stop_max_delay.""" - return delay_since_first_attempt_ms >= self._stop_max_delay + def __call__(self, previous_attempt_number, delay_since_first_attempt_ms): + return random.randint(self.wait_random_min, self.wait_random_max) - @staticmethod - def no_sleep(previous_attempt_number, delay_since_first_attempt_ms): - """Don't sleep at all before retrying.""" - return 0 - def fixed_sleep(self, previous_attempt_number, delay_since_first_attempt_ms): - """Sleep a fixed amount of time between each retry.""" - return self._wait_fixed +class _incrementing_sleep(object): + """ + Wait strategy that waits an incremental amount of time after each + attempt, starting at a starting value and incrementing by a value for + each attempt (and restricting the upper limit to some maximum value). + """ - def random_sleep(self, previous_attempt_number, delay_since_first_attempt_ms): - """Sleep a random amount of time between wait_random_min and wait_random_max""" - return random.randint(self._wait_random_min, self._wait_random_max) + def __init__(self, wait_incrementing_start, wait_incrementing_increment, + wait_incrementing_max): + self.wait_incrementing_start = wait_incrementing_start + self.wait_incrementing_increment = wait_incrementing_increment + self.wait_incrementing_max = wait_incrementing_max - def incrementing_sleep(self, previous_attempt_number, delay_since_first_attempt_ms): - """ - Sleep an incremental amount of time after each attempt, starting at - wait_incrementing_start and incrementing by wait_incrementing_increment - """ - result = self._wait_incrementing_start + (self._wait_incrementing_increment * (previous_attempt_number - 1)) - if result > self._wait_incrementing_max: - result = self._wait_incrementing_max + def __call__(self, previous_attempt_number, delay_since_first_attempt_ms): + result = self.wait_incrementing_start + (self.wait_incrementing_increment * (previous_attempt_number - 1)) + if result > self.wait_incrementing_max: + result = self.wait_incrementing_max if result < 0: result = 0 return result - def exponential_sleep(self, previous_attempt_number, delay_since_first_attempt_ms): - exp = 2 ** previous_attempt_number - result = self._wait_exponential_multiplier * exp - if result > self._wait_exponential_max: - result = self._wait_exponential_max + +class _exponential_sleep(object): + """Wait strategy that applies exponential backoff. + + It allows for a customized multiplier and an ability to restrict the + upper limit to some maximum value. + """ + + #: Defaults to 2^n (where n is the prior attempt number/count). + EXP_BASE = 2 + + def __init__(self, wait_exponential_multiplier, wait_exponential_max): + self.wait_exponential_multiplier = wait_exponential_multiplier + self.wait_exponential_max = wait_exponential_max + + def __call__(self, previous_attempt_number, delay_since_first_attempt_ms): + exp = self.EXP_BASE ** previous_attempt_number + result = self.wait_exponential_multiplier * exp + if result > self.wait_exponential_max: + result = self.wait_exponential_max if result < 0: result = 0 return result - @staticmethod - def never_reject(result): - return False - @staticmethod - def always_reject(result): - return True +def _never_reject(result): + """Rejection strategy that never rejects any result.""" + return False + + +def _always_reject(result): + """Rejection strategy that always rejects any result.""" + return True - def should_reject(self, attempt): + +class _MaybeReject(object): + """Rejection strategy that proxies to 2 functions to reject (or not).""" + + def __init__(self, retry_on_exception=None, + retry_on_result=None): + self.retry_on_exception = retry_on_exception + self.retry_on_result = retry_on_result + + def __call__(self, attempt): reject = False if attempt.has_exception: - reject |= self._retry_on_exception(attempt.value[1]) + if self.retry_on_exception is not None: + reject |= self.retry_on_exception(attempt.value[1]) else: - reject |= self._retry_on_result(attempt.value) - + if self.retry_on_result is not None: + reject |= self.retry_on_result(attempt.value) return reject + +def _select_stop_strategy(stop=None, stop_max_attempt_number=None, + stop_max_delay=None, stop_func=None): + if stop_func is not None: + return stop_func + + if stop is not None: + for s_name, s in [ + ('stop_after_attempt', + _stop_after_attempt(_val_or(stop_max_attempt_number, 5))), + ('stop_after_delay', + _stop_after_delay(_val_or(stop_max_delay, 100))), + ]: + if s_name == stop: + return s + # Match the original behavior if we didn't match to any + # known strategy + raise AttributeError("No stop strategy with name '%s'" % stop) + + stop_strategies = [] + + if stop_max_attempt_number is not None: + stop_strategies.append(_stop_after_attempt(stop_max_attempt_number)) + + if stop_max_delay is not None: + stop_strategies.append(_stop_after_delay(stop_max_delay)) + + def any_strategy(previous_attempt_number, delay_since_first_attempt_ms): + return any(s(previous_attempt_number, delay_since_first_attempt_ms) + for s in stop_strategies) + + return any_strategy + + +def _select_reject_strategy(retry_on_exception=None, retry_on_result=None): + if retry_on_exception is None: + retry_on_exception = _always_reject + else: + if isinstance(retry_on_exception, (tuple)): + retry_on_exception = _retry_if_exception_of_type( + retry_on_exception) + if retry_on_result is None: + retry_on_result = _never_reject + return _MaybeReject(retry_on_result=retry_on_result, + retry_on_exception=retry_on_exception) + + +def _select_wait_strategy(wait_func=None, wait=None, + wait_fixed=None, + wait_random_min=None, wait_random_max=None, + wait_incrementing_start=None, + wait_incrementing_increment=None, + wait_incrementing_max=None, + wait_exponential_multiplier=None, + wait_exponential_max=None): + if wait_func is not None: + return wait_func + + if wait is not None: + for w_name, w in [ + ('exponential_sleep', + _exponential_sleep(_val_or(wait_exponential_multiplier, 1), + _val_or(wait_exponential_max, MAX_WAIT))), + ('incrementing_sleep', + _incrementing_sleep(_val_or(wait_incrementing_start, 0), + _val_or(wait_incrementing_increment, 100), + _val_or(wait_incrementing_max, MAX_WAIT))), + ('fixed_sleep', + _fixed_sleep(_val_or(wait_fixed, 1000))), + ('no_sleep', _no_sleep()), + ('random_sleep', + _random_sleep(_val_or(wait_random_min, 0), + _val_or(wait_random_max, 1000))), + ]: + if w_name == wait: + return w + # Match the original behavior if we didn't match to any + # known strategy + raise AttributeError("No wait strategy with name '%s'" % wait) + + wait_strategies = [] + + if wait_fixed is not None: + wait_strategies.append(_fixed_sleep(wait_fixed)) + + if wait_random_min is not None or wait_random_max is not None: + wait_random_min = _val_or(wait_random_min, 0) + wait_random_max = _val_or(wait_random_max, 1000) + wait_strategies.append(_random_sleep(wait_random_min, + wait_random_max)) + + if (wait_incrementing_start is not None or + wait_incrementing_increment is not None): + wait_incrementing_start = _val_or(wait_incrementing_start, 0) + wait_incrementing_increment = _val_or(wait_incrementing_increment, 100) + wait_incrementing_max = _val_or(wait_incrementing_max, MAX_WAIT) + wait_strategies.append( + _incrementing_sleep(wait_incrementing_start, + wait_incrementing_increment, + wait_incrementing_max)) + + if (wait_exponential_multiplier is not None or + wait_exponential_max is not None): + wait_exponential_multiplier = _val_or(wait_exponential_multiplier, 1) + wait_exponential_max = _val_or(wait_exponential_max, MAX_WAIT) + wait_strategies.append( + _exponential_sleep(wait_exponential_multiplier, + wait_exponential_max)) + + if not wait_strategies: + wait_strategies.append(_no_sleep()) + + def max_strategy(previous_attempt_number, delay_since_first_attempt_ms): + return max(s(previous_attempt_number, delay_since_first_attempt_ms) + for s in wait_strategies) + + return max_strategy + + +class Retrying(object): + """Retrying controller.""" + + def __init__(self, + stop=None, wait=None, + stop_max_attempt_number=None, + stop_max_delay=None, + wait_fixed=None, + wait_random_min=None, wait_random_max=None, + wait_incrementing_start=None, wait_incrementing_increment=None, + wait_incrementing_max=None, + wait_exponential_multiplier=None, wait_exponential_max=None, + retry_on_exception=None, + retry_on_result=None, + wrap_exception=False, + stop_func=None, + wait_func=None, + wait_jitter_max=None, + before_attempts=None, + after_attempts=None): + self.stop = _select_stop_strategy( + stop=stop, stop_func=stop_func, + stop_max_attempt_number=stop_max_attempt_number, + stop_max_delay=stop_max_delay) + self.wait = _select_wait_strategy( + wait_func=wait_func, wait=wait, + wait_fixed=wait_fixed, + wait_random_min=wait_random_min, wait_random_max=wait_random_max, + wait_incrementing_start=wait_incrementing_start, + wait_incrementing_increment=wait_incrementing_increment, + wait_incrementing_max=wait_incrementing_max, + wait_exponential_multiplier=wait_exponential_multiplier, + wait_exponential_max=wait_exponential_max) + self.should_reject = _select_reject_strategy( + retry_on_exception=retry_on_exception, + retry_on_result=retry_on_result) + self._wrap_exception = wrap_exception + self._before_attempts = before_attempts + self._after_attempts = after_attempts + self._wait_jitter_max = wait_jitter_max + def call(self, fn, *args, **kwargs): start_time = int(round(time.time() * 1000)) attempt_number = 1