diff --git a/watchtower/__init__.py b/watchtower/__init__.py index 1c503db..a9afc34 100644 --- a/watchtower/__init__.py +++ b/watchtower/__init__.py @@ -135,11 +135,9 @@ def __init__(self, log_group=__name__, stream_name=None, use_queues=True, send_i self.max_batch_size = max_batch_size self.max_batch_count = max_batch_count self.max_message_size = max_message_size - self.queues, self.sequence_tokens = {}, {} - self.threads = [] - self.creating_log_stream, self.shutting_down = False, False self.create_log_stream = create_log_stream self.log_group_retention_days = log_group_retention_days + self._init_state() # Creating session should be the final call in __init__, after all instance attributes are set. # This ensures that failing to create the session will not result in any missing attribtues. @@ -155,6 +153,18 @@ def __init__(self, log_group=__name__, stream_name=None, use_queues=True, send_i self.addFilter(_boto_debug_filter) + def _at_fork_reinit(self): + # This was added in Python 3.9 and should only be called with a recent + # version of Python. An older version will attempt to call createLock + # instead. + super()._at_fork_reinit() + self._init_state() + + def _init_state(self): + self.queues, self.sequence_tokens = {}, {} + self.threads = [] + self.creating_log_stream, self.shutting_down = False, False + def _submit_batch(self, batch, stream_name, max_retries=5): if len(batch) < 1: return @@ -208,6 +218,10 @@ def _submit_batch(self, batch, stream_name, max_retries=5): # from the response self.sequence_tokens[stream_name] = response["nextSequenceToken"] + def createLock(self): + super().createLock() + self._init_state() + def emit(self, message): if self.creating_log_stream: return # Avoid infinite recursion when asked to log a message as our own side effect