Skip to content

Commit

Permalink
Make a few streaming pull methods thread-safe
Browse files Browse the repository at this point in the history
With the StreamingPullManager._on_response() callback adding received
messages to the leaser synchronously (in the background consumer
thread), a race condition can happen with the dispatcher thread that
can asynchronously add (remove) messages to (from) lease management,
e.g. on ack() and nack() requests.

The same is the case with related operations of maybe pausing/resuming
the background consumer. This commit thus adds locks in key places,
ensuring that these operations are atomic, and not subject to race
conditions.
  • Loading branch information
plamut committed May 10, 2019
1 parent 65858a4 commit a9a329a
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 45 deletions.
44 changes: 23 additions & 21 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,32 @@ def bytes(self):

def add(self, items):
"""Add messages to be managed by the leaser."""
for item in items:
# Add the ack ID to the set of managed ack IDs, and increment
# the size counter.
if item.ack_id not in self._leased_messages:
self._leased_messages[item.ack_id] = _LeasedMessage(
added_time=time.time(), size=item.byte_size
)
self._bytes += item.byte_size
else:
_LOGGER.debug("Message %s is already lease managed", item.ack_id)
with self._operational_lock:
for item in items:
# Add the ack ID to the set of managed ack IDs, and increment
# the size counter.
if item.ack_id not in self._leased_messages:
self._leased_messages[item.ack_id] = _LeasedMessage(
added_time=time.time(), size=item.byte_size
)
self._bytes += item.byte_size
else:
_LOGGER.debug("Message %s is already lease managed", item.ack_id)

def remove(self, items):
"""Remove messages from lease management."""
# Remove the ack ID from lease management, and decrement the
# byte counter.
for item in items:
if self._leased_messages.pop(item.ack_id, None) is not None:
self._bytes -= item.byte_size
else:
_LOGGER.debug("Item %s was not managed.", item.ack_id)

if self._bytes < 0:
_LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
self._bytes = 0
with self._operational_lock:
# Remove the ack ID from lease management, and decrement the
# byte counter.
for item in items:
if self._leased_messages.pop(item.ack_id, None) is not None:
self._bytes -= item.byte_size
else:
_LOGGER.debug("Item %s was not managed.", item.ack_id)

if self._bytes < 0:
_LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
self._bytes = 0

def maintain_leases(self):
"""Maintain all of the leases being managed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(
# but not yet added to the lease management (and not sent to user callback),
# because the FlowControl limits have been hit.
self._messages_on_hold = queue.Queue()
self._pause_resume_lock = threading.Lock()

# The threads created in ``.open()``.
self._dispatcher = None
Expand Down Expand Up @@ -216,10 +217,13 @@ def add_close_callback(self, callback):

def maybe_pause_consumer(self):
"""Check the current load and pause the consumer if needed."""
if self.load >= 1.0:
if self._consumer is not None and not self._consumer.is_paused:
_LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load)
self._consumer.pause()
with self._pause_resume_lock:
if self.load >= 1.0:
if self._consumer is not None and not self._consumer.is_paused:
_LOGGER.debug(
"Message backlog over load at %.2f, pausing.", self.load
)
self._consumer.pause()

def maybe_resume_consumer(self):
"""Check the load and held messages and resume the consumer if needed.
Expand All @@ -228,26 +232,27 @@ def maybe_resume_consumer(self):
release those messages first before resuming the consumer, all with the
maximum allowed leaser load in mind.
"""
# If we have been paused by flow control, check and see if we are
# back within our limits.
#
# In order to not thrash too much, require us to have passed below
# the resume threshold (80% by default) of each flow control setting
# before restarting.
if self._consumer is None or not self._consumer.is_paused:
return

_LOGGER.debug("Current load: %.2f", self.load)

# Before maybe resuming the background consumer, release any messages
# currently on hold, if the current load allows for it.
self._maybe_release_messages()

if self.load < self.flow_control.resume_threshold:
_LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
self._consumer.resume()
else:
_LOGGER.debug("Did not resume, current load is %.2f.", self.load)
with self._pause_resume_lock:
# If we have been paused by flow control, check and see if we are
# back within our limits.
#
# In order to not thrash too much, require us to have passed below
# the resume threshold (80% by default) of each flow control setting
# before restarting.
if self._consumer is None or not self._consumer.is_paused:
return

_LOGGER.debug("Current load: %.2f", self.load)

# Before maybe resuming the background consumer, release any messages
# currently on hold, if the current load allows for it.
self._maybe_release_messages()

if self.load < self.flow_control.resume_threshold:
_LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
self._consumer.resume()
else:
_LOGGER.debug("Did not resume, current load is %.2f.", self.load)

def _maybe_release_messages(self):
"""Release (some of) the held messages if the current load allows for it.
Expand Down

0 comments on commit a9a329a

Please sign in to comment.