From 46a11db49096b866a9276e69497dfa1cb35922e9 Mon Sep 17 00:00:00 2001 From: Charles Li Date: Wed, 12 Dec 2018 13:42:24 -0500 Subject: [PATCH 1/3] add abandonment --- .../com/google/cloud/pubsub/v1/AckReplyConsumer.java | 4 +++- .../com/google/cloud/pubsub/v1/MessageDispatcher.java | 9 +++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java index d659563f9cef..78e4d8fdff4b 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java @@ -25,10 +25,12 @@ public interface AckReplyConsumer { * message again. */ void ack(); - + /** * Signals that the message has not been successfully processed. The service should resend the * message. */ void nack(); + + void abandon(); } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index cda1eefea4cd..e3b06efecc65 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -134,6 +134,7 @@ private class AckHandler implements ApiFutureCallback { private final int outstandingBytes; private final long receivedTimeMillis; private final Instant totalExpiration; + private boolean abandon = false; AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) { this.ackId = ackId; @@ -415,6 +416,11 @@ public void ack() { public void nack() { response.set(AckReply.NACK); } + + @Override + public void abandon() { + ackHandler.abandon = true; + } }; ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); executor.execute( @@ -466,6 +472,9 @@ void extendDeadlines() { Instant extendTo = now.plusSeconds(extendSeconds); for (Map.Entry entry : pendingMessages.entrySet()) { + if (entry.getValue().abandon) { + continue; + } String ackId = entry.getKey(); Instant totalExpiration = entry.getValue().totalExpiration; if (totalExpiration.isAfter(extendTo)) { From 5fe96d172aa98c0c3ed9577a16f70fe55a794b7a Mon Sep 17 00:00:00 2001 From: Charles Li Date: Thu, 13 Dec 2018 09:07:56 -0500 Subject: [PATCH 2/3] add unit test for abandon --- .../com/google/cloud/pubsub/v1/MessageDispatcherTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 75296dd89c87..35487174b049 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -148,6 +148,14 @@ public void testNack() throws Exception { assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0)); } + @Test + public void testAbandon() throws Exception { + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + consumers.take().abandon(); + dispatcher.extendDeadlines(); + assertThat(sentModAcks).doesNotContain(TEST_MESSAGE.getAckId()); + } + @Test public void testExtension() throws Exception { dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); From f8ed6ae78d23b46e760b7425f2e9e8f2eb04efb2 Mon Sep 17 00:00:00 2001 From: Charles Li Date: Thu, 20 Dec 2018 11:05:08 -0500 Subject: [PATCH 3/3] update forget() to stop extending deadline --- .../java/com/google/cloud/pubsub/v1/MessageDispatcher.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index e3b06efecc65..b0b3c5fd27b1 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -134,7 +134,7 @@ private class AckHandler implements ApiFutureCallback { private final int outstandingBytes; private final long receivedTimeMillis; private final Instant totalExpiration; - private boolean abandon = false; + private boolean extending = true; AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) { this.ackId = ackId; @@ -153,6 +153,7 @@ private void forget() { */ return; } + extending = false; flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); processOutstandingBatches(); @@ -419,7 +420,7 @@ public void nack() { @Override public void abandon() { - ackHandler.abandon = true; + ackHandler.forget(); } }; ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); @@ -472,7 +473,7 @@ void extendDeadlines() { Instant extendTo = now.plusSeconds(extendSeconds); for (Map.Entry entry : pendingMessages.entrySet()) { - if (entry.getValue().abandon) { + if (!entry.getValue().extending) { continue; } String ackId = entry.getKey();