-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pubsub: send message receipts #2580
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
load test shows no regression
} | ||
|
||
// @Test |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pendingReceipts.drainTo(receiptsToSend.ackIds); | ||
logger.log(Level.FINER, "Sending {0} receipts", receiptsToSend.ackIds.size()); | ||
if (!receiptsToSend.ackIds.isEmpty()) { | ||
modifyAckDeadlinesToSend.add(receiptsToSend); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@mdietz94 please take a look |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, it may be easier to instead just schedule the pending extension job to run immediately the first time instead of in stream ack deadline seconds, and then you won't need to explicitly keep track of pending receipts.
synchronized (pendingAcks) { | ||
pendingAcks.add(ackId); | ||
} | ||
destination = pendingAcks; | ||
// Record the latency rounded to the next closest integer. | ||
ackLatencyDistribution.record( |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
public void run() { | ||
try { | ||
processOutstandingAckOperations(); | ||
} catch (Exception e) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This commit also simplifies acks/nacks logic.
It is accessed from executor threads and must to synced to guarantee property updates are observed.
If the user code throws an exception, we catch this exception and nack the message. This logic cannot be tested correctly. The test uses a MessageReceiver that can notify us that messages have been processed, so that we can "advanceTime". The receiver can only notify us that it's throwing an exception BEFORE the exception is actually thrown out of the method. (Even with finally-clause, the statements in the clause runs before the giving control back to the caller.) Consequently, we're notifying too soon: the message has not been processed yet as the exception might not have been caught.
@mdietz94 Thank you for the review, PTAL.
I plan to soon make extension job run on schedule, kind of like what's ack/nack/receipt is doing in this PR. I found a pretty simple way to make it work with periodic stream deadline modification. However, I don't think I can make it work with immediate modack scheduling that receipt would need. If this PR looks "right but sub-optimal" to you, do you think we can submit this and follow up in another PR? I think this will give us better throughput; timezone makes code review latency rather high. I had to add a few changes to address some flakes. Looks like there's still one flakey test. I after analyzing the logs, I believe the problem is in the test itself, not the library, though I can't pinpoint where. I think the best course of action is to finish simplifying the library, then we can simplify the test to have fewer moving parts. I can take this on later as well. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, though as we simplify the subscriber I think it would be good to combine some of the logic for mod ack/receipt as discussed.
ack. (no pun intended) |
This commit also simplifies acks/nacks logic.