PubSub: Fix streaming pull incorrectly handling FlowControl max_messages setting #7948
Conversation
Rebased because of a merge conflict when #7954 was merged.
Putting this on hold, because the change is a non-trivial one and we want to make a PubSub release first. We will unblock the PR once that is done (reviews are still welcome, though). cc: @sduskis
Overall looks good, but I want to look over this more. I have a few documentation comments.
Resolved review comment (outdated) on pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
Force-pushed from 1b8a212 to 7e5b9d7.
@sduskis Made the suggested changes; the diff can be seen in the first force push (the second force push was just a rebase on top of the latest).
I keep looking through this and don't see anything wrong, per se. Given the size and complexity, I'm personally not comfortable approving. @tseaver, would you be able to lend a hand and review this PR?
Two resolved review comments on pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
In certain cases, automatically leasing Message instances upon creation might not be desired, so an optional parameter is added to the Message initializer that allows skipping it. The default behavior is unchanged: new Message instances *are* automatically leased upon creation.
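The idea can be sketched as follows (a toy model with hypothetical names, not the actual library code): the initializer gains a flag that controls whether the message leases itself on construction.

```python
import queue


class Message:
    """Simplified stand-in for the subscriber's Message wrapper."""

    def __init__(self, data, ack_id, request_queue, autolease=True):
        self.data = data
        self.ack_id = ack_id
        self._request_queue = request_queue
        # By default the message is leased immediately, preserving the old
        # behavior; callers that manage leasing themselves can opt out.
        if autolease:
            self.lease()

    def lease(self):
        # In the real client this would enqueue a lease request for the
        # dispatcher; here we just record it.
        self._request_queue.put(("lease", self.ack_id))


requests = queue.Queue()
leased = Message(b"payload", "ack-1", requests)                    # auto-leased
unleased = Message(b"payload", "ack-2", requests, autolease=False)  # opted out
print(requests.qsize())  # only the auto-leased message enqueued a request
```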
Leasing messages through a request queue in the dispatcher causes a race condition with the ConsumeBidirectionalStream thread. A request to pause the background consumer can arrive when the Bidi consumer is just about to fetch the next batch of messages, and thus the latter gets paused only *after* fetching those messages. This commit synchronously leases received messages in the streaming pull manager callback. If that hits the lease management load limit, the background consumer is paused synchronously, and will correctly pause *before* pulling another batch of messages.
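A minimal sketch of the fix, under assumed names (the real manager is considerably more involved): the response callback leases in the consumer thread itself, so the pause takes effect before the next batch can be pulled.

```python
import threading


class Manager:
    """Toy model of the streaming pull manager's synchronous leasing."""

    def __init__(self, max_leased):
        self._max_leased = max_leased
        self._leased = 0
        self._lock = threading.Lock()
        self.consumer_paused = False

    def _maybe_pause_consumer(self):
        # Called while holding the lock, so the pause decision is atomic
        # with the lease count update.
        if self._leased >= self._max_leased:
            self.consumer_paused = True

    def on_response(self, messages):
        # Lease synchronously, in the consumer thread, instead of going
        # through the dispatcher's request queue: if the limit is hit, the
        # consumer is paused *before* it can pull the next batch.
        with self._lock:
            self._leased += len(messages)
            self._maybe_pause_consumer()


manager = Manager(max_leased=10)
manager.on_response(["msg"] * 10)
print(manager.consumer_paused)  # True: paused before the next pull
```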
If the PubSub backend sends so many messages in a single response that adding all of them would overload the leaser, the StreamingPullManager now puts the excess messages into an internal holding buffer. The messages are released from the buffer when the leaser again has enough capacity (as defined by the FlowControl settings), and the message received callback is invoked for them at that point.
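A sketch of the holding-buffer behavior (hypothetical names, simplified logic): excess messages are held back rather than leased, and each ack releases held messages up to capacity.

```python
import collections


class Manager:
    """Toy model of buffering messages that exceed leaser capacity."""

    def __init__(self, max_leased):
        self._max_leased = max_leased
        self._leased = []
        self._held = collections.deque()  # internal holding buffer
        self.delivered = []               # stands in for invoking callbacks

    def on_response(self, messages):
        for msg in messages:
            if len(self._leased) < self._max_leased:
                self._leased.append(msg)
                self.delivered.append(msg)
            else:
                # Leaser is at capacity: hold the excess instead of
                # overloading lease management.
                self._held.append(msg)

    def ack(self, msg):
        self._leased.remove(msg)
        # Capacity freed up: release held messages and deliver them now.
        while self._held and len(self._leased) < self._max_leased:
            released = self._held.popleft()
            self._leased.append(released)
            self.delivered.append(released)


manager = Manager(max_leased=2)
manager.on_response(["a", "b", "c", "d"])
print(manager.delivered)   # ['a', 'b']  ('c' and 'd' are held back)
manager.ack("a")
print(manager.delivered)   # ['a', 'b', 'c']  ('d' is still held)
```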
With the StreamingPullManager._on_response() callback adding received messages to the leaser synchronously (in the background consumer thread), a race condition can occur with the dispatcher thread, which can asynchronously add (remove) messages to (from) lease management, e.g. on ack() and nack() requests. The same applies to the related operations of pausing/resuming the background consumer. This commit therefore adds locks in key places, ensuring that these operations are atomic and not subject to race conditions.
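The locking can be sketched like this (toy model, assumed names): one lock guards both the lease count and the paused flag, so a check-then-act in one thread cannot interleave with an update from the other.

```python
import threading


class Manager:
    """Toy model: lease updates and pause/resume decisions share one lock."""

    def __init__(self, max_leased):
        self._max_leased = max_leased
        self._leased = 0
        # A single lock makes "update count, then decide pause/resume"
        # atomic across the consumer and dispatcher threads.
        self._lock = threading.Lock()
        self.paused = False

    def add_to_lease(self, n):
        # Runs in the background consumer thread on received messages.
        with self._lock:
            self._leased += n
            if self._leased >= self._max_leased:
                self.paused = True

    def remove_from_lease(self, n):
        # Runs in the dispatcher thread, e.g. on ack()/nack() requests.
        with self._lock:
            self._leased -= n
            if self._leased < self._max_leased:
                self.paused = False


manager = Manager(max_leased=5)
manager.add_to_lease(5)      # hits the limit: pauses
manager.remove_from_lease(2)  # back below the limit: resumes
print(manager.paused)  # False
```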
Closes #7677.
This PR fixes how the subscriber client handles the `FlowControl.max_messages` setting. It makes sure that the subscriber only delivers (i.e. invokes callbacks for) `max_messages` messages at a time, and resumes invoking callbacks only when the user code acknowledges some of the previously delivered messages.

How to test
Steps to reproduce:
Actual result (before the fix): The subscriber does not respect the `FlowControl.max_messages` limit.
Expected result (after the fix): The subscriber delivers at most `max_messages` messages at a time (as defined by `FlowControl.max_messages`): there are never more than `max_messages` pending messages (received, but not yet acknowledged), shown as the "pending ACK" figure in the logs. The streaming pull process waits until some of the messages are ACK-ed before delivering new messages.

Footnotes
For `max_messages` settings greater than 10, one might observe that a maximum of 10 (or fewer) messages are delivered to the client code at once, especially if message callbacks are slow, e.g. by artificially sleeping for prolonged periods of time. This is caused by the default callback thread pool size, which has `max_workers` set to 10. This can be mitigated by creating the subscriber client with a custom `scheduler` that uses a thread pool with a higher cap on the worker thread count, or by using batch callbacks for received messages. The latter feature is almost done, and a PR for it will be submitted soon.
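The thread-pool cap described in the footnote can be observed with plain `concurrent.futures`, no Pub/Sub involved: with `max_workers=10`, at most 10 slow callbacks ever run concurrently, no matter how many tasks are submitted.

```python
import concurrent.futures
import threading
import time

active = 0
peak = 0
lock = threading.Lock()


def slow_callback(_):
    # Track how many callbacks run at the same time.
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # simulate slow message processing
    with lock:
        active -= 1


# Mirrors the default subscriber scheduler's pool size of 10; raising
# max_workers here is the analogue of passing a custom scheduler with a
# larger thread pool to the subscriber client.
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    list(executor.map(slow_callback, range(30)))

print(peak)  # concurrency never exceeds the max_workers cap of 10
```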