Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flow control for message publishing #96

Merged
merged 13 commits into from
Jun 2, 2020
4 changes: 0 additions & 4 deletions google/cloud/pubsub_v1/publisher/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ class FlowControlLimitError(Exception):
"""An action resulted in exceeding the flow control limits."""


class PermanentlyBlockedError(FlowControlLimitError):
"""A message exceeds *total* flow control limits and would block forever."""


__all__ = (
"FlowControlLimitError",
"MessageTooLargeError",
Expand Down
41 changes: 23 additions & 18 deletions google/cloud/pubsub_v1/publisher/flow_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ def add(self, message):

Raises:
:exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
If adding a message exceeds flow control limits and the desired
action is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`.
:exception:`~pubsub_v1.publisher.exceptions.PermanentlyBlockedError`:
If adding a message exceeds total flow control limits and would
always overflow on its own, and the desired action is
:attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK`.
If adding a message would exceed flow control limits and the desired
action is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`,
or if a message would always exceed total flow control limits on
its own and the desired action is
plamut marked this conversation as resolved.
Show resolved Hide resolved
:attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK`,
meaning that the message would block forever.
"""
if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
return
Expand Down Expand Up @@ -123,10 +123,11 @@ def add(self, message):
load_info = self._load_info(
message_count=1, total_bytes=message.ByteSize()
)
error_msg = "Flow control limits too low for the message - {}.".format(
load_info
error_msg = (
"Total flow control limits too low for the message, "
"would block forever - {}.".format(load_info)
)
raise exceptions.PermanentlyBlockedError(error_msg)
raise exceptions.FlowControlLimitError(error_msg)

current_thread = threading.current_thread()

Expand All @@ -149,16 +150,12 @@ def add(self, message):
"{}.".format(self._load_info())
)

# Message accepted, increase the load and remove thread stats if
# they exist in the waiting queue.
# Message accepted, increase the load and remove thread stats.
self._message_count += 1
self._total_bytes += message.ByteSize()

reservation = self._byte_reservations.get(current_thread)
if reservation:
self._reserved_bytes -= reservation.reserved
del self._byte_reservations[current_thread]
self._waiting.remove(current_thread)
self._reserved_bytes -= self._byte_reservations[current_thread].reserved
del self._byte_reservations[current_thread]
self._waiting.remove(current_thread)

def release(self, message):
"""Release a mesage from flow control.
plamut marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -204,8 +201,16 @@ def _distribute_available_bytes(self):

reservation = self._byte_reservations[thread]
still_needed = reservation.needed - reservation.reserved
can_give = min(still_needed, available)

# Sanity check for any internal inconsistencies.
if still_needed < 0:
pradn marked this conversation as resolved.
Show resolved Hide resolved
msg = "Too many bytes reserved: {} / {}".format(
reservation.reserved, reservation.needed
)
warnings.warn(msg, category=RuntimeWarning)
still_needed = 0

can_give = min(still_needed, available)
reservation.reserved += can_give
self._reserved_bytes += can_give
available -= can_give
Expand Down
48 changes: 47 additions & 1 deletion tests/unit/pubsub_v1/publisher/test_flow_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,11 @@ def test_error_if_mesage_would_block_indefinitely():
# Now that we know that an error occurs, we can check its type directly
# without the fear of blocking indefinitely.
flow_controller = FlowController(settings) # we want a fresh controller
with pytest.raises(exceptions.PermanentlyBlockedError) as error_info:
with pytest.raises(exceptions.FlowControlLimitError) as error_info:
flow_controller.add(msg)

error_msg = str(error_info.value)
assert "would block forever" in error_msg
assert "messages: 1 / 0" in error_msg
assert "bytes: {} / 1".format(msg.ByteSize()) in error_msg

Expand Down Expand Up @@ -361,3 +362,48 @@ def test_threads_posting_large_messages_do_not_starve():

if not adding_busy_done.wait(timeout=1.0):
pytest.fail("Adding messages blocked or errored.")


def test_warning_on_internal_reservation_stats_error_when_unblocking():
settings = types.PublishFlowControl(
message_limit=1,
byte_limit=150,
limit_exceeded_behavior=types.LimitExceededBehavior.BLOCK,
)
flow_controller = FlowController(settings)

msg1 = types.PubsubMessage(data=b"x" * 100)
msg2 = types.PubsubMessage(data=b"y" * 100)

# If there is a concurrency bug in FlowController, we do not want to block
# the main thread running the tests, thus we delegate all add/release
# operations to daemon threads and check the outcome (blocked/not blocked)
# through Events.
adding_1_done = threading.Event()
adding_2_done = threading.Event()
releasing_1_done = threading.Event()

# Adding a message with free capacity should not block.
_run_in_daemon(flow_controller, "add", [msg1], adding_1_done)
if not adding_1_done.wait(timeout=0.1):
pytest.fail("Adding a message with enough flow capacity blocked or errored.")

# Adding messages when there is not enough capacity should block, even if
# added through multiple threads.
_run_in_daemon(flow_controller, "add", [msg2], adding_2_done)
if adding_2_done.wait(timeout=0.1):
pytest.fail("Adding a message on overflow did not block.")

# Intentionally corrupt internal stats
reservation = next(iter(flow_controller._byte_reservations.values()), None)
assert reservation is not None, "No messages blocked by flow controller."
reservation.reserved = reservation.needed + 1

with warnings.catch_warnings(record=True) as warned:
_run_in_daemon(flow_controller, "release", [msg1], releasing_1_done)
if not releasing_1_done.wait(timeout=0.1):
pytest.fail("Releasing a message blocked or errored.")

matches = [warning for warning in warned if warning.category is RuntimeWarning]
assert len(matches) == 1
assert "too many bytes reserved" in str(matches[0].message).lower()