diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index bcb73352b537..40f17f30a7cb 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -64,30 +64,32 @@ def bytes(self): def add(self, items): """Add messages to be managed by the leaser.""" - for item in items: - # Add the ack ID to the set of managed ack IDs, and increment - # the size counter. - if item.ack_id not in self._leased_messages: - self._leased_messages[item.ack_id] = _LeasedMessage( - added_time=time.time(), size=item.byte_size - ) - self._bytes += item.byte_size - else: - _LOGGER.debug("Message %s is already lease managed", item.ack_id) + with self._operational_lock: + for item in items: + # Add the ack ID to the set of managed ack IDs, and increment + # the size counter. + if item.ack_id not in self._leased_messages: + self._leased_messages[item.ack_id] = _LeasedMessage( + added_time=time.time(), size=item.byte_size + ) + self._bytes += item.byte_size + else: + _LOGGER.debug("Message %s is already lease managed", item.ack_id) def remove(self, items): """Remove messages from lease management.""" - # Remove the ack ID from lease management, and decrement the - # byte counter. - for item in items: - if self._leased_messages.pop(item.ack_id, None) is not None: - self._bytes -= item.byte_size - else: - _LOGGER.debug("Item %s was not managed.", item.ack_id) - - if self._bytes < 0: - _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes) - self._bytes = 0 + with self._operational_lock: + # Remove the ack ID from lease management, and decrement the + # byte counter. + for item in items: + if self._leased_messages.pop(item.ack_id, None) is not None: + self._bytes -= item.byte_size + else: + _LOGGER.debug("Item %s was not managed.", item.ack_id) + + if self._bytes < 0: + _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes) + self._bytes = 0 def maintain_leases(self): """Maintain all of the leases being managed. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index f14a084a3fca..ba4c7673bcb9 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -120,6 +120,7 @@ def __init__( # but not yet added to the lease management (and not sent to user callback), # because the FlowControl limits have been hit. self._messages_on_hold = queue.Queue() + self._pause_resume_lock = threading.Lock() # The threads created in ``.open()``. self._dispatcher = None @@ -216,10 +217,13 @@ def add_close_callback(self, callback): def maybe_pause_consumer(self): """Check the current load and pause the consumer if needed.""" - if self.load >= 1.0: - if self._consumer is not None and not self._consumer.is_paused: - _LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load) - self._consumer.pause() + with self._pause_resume_lock: + if self.load >= 1.0: + if self._consumer is not None and not self._consumer.is_paused: + _LOGGER.debug( + "Message backlog over load at %.2f, pausing.", self.load + ) + self._consumer.pause() def maybe_resume_consumer(self): """Check the load and held messages and resume the consumer if needed. @@ -228,26 +232,27 @@ def maybe_resume_consumer(self): release those messages first before resuming the consumer, all with the maximum allowed leaser load in mind. """ - # If we have been paused by flow control, check and see if we are - # back within our limits. - # - # In order to not thrash too much, require us to have passed below - # the resume threshold (80% by default) of each flow control setting - # before restarting. - if self._consumer is None or not self._consumer.is_paused: - return - - _LOGGER.debug("Current load: %.2f", self.load) - - # Before maybe resuming the background consumer, release any messages - # currently on hold, if the current load allows for it. - self._maybe_release_messages() - - if self.load < self.flow_control.resume_threshold: - _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) - self._consumer.resume() - else: - _LOGGER.debug("Did not resume, current load is %.2f.", self.load) + with self._pause_resume_lock: + # If we have been paused by flow control, check and see if we are + # back within our limits. + # + # In order to not thrash too much, require us to have passed below + # the resume threshold (80% by default) of each flow control setting + # before restarting. + if self._consumer is None or not self._consumer.is_paused: + return + + _LOGGER.debug("Current load: %.2f", self.load) + + # Before maybe resuming the background consumer, release any messages + # currently on hold, if the current load allows for it. + self._maybe_release_messages() + + if self.load < self.flow_control.resume_threshold: + _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) + self._consumer.resume() + else: + _LOGGER.debug("Did not resume, current load is %.2f.", self.load) def _maybe_release_messages(self): """Release (some of) the held messages if the current load allows for it.