Skip to content

Commit

Permalink
pubsub: send message receipts (#2580)
Browse files Browse the repository at this point in the history
This commit also simplifies acks/nacks logic.

* synchronize TestReceiver methods

It is accessed from executor threads and must to synced
to guarantee property updates are observed.

* deflake test

If the user code throws an exception, we catch this exception and nack
the message.
This logic cannot be tested correctly.
The test uses a MessageReceiver that can notify us that messages
have been processed, so that we can "advanceTime".

The receiver can only notify us that it's throwing an exception BEFORE
the exception is actually thrown out of the method.
(Even with finally-clause, the statements in the clause runs before
the giving control back to the caller.)
Consequently, we're notifying too soon: the message has not been
processed yet as the exception might not have been caught.
  • Loading branch information
pongad authored Nov 9, 2017
1 parent a53f7a8 commit dadb6ff
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,8 +72,9 @@ class MessageDispatcher {
private final MessageWaiter messagesWaiter;

private final PriorityQueue<ExtensionJob> outstandingAckHandlers;
private final Set<String> pendingAcks;
private final Set<String> pendingNacks;
private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingReceipts = new LinkedBlockingQueue<>();

private final Lock alarmsLock;
// The deadline should be set by the subscriber connection before use,
Expand Down Expand Up @@ -185,47 +185,41 @@ private class AckHandler implements FutureCallback<AckReply> {
receivedTimeMillis = clock.millisTime();
}

private void onBoth(LinkedBlockingQueue<String> destination) {
acked.getAndSet(true);
destination.add(ackId);
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
}

@Override
public void onFailure(Throwable t) {
logger.log(
Level.WARNING,
"MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.",
t);
acked.getAndSet(true);
synchronized (pendingNacks) {
pendingNacks.add(ackId);
}
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
onBoth(pendingNacks);
}

@Override
public void onSuccess(AckReply reply) {
acked.getAndSet(true);
LinkedBlockingQueue<String> destination;
switch (reply) {
case ACK:
synchronized (pendingAcks) {
pendingAcks.add(ackId);
}
destination = pendingAcks;
// Record the latency rounded to the next closest integer.
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
break;
case NACK:
synchronized (pendingNacks) {
pendingNacks.add(ackId);
}
destination = pendingNacks;
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
}
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
onBoth(destination);
}
}

Expand Down Expand Up @@ -254,8 +248,6 @@ void sendAckOperations(
this.flowController = flowController;
this.outstandingMessageBatches = outstandingMessageBatches;
outstandingAckHandlers = new PriorityQueue<>();
pendingAcks = new HashSet<>();
pendingNacks = new HashSet<>();
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
this.ackLatencyDistribution = ackLatencyDistribution;
alarmsLock = new ReentrantLock();
Expand All @@ -264,6 +256,25 @@ void sendAckOperations(
this.clock = clock;
}

public void start() {
pendingAcksAlarm =
systemExecutor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
try {
processOutstandingAckOperations();
} catch (Throwable t) {
// Catch everything so that one run failing doesn't prevent subsequent runs.
logger.log(Level.WARNING, "failed to send acks/nacks", t);
}
}
},
PENDING_ACKS_SEND_DELAY.toMillis(),
PENDING_ACKS_SEND_DELAY.toMillis(),
TimeUnit.MILLISECONDS);
}

public void stop() {
messagesWaiter.waitNoMessages();
alarmsLock.lock();
Expand All @@ -272,6 +283,10 @@ public void stop() {
ackDeadlineExtensionAlarm.cancel(true);
ackDeadlineExtensionAlarm = null;
}
if (pendingAcksAlarm != null) {
pendingAcksAlarm.cancel(false);
pendingAcksAlarm = null;
}
} finally {
alarmsLock.unlock();
}
Expand Down Expand Up @@ -328,6 +343,9 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
return;
}
messagesWaiter.incrementPendingMessages(messages.size());
for (ReceivedMessage message : messages) {
pendingReceipts.add(message.getAckId());
}

OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
Expand Down Expand Up @@ -421,32 +439,6 @@ public void run() {
}
}

private void setupPendingAcksAlarm() {
alarmsLock.lock();
try {
if (pendingAcksAlarm == null) {
pendingAcksAlarm =
systemExecutor.schedule(
new Runnable() {
@Override
public void run() {
alarmsLock.lock();
try {
pendingAcksAlarm = null;
} finally {
alarmsLock.unlock();
}
processOutstandingAckOperations();
}
},
PENDING_ACKS_SEND_DELAY.toMillis(),
TimeUnit.MILLISECONDS);
}
} finally {
alarmsLock.unlock();
}
}

private class AckDeadlineAlarm implements Runnable {
@Override
public void run() {
Expand Down Expand Up @@ -574,31 +566,26 @@ private void processOutstandingAckOperations(
List<PendingModifyAckDeadline> ackDeadlineExtensions) {
List<PendingModifyAckDeadline> modifyAckDeadlinesToSend =
Lists.newArrayList(ackDeadlineExtensions);
List<String> acksToSend = new ArrayList<>(pendingAcks.size());
synchronized (pendingAcks) {
if (!pendingAcks.isEmpty()) {
try {
acksToSend = new ArrayList<>(pendingAcks);
logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());
} finally {
pendingAcks.clear();
}
}
}

List<String> acksToSend = new ArrayList<>();
pendingAcks.drainTo(acksToSend);
logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());

PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0);
synchronized (pendingNacks) {
if (!pendingNacks.isEmpty()) {
try {
for (String ackId : pendingNacks) {
nacksToSend.addAckId(ackId);
}
logger.log(Level.FINER, "Sending {0} nacks", pendingNacks.size());
} finally {
pendingNacks.clear();
}
modifyAckDeadlinesToSend.add(nacksToSend);
}
pendingNacks.drainTo(nacksToSend.ackIds);
logger.log(Level.FINER, "Sending {0} nacks", nacksToSend.ackIds.size());
if (!nacksToSend.ackIds.isEmpty()) {
modifyAckDeadlinesToSend.add(nacksToSend);
}

PendingModifyAckDeadline receiptsToSend =
new PendingModifyAckDeadline(getMessageDeadlineSeconds());
pendingReceipts.drainTo(receiptsToSend.ackIds);
logger.log(Level.FINER, "Sending {0} receipts", receiptsToSend.ackIds.size());
if (!receiptsToSend.ackIds.isEmpty()) {
modifyAckDeadlinesToSend.add(receiptsToSend);
}

ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public PollingSubscriberConnection(
@Override
protected void doStart() {
logger.config("Starting subscriber.");
messageDispatcher.start();
pullMessages(INITIAL_BACKOFF);
notifyStarted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public StreamingSubscriberConnection(
@Override
protected void doStart() {
logger.config("Starting subscriber.");
messageDispatcher.start();
initialize();
notifyStarted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,46 +295,52 @@ public String waitForRegistedSubscription() throws InterruptedException {

public List<String> waitAndConsumeReceivedAcks(int expectedCount) throws InterruptedException {
synchronized (acks) {
while (acks.size() < expectedCount) {
acks.wait();
}
waitAtLeast(acks, expectedCount);
List<String> receivedAcksCopy = ImmutableList.copyOf(acks.subList(0, expectedCount));
acks.removeAll(receivedAcksCopy);
acks.subList(0, expectedCount).clear();
return receivedAcksCopy;
}
}

public List<ModifyAckDeadline> waitAndConsumeModifyAckDeadlines(int expectedCount)
throws InterruptedException {
synchronized (modAckDeadlines) {
while (modAckDeadlines.size() < expectedCount) {
modAckDeadlines.wait();
}
waitAtLeast(modAckDeadlines, expectedCount);
List<ModifyAckDeadline> modAckDeadlinesCopy =
ImmutableList.copyOf(modAckDeadlines.subList(0, expectedCount));
modAckDeadlines.removeAll(modAckDeadlinesCopy);
modAckDeadlines.subList(0, expectedCount).clear();
return modAckDeadlinesCopy;
}
}

public int waitForClosedStreams(int expectedCount) throws InterruptedException {
synchronized (closedStreams) {
while (closedStreams.size() < expectedCount) {
closedStreams.wait();
}
waitAtLeast(closedStreams, expectedCount);
return closedStreams.size();
}
}

public int waitForOpenedStreams(int expectedCount) throws InterruptedException {
synchronized (openedStreams) {
while (openedStreams.size() < expectedCount) {
openedStreams.wait();
}
waitAtLeast(openedStreams, expectedCount);
return openedStreams.size();
}
}

// wait until the collection has at least target number of elements.
// caller MUST hold the monitor for the collection.
private static void waitAtLeast(Collection<?> collection, int target)
throws InterruptedException {
long untilMillis = System.currentTimeMillis() + 20_000;
while (collection.size() < target) {
long now = System.currentTimeMillis();
if (now >= untilMillis) {
throw new IllegalStateException("timed out, last state: " + collection);
}
collection.wait(untilMillis - now);
}
}

public void waitForStreamAckDeadline(int expectedValue) throws InterruptedException {
synchronized (messageAckDeadline) {
while (messageAckDeadline.get() != expectedValue) {
Expand Down
Loading

0 comments on commit dadb6ff

Please sign in to comment.