Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PubSub] Avoid processing pubsub messages whose ack deadline has already expired #3734

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class MessageDispatcher {
private final MessageWaiter messagesWaiter;

// Maps ID to "total expiration time". If it takes longer than this, stop extending.
private final ConcurrentMap<AckHandler, Instant> pendingMessages = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Instant> pendingMessages = new ConcurrentHashMap<>();

private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
Expand All @@ -96,6 +96,9 @@ class MessageDispatcher {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

// The deadline set on the subscription
private final Duration subscriptionDeadline;

/** Stores the data needed to asynchronously modify acknowledgement deadlines. */
static class PendingModifyAckDeadline {
final List<String> ackIds;
Expand Down Expand Up @@ -141,7 +144,7 @@ private class AckHandler implements ApiFutureCallback<AckReply> {
}

private void onBoth(LinkedBlockingQueue<String> destination) {
pendingMessages.remove(this);
pendingMessages.remove(this.ackId);
destination.add(ackId);
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
Expand Down Expand Up @@ -193,7 +196,8 @@ void sendAckOperations(
Deque<OutstandingMessageBatch> outstandingMessageBatches,
Executor executor,
ScheduledExecutorService systemExecutor,
ApiClock clock) {
ApiClock clock,
Duration subscriptionDeadline) {
this.executor = executor;
this.systemExecutor = systemExecutor;
this.ackExpirationPadding = ackExpirationPadding;
Expand All @@ -207,6 +211,7 @@ void sendAckOperations(
jobLock = new ReentrantLock();
messagesWaiter = new MessageWaiter();
this.clock = clock;
this.subscriptionDeadline = subscriptionDeadline;
}

public void start() {
Expand Down Expand Up @@ -329,15 +334,15 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
}
messagesWaiter.incrementPendingMessages(messages.size());


Instant totalExpiration = now().plus(maxAckExtensionPeriod);
Duration expectedExpiration = maxAckExtensionPeriod.compareTo(subscriptionDeadline) >= 0 ? maxAckExtensionPeriod : subscriptionDeadline;
Instant totalExpiration = now().plus(expectedExpiration);
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
outstandingBatch.addMessage(message, ackHandler);
pendingReceipts.add(message.getAckId());
pendingMessages.put(ackHandler, totalExpiration);
pendingMessages.put(message.getAckId(), totalExpiration);
}
synchronized (outstandingMessageBatches) {
outstandingMessageBatches.add(outstandingBatch);
Expand Down Expand Up @@ -398,7 +403,13 @@ public void nack() {
@Override
public void run() {
try {
receiver.receiveMessage(message, consumer);
Instant expiration = pendingMessages.get(ackHandler.ackId);
if (expiration == null || expiration.isBefore(now())) {
// Message expired while waiting, do not process as the ack would be rejected
consumer.nack();
} else {
receiver.receiveMessage(message, consumer);
}
} catch (Exception e) {
response.setException(e);
}
Expand Down Expand Up @@ -434,10 +445,10 @@ void extendDeadlines() {
Instant extendTo = now.plusSeconds(extendSeconds);

int count = 0;
Iterator<Map.Entry<AckHandler, Instant>> it = pendingMessages.entrySet().iterator();
Iterator<Map.Entry<String, Instant>> it = pendingMessages.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<AckHandler, Instant> entry = it.next();
String ackId = entry.getKey().ackId;
Map.Entry<String, Instant> entry = it.next();
String ackId = entry.getKey();
Instant totalExpiration = entry.getValue();
// TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull,
// since one modack RPC only takes one expiration.
Expand All @@ -453,9 +464,6 @@ void extendDeadlines() {
int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
modacks.add(new PendingModifyAckDeadline(sec, ackId));
count++;
} else {
flowController.release(1, entry.getKey().outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
}
}
modacks.add(modack);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public StreamingSubscriberConnection(
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
ApiClock clock) {
ApiClock clock,
Duration subscriptionDeadline) {
this.subscription = subscription;
this.systemExecutor = systemExecutor;
this.stub = stub;
Expand All @@ -101,7 +102,8 @@ public StreamingSubscriberConnection(
outstandingMessageBatches,
executor,
systemExecutor,
clock);
clock,
subscriptionDeadline);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.pubsub.v1.GetSubscriptionRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.SubscriberGrpc;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub;
Expand Down Expand Up @@ -335,9 +336,18 @@ private void startStreamingConnections() throws IOException {

for (Channel channel : channels) {
SubscriberStub stub = SubscriberGrpc.newStub(channel);
SubscriberGrpc.SubscriberBlockingStub blockingStub = SubscriberGrpc.newBlockingStub(channel);
if (callCredentials != null) {
stub = stub.withCallCredentials(callCredentials);
blockingStub = blockingStub.withCallCredentials(callCredentials);
}

GetSubscriptionRequest request = GetSubscriptionRequest.newBuilder()
.setSubscription(subscriptionName)
.build();
int ackDeadlineSeconds = blockingStub.getSubscription(request)
.getAckDeadlineSeconds();

streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
subscriptionName,
Expand All @@ -350,7 +360,8 @@ private void startStreamingConnections() throws IOException {
outstandingMessageBatches,
executor,
alarmsExecutor,
clock));
clock,
Duration.ofSeconds(ackDeadlineSeconds)));
}
startConnections(
streamingSubscriberConnections,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -43,6 +44,11 @@ public class MessageDispatcherTest {
.setAckId("ackid")
.setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build())
.build();
private static final ReceivedMessage TEST_MESSAGE_2 =
ReceivedMessage.newBuilder()
.setAckId("ackid_2")
.setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build())
.build();
private static final Runnable NOOP_RUNNABLE =
new Runnable() {
@Override
Expand Down Expand Up @@ -120,7 +126,8 @@ public void sendAckOperations(
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
MoreExecutors.directExecutor(),
systemExecutor,
clock);
clock,
Duration.ofSeconds(10));
dispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
}

Expand Down Expand Up @@ -189,10 +196,15 @@ public void testExtension_GiveUp() throws Exception {
clock.advance(1, TimeUnit.DAYS);
dispatcher.extendDeadlines();
assertThat(sentModAcks).isEmpty();
}

// We should be able to reserve another item in the flow controller and not block shutdown
flowController.reserve(1, 0);
dispatcher.stop();
@Test
public void testExtension_AvoidProcessingThatCanNotAck() throws Exception {
dispatcher.processReceivedMessages(Arrays.asList(TEST_MESSAGE, TEST_MESSAGE_2), NOOP_RUNNABLE);
clock.advance(1, TimeUnit.DAYS);
consumers.poll().ack();
dispatcher.processOutstandingBatches();
assertThat(consumers).isEmpty();
}

@Test
Expand Down