From a9a329ad5e4a8a09c83249cdb03a0131f28da2cb Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 9 May 2019 10:52:53 +0200 Subject: [PATCH] Make a few streaming pull methods thread-safe With the StreamingPullManager._on_response() callback adding received messages to the leaser synchronously (in the background consumer thread), a race condition can happen with the dispatcher thread that can asynchronously add (remove) messages to (from) lease management, e.g. on ack() and nack() requests. The same is the case with related operations of maybe pausing/resuming the background consumer. This commit thus adds locks in key places, assuring that these operations are atomic, ant not subject to race conditions. --- .../pubsub_v1/subscriber/_protocol/leaser.py | 44 +++++++-------- .../_protocol/streaming_pull_manager.py | 53 ++++++++++--------- 2 files changed, 52 insertions(+), 45 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index bcb73352b537..40f17f30a7cb 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -64,30 +64,32 @@ def bytes(self): def add(self, items): """Add messages to be managed by the leaser.""" - for item in items: - # Add the ack ID to the set of managed ack IDs, and increment - # the size counter. - if item.ack_id not in self._leased_messages: - self._leased_messages[item.ack_id] = _LeasedMessage( - added_time=time.time(), size=item.byte_size - ) - self._bytes += item.byte_size - else: - _LOGGER.debug("Message %s is already lease managed", item.ack_id) + with self._operational_lock: + for item in items: + # Add the ack ID to the set of managed ack IDs, and increment + # the size counter. + if item.ack_id not in self._leased_messages: + self._leased_messages[item.ack_id] = _LeasedMessage( + added_time=time.time(), size=item.byte_size + ) + self._bytes += item.byte_size + else: + _LOGGER.debug("Message %s is already lease managed", item.ack_id) def remove(self, items): """Remove messages from lease management.""" - # Remove the ack ID from lease management, and decrement the - # byte counter. - for item in items: - if self._leased_messages.pop(item.ack_id, None) is not None: - self._bytes -= item.byte_size - else: - _LOGGER.debug("Item %s was not managed.", item.ack_id) - - if self._bytes < 0: - _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes) - self._bytes = 0 + with self._operational_lock: + # Remove the ack ID from lease management, and decrement the + # byte counter. + for item in items: + if self._leased_messages.pop(item.ack_id, None) is not None: + self._bytes -= item.byte_size + else: + _LOGGER.debug("Item %s was not managed.", item.ack_id) + + if self._bytes < 0: + _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes) + self._bytes = 0 def maintain_leases(self): """Maintain all of the leases being managed. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index f14a084a3fca..ba4c7673bcb9 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -120,6 +120,7 @@ def __init__( # but not yet added to the lease management (and not sent to user callback), # because the FlowControl limits have been hit. self._messages_on_hold = queue.Queue() + self._pause_resume_lock = threading.Lock() # The threads created in ``.open()``. self._dispatcher = None @@ -216,10 +217,13 @@ def add_close_callback(self, callback): def maybe_pause_consumer(self): """Check the current load and pause the consumer if needed.""" - if self.load >= 1.0: - if self._consumer is not None and not self._consumer.is_paused: - _LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load) - self._consumer.pause() + with self._pause_resume_lock: + if self.load >= 1.0: + if self._consumer is not None and not self._consumer.is_paused: + _LOGGER.debug( + "Message backlog over load at %.2f, pausing.", self.load + ) + self._consumer.pause() def maybe_resume_consumer(self): """Check the load and held messages and resume the consumer if needed. @@ -228,26 +232,27 @@ def maybe_resume_consumer(self): release those messages first before resuming the consumer, all with the maximum allowed leaser load in mind. """ - # If we have been paused by flow control, check and see if we are - # back within our limits. - # - # In order to not thrash too much, require us to have passed below - # the resume threshold (80% by default) of each flow control setting - # before restarting. - if self._consumer is None or not self._consumer.is_paused: - return - - _LOGGER.debug("Current load: %.2f", self.load) - - # Before maybe resuming the background consumer, release any messages - # currently on hold, if the current load allows for it. - self._maybe_release_messages() - - if self.load < self.flow_control.resume_threshold: - _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) - self._consumer.resume() - else: - _LOGGER.debug("Did not resume, current load is %.2f.", self.load) + with self._pause_resume_lock: + # If we have been paused by flow control, check and see if we are + # back within our limits. + # + # In order to not thrash too much, require us to have passed below + # the resume threshold (80% by default) of each flow control setting + # before restarting. + if self._consumer is None or not self._consumer.is_paused: + return + + _LOGGER.debug("Current load: %.2f", self.load) + + # Before maybe resuming the background consumer, release any messages + # currently on hold, if the current load allows for it. + self._maybe_release_messages() + + if self.load < self.flow_control.resume_threshold: + _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) + self._consumer.resume() + else: + _LOGGER.debug("Did not resume, current load is %.2f.", self.load) def _maybe_release_messages(self): """Release (some of) the held messages if the current load allows for it.