diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java index b3989fc974cb..9fd9bc837c5c 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java @@ -29,6 +29,4 @@ public interface AckReplyConsumer { * message. */ void nack(); - - void abandon(); } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 6586dc68e535..5f5ebbaee204 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -133,7 +133,6 @@ private class AckHandler implements ApiFutureCallback { private final int outstandingBytes; private final long receivedTimeMillis; private final Instant totalExpiration; - private boolean extending = true; AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) { this.ackId = ackId; @@ -152,7 +151,6 @@ private void forget() { */ return; } - extending = false; flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); processOutstandingBatches(); @@ -419,11 +417,6 @@ public void ack() { public void nack() { response.set(AckReply.NACK); } - - @Override - public void abandon() { - ackHandler.forget(); - } }; ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); executor.execute( @@ -478,9 +471,6 @@ void extendDeadlines() { Instant extendTo = now.plusSeconds(extendSeconds); for (Map.Entry entry : pendingMessages.entrySet()) { - if (!entry.getValue().extending) { - continue; - } String ackId = entry.getKey(); Instant totalExpiration = entry.getValue().totalExpiration; if (totalExpiration.isAfter(extendTo)) { diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 2bd2a518bfef..785368bb13cb 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -148,14 +148,6 @@ public void testNack() throws Exception { assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0)); } - @Test - public void testAbandon() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); - consumers.take().abandon(); - dispatcher.extendDeadlines(); - assertThat(sentModAcks).doesNotContain(TEST_MESSAGE.getAckId()); - } - @Test public void testExtension() throws Exception { dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);