Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PubSub] Avoid processing pubsub messages whose ack deadline has already expired #3734

Closed
wants to merge 1 commit into from

Conversation

csainty
Copy link

@csainty csainty commented Sep 26, 2018

This stems from the same investigation work as #3633 but I believe is a better solution to the problem. It also relates to this guidance https://cloud.google.com/pubsub/docs/pull#dealing-with-large-backlogs-of-small-messages

I don't feel the current client makes a best-effort attempt to reach at-most-once processing. While it is not a service guarantee the move to StreamingPull has made it a common occurrence in our setup to see backlogs of messages that simply can not be consumed due to acks being rejected.
I want to emphasise this, I do not mean simply we get duplicates, I mean we get so many duplicates that the backlog never finishes. We had a 400k message backlog open for over 48 hours trying to hack a solution to this, it was not until we switched away from the default StreamingPull subscriber and use the raw pull methods that we were able to chew through this backlog in under and hour.

This change checks whether an ack deadline has already passed before handing the message off to the MessageHandler, we know the ack would be rejected so why knowingly double process the message.

Most controversial in here is that I made the subscriber fetch the subscription metadata so it has the default deadline at hand to take in to account. Without this, setting setMessageDeadlineSeconds(Duration.ZERO) results in nothing being processed as the code thinks the message is already expired.
If pulling that metadata is a big issue, I think assuming the default as the minimum deadline allowed (10s) would also work at the cost of nacking and re-pulling additional messages in cases that have a longer default deadline.

With all this in place I can effectively revert #3633 and allow the message to be picked up and handled in the standard flow, which tidies the code up somewhat.

I have a sample program here - https://github.com/csainty/pubsub-fun/blob/master/src/main/java/App.java - that assuming your core-count is similar to mine will receive up to 40% duplicate messages due to ack rejection.

@csainty csainty requested a review from pongad as a code owner September 26, 2018 19:12
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Sep 26, 2018
@pongad
Copy link
Contributor

pongad commented Sep 26, 2018

@csainty Thank you for this PR! I think the general idea is good, though I see two issues

  • I think maxAckExtensionPeriod.compareTo(subscriptionDeadline) >= 0 ? maxAckExtensionPeriod : subscriptionDeadline is essentially a no-op. maxAckExtensionPeriod is set to one hour so I think it's always going to be larger than any subscription deadline. I think this line can be safely removed. Then we no longer have to query for the subscription deadline at runtime.

    • Looking up subscription deadline is unfortunately problematic. It is considered an "admin" API so its quota is typically low. It has caused problems in the past when users try to start up many subscribers.
  • I think this undoes an important part of Pubsub: Clean up after message extension gives up #3633. Imagine an opposite problem when working on a message takes a long time. It is possible that we start working on the message before its total expiration but we don't finish until after. In this case, the logic in this PR won't release the flow control properly.

For the second, I think we can keep the check here but also keep the logic added by #3633. To avoid the problem of freeing twice, we can do something like this both on L457 and L147.

if (pendingMessages.remove(handler) != null) {
  flowController.release(...);	
  messagesWaiter.incrementPendingMessages(-1);
}

I realize this might be more work than you signed up for. If you want to give this a go, you're absolutely welcome to. If not, I can take it on and tag you in the PR.

@csainty
Copy link
Author

csainty commented Sep 27, 2018

maxAckExtensionPeriod defaults to an hour, but can be set all the way down to 0 in the builder. At which point the current code is always processing messages that to the best of its knowledge have expired ack deadlines. Putting a lower bound on what the builder can set, or changing behaviour based on a 0 value would save from needing to pull the metadata on the subscription. I just didn't feel in a position to decide those bounds.

I actually had a fix for the double release earlier in this exploration, then tricked myself in to thinking it wasn't needed. But you are right it still is.

I don't mind working towards the right solution, but ultimately just want it fixed, so it won't bruise my ego if you have some good solutions in mind and want to give them a try.

@csainty
Copy link
Author

csainty commented Sep 28, 2018

Closed for #3743

@csainty csainty closed this Sep 28, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants