diff --git a/watchtower/__init__.py b/watchtower/__init__.py index 4e12fc3..869ab6a 100644 --- a/watchtower/__init__.py +++ b/watchtower/__init__.py @@ -77,7 +77,7 @@ def __init__(self, log_group=__name__, stream_name=None, use_queues=True, send_i _idempotent_create(self.cwl_client.create_log_group, logGroupName=self.log_group) - def _submit_batch(self, batch, stream_name): + def _submit_batch(self, batch, stream_name, max_retries=5): if len(batch) < 1: return sorted_batch = sorted(batch, key=itemgetter('timestamp'), reverse=False) @@ -86,15 +86,16 @@ def _submit_batch(self, batch, stream_name): if self.sequence_tokens[stream_name] is not None: kwargs["sequenceToken"] = self.sequence_tokens[stream_name] - try: - response = self.cwl_client.put_log_events(**kwargs) - except ClientError as e: - if e.response.get("Error", {}).get("Code") in ("DataAlreadyAcceptedException", - "InvalidSequenceTokenException"): - kwargs["sequenceToken"] = e.response["Error"]["Message"].rsplit(" ", 1)[-1] + for retry in range(max_retries): + try: response = self.cwl_client.put_log_events(**kwargs) - else: - raise + break + except ClientError as e: + if e.response.get("Error", {}).get("Code") in ("DataAlreadyAcceptedException", + "InvalidSequenceTokenException"): + kwargs["sequenceToken"] = e.response["Error"]["Message"].rsplit(" ", 1)[-1] + else: + raise if "rejectedLogEventsInfo" in response: # TODO: make this configurable/non-fatal