Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub: Clean up after message extension gives up #3633

Merged
merged 1 commit into from
Sep 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class MessageDispatcher {
private final MessageWaiter messagesWaiter;

// Maps ID to "total expiration time". If it takes longer than this, stop extending.
private final ConcurrentMap<String, Instant> pendingMessages = new ConcurrentHashMap<>();
private final ConcurrentMap<AckHandler, Instant> pendingMessages = new ConcurrentHashMap<>();

private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -141,7 +141,7 @@ private class AckHandler implements FutureCallback<AckReply> {
}

private void onBoth(LinkedBlockingQueue<String> destination) {
pendingMessages.remove(ackId);
pendingMessages.remove(this);
destination.add(ackId);
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
Expand Down Expand Up @@ -329,17 +329,15 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
}
messagesWaiter.incrementPendingMessages(messages.size());

Instant totalExpiration = now().plus(maxAckExtensionPeriod);
for (ReceivedMessage message : messages) {
pendingReceipts.add(message.getAckId());
pendingMessages.put(message.getAckId(), totalExpiration);
}

Instant totalExpiration = now().plus(maxAckExtensionPeriod);
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
outstandingBatch.addMessage(message, ackHandler);
pendingReceipts.add(message.getAckId());
pendingMessages.put(ackHandler, totalExpiration);
}
synchronized (outstandingMessageBatches) {
outstandingMessageBatches.add(outstandingBatch);
Expand Down Expand Up @@ -436,10 +434,10 @@ void extendDeadlines() {
Instant extendTo = now.plusSeconds(extendSeconds);

int count = 0;
Iterator<Map.Entry<String, Instant>> it = pendingMessages.entrySet().iterator();
Iterator<Map.Entry<AckHandler, Instant>> it = pendingMessages.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Instant> entry = it.next();
String ackId = entry.getKey();
Map.Entry<AckHandler, Instant> entry = it.next();
String ackId = entry.getKey().ackId;
Instant totalExpiration = entry.getValue();
// TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull,
// since one modack RPC only takes one expiration.
Expand All @@ -455,6 +453,9 @@ void extendDeadlines() {
int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
modacks.add(new PendingModifyAckDeadline(sec, ackId));
count++;
} else {
flowController.release(1, entry.getKey().outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
}
}
modacks.add(modack);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void run() {
private List<String> sentAcks;
private List<ModAckItem> sentModAcks;
private FakeClock clock;
private FlowController flowController;

@AutoValue
abstract static class ModAckItem {
Expand Down Expand Up @@ -101,6 +102,12 @@ public void sendAckOperations(
systemExecutor.shutdownNow();

clock = new FakeClock();
flowController =
new FlowController(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build());

dispatcher =
new MessageDispatcher(
Expand All @@ -109,7 +116,7 @@ public void sendAckOperations(
Duration.ofSeconds(5),
Duration.ofMinutes(60),
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
new FlowController(FlowControlSettings.newBuilder().build()),
flowController,
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
MoreExecutors.directExecutor(),
systemExecutor,
Expand Down Expand Up @@ -182,6 +189,10 @@ public void testExtension_GiveUp() throws Exception {
clock.advance(1, TimeUnit.DAYS);
dispatcher.extendDeadlines();
assertThat(sentModAcks).isEmpty();

// We should be able to reserve another item in the flow controller and not block shutdown
flowController.reserve(1, 0);

This comment was marked as spam.

dispatcher.stop();
}

@Test
Expand Down