Skip to content

Commit

Permalink
Clean up after extension give up
Browse files Browse the repository at this point in the history
  • Loading branch information
csainty committed Sep 1, 2018
1 parent b9b4aff commit 430938f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ void extendDeadlines() {
int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
modacks.add(new PendingModifyAckDeadline(sec, ackId));
count++;
} else {
// TODO: How to get the byte count?
flowController.release(1, 0);
messagesWaiter.incrementPendingMessages(-1);
}
}
modacks.add(modack);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void run() {
private List<String> sentAcks;
private List<ModAckItem> sentModAcks;
private FakeClock clock;
private FlowController flowController;

@AutoValue
abstract static class ModAckItem {
Expand Down Expand Up @@ -101,6 +102,12 @@ public void sendAckOperations(
systemExecutor.shutdownNow();

clock = new FakeClock();
flowController =
new FlowController(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build());

dispatcher =
new MessageDispatcher(
Expand All @@ -109,7 +116,7 @@ public void sendAckOperations(
Duration.ofSeconds(5),
Duration.ofMinutes(60),
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
new FlowController(FlowControlSettings.newBuilder().build()),
flowController,
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
MoreExecutors.directExecutor(),
systemExecutor,
Expand Down Expand Up @@ -182,6 +189,10 @@ public void testExtension_GiveUp() throws Exception {
clock.advance(1, TimeUnit.DAYS);
dispatcher.extendDeadlines();
assertThat(sentModAcks).isEmpty();

// We should be able to reserve another item in the flow controller and not block shutdown
flowController.reserve(1, 0);
dispatcher.stop();
}

@Test
Expand Down

0 comments on commit 430938f

Please sign in to comment.