Skip to content

Commit

Permalink
implement our own retry logic & logging for event posts, don't use urllib3.Retry (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
eli-darkly authored May 9, 2020
1 parent f7ec18a commit 02a803f
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 39 deletions.
99 changes: 63 additions & 36 deletions ldclient/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
from ldclient.interfaces import EventProcessor
from ldclient.repeating_timer import RepeatingTimer
from ldclient.util import UnsuccessfulResponseException
from ldclient.util import _headers, _retryable_statuses
from ldclient.util import log
from ldclient.util import http_error_message, is_http_error_recoverable, stringify_attrs, throw_if_unsuccessful_response
from ldclient.util import check_if_error_is_recoverable_and_log, is_http_error_recoverable, stringify_attrs, throw_if_unsuccessful_response, _headers
from ldclient.diagnostics import create_diagnostic_init

__MAX_FLUSH_THREADS__ = 5
Expand Down Expand Up @@ -141,18 +140,6 @@ def _get_userkey(self, event):
return str(event['user'].get('key'))


class _EventRetry(urllib3.Retry):
    """urllib3 retry policy for event posts: one retry with a flat 1-second backoff."""

    def __init__(self):
        # NOTE(review): newer urllib3 releases rename ``method_whitelist`` to
        # ``allowed_methods`` -- confirm against the urllib3 version in use.
        urllib3.Retry.__init__(self, total=1,
                               method_whitelist=False,  # Enable retry on POST
                               status_forcelist=_retryable_statuses,
                               raise_on_status=False)

    # Override backoff time to be flat 1 second
    def get_backoff_time(self):
        return 1


class EventPayloadSendTask(object):
def __init__(self, http, config, formatter, payload, response_fn):
self._http = http
Expand All @@ -175,16 +162,17 @@ def _do_send(self, output_events):
try:
json_body = json.dumps(output_events)
log.debug('Sending events payload: ' + json_body)
hdrs = _headers(self._config)
hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__)
hdrs['X-LaunchDarkly-Payload-ID'] = str(uuid.uuid4())
uri = self._config.events_uri
r = self._http.request('POST', uri,
headers=hdrs,
timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout),
body=json_body,
retries=_EventRetry())
self._response_fn(r)
payload_id = str(uuid.uuid4())
r = _post_events_with_retry(
self._http,
self._config,
self._config.events_uri,
payload_id,
json_body,
"%d events" % len(self._payload.events)
)
if r:
self._response_fn(r)
return r
except Exception as e:
log.warning(
Expand All @@ -202,13 +190,14 @@ def run(self):
try:
json_body = json.dumps(self._event_body)
log.debug('Sending diagnostic event: ' + json_body)
hdrs = _headers(self._config)
uri = self._config.events_base_uri + '/diagnostic'
r = self._http.request('POST', uri,
headers=hdrs,
timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout),
body=json_body,
retries=1)
_post_events_with_retry(
self._http,
self._config,
self._config.events_base_uri + '/diagnostic',
None,
json_body,
"diagnostic event"
)
except Exception as e:
log.warning(
'Unhandled exception in event processor. Diagnostic event was not sent. [%s]', e)
Expand Down Expand Up @@ -381,11 +370,9 @@ def _handle_response(self, r):
if server_date is not None:
timestamp = int(time.mktime(server_date) * 1000)
self._last_known_past_time = timestamp
if r.status > 299:
log.error(http_error_message(r.status, "event delivery", "some events were dropped"))
if not is_http_error_recoverable(r.status):
self._disabled = True
return
if r.status > 299 and not is_http_error_recoverable(r.status):
self._disabled = True
return

def _send_and_reset_diagnostics(self):
if self._diagnostic_accumulator is not None:
Expand Down Expand Up @@ -472,3 +459,43 @@ def __enter__(self):

def __exit__(self, type, value, traceback):
    """Context-manager exit hook: call ``self.stop()``.

    The exception details passed in are ignored and nothing is returned,
    so an exception raised inside the ``with`` body still propagates.
    """
    self.stop()


def _post_events_with_retry(
    http_client,
    config,
    uri,
    payload_id,
    body,
    events_description
):
    """POST an event payload, retrying at most once after a 1-second delay.

    :param http_client: object exposing a urllib3-style ``request`` method
    :param config: supplies the base headers (via ``_headers``) and the
        ``connect_timeout`` / ``read_timeout`` values
    :param uri: the URI to post to
    :param payload_id: if truthy, sent as ``X-LaunchDarkly-Payload-ID`` along
        with the event schema header; if falsy, both headers are omitted
    :param body: the already-serialized request body
    :param events_description: short description used in log messages
    :return: the response if the status was below 300 or the error was
        classified as unrecoverable; ``None`` if both attempts failed with a
        recoverable HTTP error or an exception
    """
    hdrs = _headers(config)
    hdrs['Content-Type'] = 'application/json'
    if payload_id:
        hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__)
        hdrs['X-LaunchDarkly-Payload-ID'] = payload_id
    can_retry = True
    context = "posting %s" % events_description
    while True:
        next_action_message = "will retry" if can_retry else "some events were dropped"
        try:
            r = http_client.request(
                'POST',
                uri,
                headers=hdrs,
                body=body,
                timeout=urllib3.Timeout(connect=config.connect_timeout, read=config.read_timeout),
                retries=0  # retrying is handled by this loop, not by urllib3
            )
            if r.status < 300:
                return r
            recoverable = check_if_error_is_recoverable_and_log(context, r.status, None, next_action_message)
            if not recoverable:
                return r
        except Exception as e:
            # No status code is available here, so the helper logs this as recoverable.
            check_if_error_is_recoverable_and_log(context, None, str(e), next_action_message)
        # Reached only after a recoverable failure (HTTP error or exception).
        if not can_retry:
            return None
        can_retry = False
        # fixed delay of 1 second for event retries
        time.sleep(1)
19 changes: 16 additions & 3 deletions ldclient/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,28 @@ def is_http_error_recoverable(status):
return True # all other errors are recoverable


def http_error_description(status):
    """Return a short description of an HTTP error status.

    :param status: the numeric HTTP status code
    :return: ``"HTTP error <status>"``, with ``" (invalid SDK key)"`` appended
        when the status is 401 or 403
    """
    suffix = " (invalid SDK key)" if status in (401, 403) else ""
    return "HTTP error %d%s" % (status, suffix)


def http_error_message(status, context, retryable_message="will retry"):
    """Build a log message for an HTTP error response.

    :param status: the numeric HTTP status code
    :param context: description of the operation that failed
    :param retryable_message: text appended when the status is recoverable
        according to ``is_http_error_recoverable``; otherwise
        ``"giving up permanently"`` is appended instead
    :return: the formatted message
    """
    return "Received %s for %s - %s" % (
        http_error_description(status),
        context,
        retryable_message if is_http_error_recoverable(status) else "giving up permanently"
    )


def check_if_error_is_recoverable_and_log(error_context, status_code, error_desc, recoverable_message):
    """Log an error and report whether it is considered recoverable.

    :param error_context: description of what was being attempted
    :param status_code: the HTTP status code, or a falsy value (e.g. ``None``)
        if the failure was not an HTTP error response
    :param error_desc: description of the error; if ``None`` and a status code
        is given, it is derived with ``http_error_description``
    :param recoverable_message: text describing what happens next, included in
        the warning logged for a recoverable error
    :return: ``False`` if a status code was given and
        ``is_http_error_recoverable`` rejects it (logged at error level);
        ``True`` otherwise (logged at warning level)
    """
    if status_code and (error_desc is None):
        error_desc = http_error_description(status_code)
    if status_code and not is_http_error_recoverable(status_code):
        log.error("Error %s (giving up permanently): %s" % (error_context, error_desc))
        return False
    # Failures without a status code always fall through to here.
    log.warning("Error %s (%s): %s" % (error_context, recoverable_message, error_desc))
    return True


def stringify_attrs(attrdict, attrs):
if attrdict is None:
return None
Expand Down

0 comments on commit 02a803f

Please sign in to comment.