Skip to content

Commit

Permalink
Revert "Pub/Sub: Add message abandonment (googleapis#4250)"
Browse files Browse the repository at this point in the history
This reverts commit 6e2c2dd.
  • Loading branch information
chingor13 committed Jan 7, 2019
1 parent 978f3b7 commit 9776fab
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,4 @@ public interface AckReplyConsumer {
* message.
*/
void nack();

void abandon();
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ private class AckHandler implements ApiFutureCallback<AckReply> {
private final int outstandingBytes;
private final long receivedTimeMillis;
private final Instant totalExpiration;
private boolean extending = true;

AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
this.ackId = ackId;
Expand All @@ -152,7 +151,6 @@ private void forget() {
*/
return;
}
extending = false;
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
Expand Down Expand Up @@ -419,11 +417,6 @@ public void ack() {
public void nack() {
response.set(AckReply.NACK);
}

@Override
public void abandon() {
ackHandler.forget();
}
};
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
executor.execute(
Expand Down Expand Up @@ -478,9 +471,6 @@ void extendDeadlines() {
Instant extendTo = now.plusSeconds(extendSeconds);

for (Map.Entry<String, AckHandler> entry : pendingMessages.entrySet()) {
if (!entry.getValue().extending) {
continue;
}
String ackId = entry.getKey();
Instant totalExpiration = entry.getValue().totalExpiration;
if (totalExpiration.isAfter(extendTo)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,6 @@ public void testNack() throws Exception {
assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0));
}

@Test
public void testAbandon() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
consumers.take().abandon();
dispatcher.extendDeadlines();
assertThat(sentModAcks).doesNotContain(TEST_MESSAGE.getAckId());
}

@Test
public void testExtension() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
Expand Down

0 comments on commit 9776fab

Please sign in to comment.