From d725b43739ceecf943e089f947b3dcecf2b6dd37 Mon Sep 17 00:00:00 2001 From: Chris Sainty Date: Wed, 26 Sep 2018 20:45:11 +0200 Subject: [PATCH] Avoid processing pubsub messages whose ack deadline has already expired --- .../cloud/pubsub/v1/MessageDispatcher.java | 34 ++++++++++++------- .../v1/StreamingSubscriberConnection.java | 6 ++-- .../google/cloud/pubsub/v1/Subscriber.java | 13 ++++++- .../pubsub/v1/MessageDispatcherTest.java | 20 ++++++++--- 4 files changed, 53 insertions(+), 20 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index bb50e18813de..32e9e8acabcb 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -78,7 +78,7 @@ class MessageDispatcher { private final MessageWaiter messagesWaiter; // Maps ID to "total expiration time". If it takes longer than this, stop extending. - private final ConcurrentMap pendingMessages = new ConcurrentHashMap<>(); + private final ConcurrentMap pendingMessages = new ConcurrentHashMap<>(); private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); @@ -96,6 +96,9 @@ class MessageDispatcher { // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; + // The deadline set on the subscription + private final Duration subscriptionDeadline; + /** Stores the data needed to asynchronously modify acknowledgement deadlines. */ static class PendingModifyAckDeadline { final List ackIds; @@ -141,7 +144,7 @@ private class AckHandler implements ApiFutureCallback { } private void onBoth(LinkedBlockingQueue destination) { - pendingMessages.remove(this); + pendingMessages.remove(this.ackId); destination.add(ackId); flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); @@ -193,7 +196,8 @@ void sendAckOperations( Deque outstandingMessageBatches, Executor executor, ScheduledExecutorService systemExecutor, - ApiClock clock) { + ApiClock clock, + Duration subscriptionDeadline) { this.executor = executor; this.systemExecutor = systemExecutor; this.ackExpirationPadding = ackExpirationPadding; @@ -207,6 +211,7 @@ void sendAckOperations( jobLock = new ReentrantLock(); messagesWaiter = new MessageWaiter(); this.clock = clock; + this.subscriptionDeadline = subscriptionDeadline; } public void start() { @@ -329,15 +334,15 @@ public void processReceivedMessages(List messages, Runnable don } messagesWaiter.incrementPendingMessages(messages.size()); - - Instant totalExpiration = now().plus(maxAckExtensionPeriod); + Duration expectedExpiration = maxAckExtensionPeriod.compareTo(subscriptionDeadline) >= 0 ? maxAckExtensionPeriod : subscriptionDeadline; + Instant totalExpiration = now().plus(expectedExpiration); OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback); for (ReceivedMessage message : messages) { AckHandler ackHandler = new AckHandler(message.getAckId(), message.getMessage().getSerializedSize()); outstandingBatch.addMessage(message, ackHandler); pendingReceipts.add(message.getAckId()); - pendingMessages.put(ackHandler, totalExpiration); + pendingMessages.put(message.getAckId(), totalExpiration); } synchronized (outstandingMessageBatches) { outstandingMessageBatches.add(outstandingBatch); @@ -398,7 +403,13 @@ public void nack() { @Override public void run() { try { - receiver.receiveMessage(message, consumer); + Instant expiration = pendingMessages.get(ackHandler.ackId); + if (expiration == null || expiration.isBefore(now())) { + // Message expired while waiting, do not process as the ack would be rejected + consumer.nack(); + } else { + receiver.receiveMessage(message, consumer); + } } catch (Exception e) { response.setException(e); } @@ -434,10 +445,10 @@ void extendDeadlines() { Instant extendTo = now.plusSeconds(extendSeconds); int count = 0; - Iterator> it = pendingMessages.entrySet().iterator(); + Iterator> it = pendingMessages.entrySet().iterator(); while (it.hasNext()) { - Map.Entry entry = it.next(); - String ackId = entry.getKey().ackId; + Map.Entry entry = it.next(); + String ackId = entry.getKey(); Instant totalExpiration = entry.getValue(); // TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull, // since one modack RPC only takes one expiration. @@ -453,9 +464,6 @@ void extendDeadlines() { int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS)); modacks.add(new PendingModifyAckDeadline(sec, ackId)); count++; - } else { - flowController.release(1, entry.getKey().outstandingBytes); - messagesWaiter.incrementPendingMessages(-1); } } modacks.add(modack); diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 17a913935392..5d7f3cbb731f 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -86,7 +86,8 @@ public StreamingSubscriberConnection( Deque outstandingMessageBatches, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, - ApiClock clock) { + ApiClock clock, + Duration subscriptionDeadline) { this.subscription = subscription; this.systemExecutor = systemExecutor; this.stub = stub; @@ -101,7 +102,8 @@ public StreamingSubscriberConnection( outstandingMessageBatches, executor, systemExecutor, - clock); + clock, + subscriptionDeadline); } @Override diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index cf35733bda8b..ca6b6cc05418 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -39,6 +39,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.pubsub.v1.GetSubscriptionRequest; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.SubscriberGrpc; import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub; @@ -335,9 +336,18 @@ private void startStreamingConnections() throws IOException { for (Channel channel : channels) { SubscriberStub stub = SubscriberGrpc.newStub(channel); + SubscriberGrpc.SubscriberBlockingStub blockingStub = SubscriberGrpc.newBlockingStub(channel); if (callCredentials != null) { stub = stub.withCallCredentials(callCredentials); + blockingStub = blockingStub.withCallCredentials(callCredentials); } + + GetSubscriptionRequest request = GetSubscriptionRequest.newBuilder() + .setSubscription(subscriptionName) + .build(); + int ackDeadlineSeconds = blockingStub.getSubscription(request) + .getAckDeadlineSeconds(); + streamingSubscriberConnections.add( new StreamingSubscriberConnection( subscriptionName, @@ -350,7 +360,8 @@ private void startStreamingConnections() throws IOException { outstandingMessageBatches, executor, alarmsExecutor, - clock)); + clock, + Duration.ofSeconds(ackDeadlineSeconds))); } startConnections( streamingSubscriberConnections, diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 75296dd89c87..3d56b5128202 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -27,6 +27,7 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -43,6 +44,11 @@ public class MessageDispatcherTest { .setAckId("ackid") .setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build()) .build(); + private static final ReceivedMessage TEST_MESSAGE_2 = + ReceivedMessage.newBuilder() + .setAckId("ackid_2") + .setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build()) + .build(); private static final Runnable NOOP_RUNNABLE = new Runnable() { @Override @@ -120,7 +126,8 @@ public void sendAckOperations( new LinkedList(), MoreExecutors.directExecutor(), systemExecutor, - clock); + clock, + Duration.ofSeconds(10)); dispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS); } @@ -189,10 +196,15 @@ public void testExtension_GiveUp() throws Exception { clock.advance(1, TimeUnit.DAYS); dispatcher.extendDeadlines(); assertThat(sentModAcks).isEmpty(); + } - // We should be able to reserve another item in the flow controller and not block shutdown - flowController.reserve(1, 0); - dispatcher.stop(); + @Test + public void testExtension_AvoidProcessingThatCanNotAck() throws Exception { + dispatcher.processReceivedMessages(Arrays.asList(TEST_MESSAGE, TEST_MESSAGE_2), NOOP_RUNNABLE); + clock.advance(1, TimeUnit.DAYS); + consumers.poll().ack(); + dispatcher.processOutstandingBatches(); + assertThat(consumers).isEmpty(); } @Test