Skip to content

Commit

Permalink
make stream retry/backoff/jitter behavior consistent with other SDKs …
Browse files Browse the repository at this point in the history
…+ improve testing (#131)
  • Loading branch information
eli-darkly authored Mar 27, 2020
1 parent b7d081b commit 770fd71
Show file tree
Hide file tree
Showing 20 changed files with 869 additions and 319 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ jobs:
pip install -r consul-requirements.txt
python setup.py install
- run:
name: run tests (2.7)
name: run tests
command: |
mkdir test-reports
$env:Path += ";C:\Python27\;C:\Python27\Scripts\" # has no effect if 2.7 isn't installed
Expand Down
14 changes: 14 additions & 0 deletions ldclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,20 @@ def get():
__lock.unlock()


# for testing only
def _reset_client():
global __client
global __lock
try:
__lock.lock()
c = __client
__client = None
finally:
__lock.unlock()
if c:
c.close()


# currently hidden from documentation - see docs/README.md
class NullHandler(logging.Handler):
"""A :class:`logging.Handler` implementation that does nothing.
Expand Down
10 changes: 10 additions & 0 deletions ldclient/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def __init__(self,
flush_interval=5,
stream_uri='https://stream.launchdarkly.com',
stream=True,
initial_reconnect_delay=1,
verify_ssl=True,
defaults=None,
send_events=None,
Expand Down Expand Up @@ -135,6 +136,10 @@ def __init__(self,
use the default value.
:param bool stream: Whether or not the streaming API should be used to receive flag updates. By
default, it is enabled. Streaming should only be disabled on the advice of LaunchDarkly support.
:param float initial_reconnect_delay: The initial reconnect delay (in seconds) for the streaming
connection. The streaming service uses a backoff algorithm (with jitter) every time the connection needs
to be reestablished. The delay for the first reconnection will start near this value, and then
increase exponentially for any subsequent connection failures.
:param bool verify_ssl: Deprecated; use `http` instead and specify `disable_ssl_verification` as
part of :class:`HTTPConfig` if you want to turn off SSL verification (not recommended).
:param bool send_events: Whether or not to send events back to LaunchDarkly. This differs from
Expand Down Expand Up @@ -198,6 +203,7 @@ def __init__(self,
self.__stream_uri = stream_uri.rstrip('\\')
self.__update_processor_class = update_processor_class
self.__stream = stream
self.__initial_reconnect_delay = initial_reconnect_delay
self.__poll_interval = max(poll_interval, 30)
self.__use_ldd = use_ldd
self.__feature_store = InMemoryFeatureStore() if not feature_store else feature_store
Expand Down Expand Up @@ -248,6 +254,7 @@ def copy_with_new_sdk_key(self, new_sdk_key):
flush_interval=self.__flush_interval,
stream_uri=self.__stream_uri,
stream=self.__stream,
initial_reconnect_delay=self.__initial_reconnect_delay,
verify_ssl=self.__verify_ssl,
defaults=self.__defaults,
send_events=self.__send_events,
Expand Down Expand Up @@ -315,6 +322,9 @@ def stream(self):
return self.__stream

@property
def initial_reconnect_delay(self):
return self.__initial_reconnect_delay
@property
def poll_interval(self):
return self.__poll_interval

Expand Down
2 changes: 1 addition & 1 deletion ldclient/impl/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import urllib3

def _base_headers(config):
headers = {'Authorization': config.sdk_key,
headers = {'Authorization': config.sdk_key or '',
'User-Agent': 'PythonClient/' + VERSION}
if isinstance(config.wrapper_name, str) and config.wrapper_name != "":
wrapper_version = ""
Expand Down
93 changes: 93 additions & 0 deletions ldclient/impl/retry_delay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from random import Random

# This implementation is based on the equivalent code in the Go eventsource library.

class RetryDelayStrategy(object):
"""Encapsulation of configurable backoff/jitter behavior, used for stream connections.
- The system can either be in a "good" state or a "bad" state. The initial state is "bad"; the
caller is responsible for indicating when it transitions to "good". When we ask for a new retry
delay, that implies the state is now transitioning to "bad".
- There is a configurable base delay, which can be changed at any time (if the SSE server sends
us a "retry:" directive).
- There are optional strategies for applying backoff and jitter to the delay.
This object is meant to be used from a single thread once it's been created; its methods are
not safe for concurrent use.
"""
def __init__(self, base_delay, reset_interval, backoff_strategy, jitter_strategy):
self.__base_delay = base_delay
self.__reset_interval = reset_interval
self.__backoff = backoff_strategy
self.__jitter = jitter_strategy
self.__retry_count = 0
self.__good_since = None

def next_retry_delay(self, current_time):
"""Computes the next retry interval. This also sets the current state to "bad".
Note that current_time is passed as a parameter instead of computed by this function to
guarantee predictable behavior in tests.
:param float current_time: the current time, in seconds
"""
if self.__good_since and self.__reset_interval and (current_time - self.__good_since >= self.__reset_interval):
self.__retry_count = 0
self.__good_since = None
delay = self.__base_delay
if self.__backoff:
delay = self.__backoff.apply_backoff(delay, self.__retry_count)
self.__retry_count += 1
if self.__jitter:
delay = self.__jitter.apply_jitter(delay)
return delay

def set_good_since(self, good_since):
"""Marks the current state as "good" and records the time.
:param float good_since: the time that the state became "good", in seconds
"""
self.__good_since = good_since

def set_base_delay(self, base_delay):
"""Changes the initial retry delay and resets the backoff (if any) so the next retry will use
that value.
This is used to implement the optional SSE behavior where the server sends a "retry:" command to
set the base retry to a specific value. Note that we will still apply a jitter, if jitter is enabled,
and subsequent retries will still increase exponentially.
"""
self.__base_delay = base_delay
self.__retry_count = 0

class DefaultBackoffStrategy(object):
"""The default implementation of exponential backoff, which doubles the delay each time up to
the specified maximum.
If a reset_interval was specified for the RetryDelayStrategy, and the system has been in a "good"
state for at least that long, the delay is reset back to the base. This avoids perpetually increasing
delays in a situation where failures are rare).
"""
def __init__(self, max_delay):
self.__max_delay = max_delay

def apply_backoff(self, delay, retry_count):
d = delay * (2 ** retry_count)
return d if d <= self.__max_delay else self.__max_delay

class DefaultJitterStrategy(object):
"""The default implementation of jitter, which subtracts a pseudo-random amount from each delay.
"""
def __init__(self, ratio, rand_seed = None):
"""Creates an instance.
:param float ratio: a number in the range [0.0, 1.0] representing 0%-100% jitter
:param int rand_seed: if not None, will use this random seed (for test determinacy)
"""
self.__ratio = ratio
self.__random = Random(rand_seed)

def apply_jitter(self, delay):
return delay - (self.__random.random() * self.__ratio * delay)
21 changes: 13 additions & 8 deletions ldclient/sse_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,19 @@ def __next__(self):
raise EOFError()
self.buf += nextline.decode("utf-8")
except (StopIteration, EOFError) as e:
time.sleep(self.retry / 1000.0)
self._connect()

# The SSE spec only supports resuming from a whole message, so
# if we have half a message we should throw it out.
head, sep, tail = self.buf.rpartition('\n')
self.buf = head + sep
continue
if self.retry:
# This retry logic is not what we want in the SDK. It's retained here for backward compatibility in case
# anyone else is using SSEClient.
time.sleep(self.retry / 1000.0)
self._connect()

# The SSE spec only supports resuming from a whole message, so
# if we have half a message we should throw it out.
head, sep, tail = self.buf.rpartition('\n')
self.buf = head + sep
continue
else:
raise

split = re.split(end_of_field, self.buf)
head = split[0]
Expand Down
35 changes: 20 additions & 15 deletions ldclient/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
import json
from threading import Thread

import backoff
import logging
import math
import time

from ldclient.impl.http import _http_factory
from ldclient.impl.retry_delay import RetryDelayStrategy, DefaultBackoffStrategy, DefaultJitterStrategy
from ldclient.interfaces import UpdateProcessor
from ldclient.sse_client import SSEClient
from ldclient.util import log, UnsuccessfulResponseException, http_error_message, is_http_error_recoverable
Expand All @@ -22,6 +23,10 @@
# stream will keep this from triggering
stream_read_timeout = 5 * 60

MAX_RETRY_DELAY = 30
BACKOFF_RESET_INTERVAL = 60
JITTER_RATIO = 0.5

STREAM_ALL_PATH = '/all'

ParsedPath = namedtuple('ParsedPath', ['kind', 'key'])
Expand All @@ -39,6 +44,11 @@ def __init__(self, config, requester, store, ready, diagnostic_accumulator):
self._ready = ready
self._diagnostic_accumulator = diagnostic_accumulator
self._es_started = None
self._retry_delay = RetryDelayStrategy(
config.initial_reconnect_delay,
BACKOFF_RESET_INTERVAL,
DefaultBackoffStrategy(MAX_RETRY_DELAY),
DefaultJitterStrategy(JITTER_RATIO))

# We need to suppress the default logging behavior of the backoff package, because
# it logs messages at ERROR level with variable content (the delay time) which will
Expand All @@ -53,12 +63,19 @@ def __init__(self, config, requester, store, ready, diagnostic_accumulator):
def run(self):
log.info("Starting StreamingUpdateProcessor connecting to uri: " + self._uri)
self._running = True
attempts = 0
while self._running:
if attempts > 0:
delay = self._retry_delay.next_retry_delay(time.time())
log.info("Will reconnect after delay of %fs" % delay)
time.sleep(delay)
attempts += 1
try:
self._es_started = int(time.time() * 1000)
messages = self._connect()
for msg in messages:
if not self._running:
log.warning("but I'm done")
break
message_ok = self.process_message(self._store, self._requester, msg)
if message_ok:
Expand All @@ -76,32 +93,20 @@ def run(self):
self.stop()
break
except Exception as e:
log.warning("Caught exception. Restarting stream connection after one second. %s" % e)
log.warning("Unexpected error on stream connection: %s, will retry" % e)
self._record_stream_init(True)
self._es_started = None
# no stacktrace here because, for a typical connection error, it'll just be a lengthy tour of urllib3 internals
time.sleep(1)

def _record_stream_init(self, failed):
if self._diagnostic_accumulator and self._es_started:
current_time = int(time.time() * 1000)
self._diagnostic_accumulator.record_stream_init(current_time, current_time - self._es_started, failed)

def _backoff_expo():
return backoff.expo(max_value=30)

def should_not_retry(e):
return isinstance(e, UnsuccessfulResponseException) and (not is_http_error_recoverable(e.status))

def log_backoff_message(props):
log.error("Streaming connection failed, will attempt to restart")
log.info("Will reconnect after delay of %fs", props['wait'])

@backoff.on_exception(_backoff_expo, BaseException, max_tries=None, jitter=backoff.full_jitter,
on_backoff=log_backoff_message, giveup=should_not_retry)
def _connect(self):
return SSEClient(
self._uri,
retry = None, # we're implementing our own retry
http_factory = _http_factory(self._config)
)

Expand Down
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
backoff>=1.4.3
certifi>=2018.4.16
expiringdict>=1.1.4,<1.2.0
six>=1.10.0
Expand Down
3 changes: 0 additions & 3 deletions testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import logging
import os

logging.basicConfig(level=logging.WARN)

sdk_key = os.environ.get('LD_SDK_KEY')
Loading

0 comments on commit 770fd71

Please sign in to comment.