Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reset internal state on fork to prevent deadlocks in worker threads #139

Merged
merged 1 commit into from
May 8, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions watchtower/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,9 @@ def __init__(self, log_group=__name__, stream_name=None, use_queues=True, send_i
self.max_batch_size = max_batch_size
self.max_batch_count = max_batch_count
self.max_message_size = max_message_size
self.queues, self.sequence_tokens = {}, {}
self.threads = []
self.creating_log_stream, self.shutting_down = False, False
self.create_log_stream = create_log_stream
self.log_group_retention_days = log_group_retention_days
self._init_state()
Copy link
Contributor Author

@terencehonles terencehonles Jan 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be called with the parent's __init__ which creates its lock. Since it's ok to init the state twice I made it more obvious it's being called by calling it directly. I can add a comment on super().__init__ if that seems better (I went with this way in case the stdlib changed, but that seems unlikely).


# Creating session should be the final call in __init__, after all instance attributes are set.
# This ensures that failing to create the session will not result in any missing attribtues.
Expand All @@ -155,6 +153,18 @@ def __init__(self, log_group=__name__, stream_name=None, use_queues=True, send_i

self.addFilter(_boto_debug_filter)

def _at_fork_reinit(self):
# This was added in Python 3.9 and should only be called with a recent
# version of Python. An older version will attempt to call createLock
# instead.
super()._at_fork_reinit()
self._init_state()

def _init_state(self):
self.queues, self.sequence_tokens = {}, {}
self.threads = []
self.creating_log_stream, self.shutting_down = False, False

def _submit_batch(self, batch, stream_name, max_retries=5):
if len(batch) < 1:
return
Expand Down Expand Up @@ -208,6 +218,10 @@ def _submit_batch(self, batch, stream_name, max_retries=5):
# from the response
self.sequence_tokens[stream_name] = response["nextSequenceToken"]

def createLock(self):
super().createLock()
self._init_state()

def emit(self, message):
if self.creating_log_stream:
return # Avoid infinite recursion when asked to log a message as our own side effect
Expand Down