Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: send message receipts #2580

Merged
merged 6 commits into from
Nov 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,8 +72,9 @@ class MessageDispatcher {
private final MessageWaiter messagesWaiter;

private final PriorityQueue<ExtensionJob> outstandingAckHandlers;
private final Set<String> pendingAcks;
private final Set<String> pendingNacks;
private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingReceipts = new LinkedBlockingQueue<>();

private final Lock alarmsLock;
// The deadline should be set by the subscriber connection before use,
Expand Down Expand Up @@ -185,47 +185,41 @@ private class AckHandler implements FutureCallback<AckReply> {
receivedTimeMillis = clock.millisTime();
}

private void onBoth(LinkedBlockingQueue<String> destination) {
acked.getAndSet(true);
destination.add(ackId);
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
}

@Override
public void onFailure(Throwable t) {
logger.log(
Level.WARNING,
"MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.",
t);
acked.getAndSet(true);
synchronized (pendingNacks) {
pendingNacks.add(ackId);
}
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
onBoth(pendingNacks);
}

@Override
public void onSuccess(AckReply reply) {
acked.getAndSet(true);
LinkedBlockingQueue<String> destination;
switch (reply) {
case ACK:
synchronized (pendingAcks) {
pendingAcks.add(ackId);
}
destination = pendingAcks;
// Record the latency rounded to the next closest integer.
ackLatencyDistribution.record(

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
break;
case NACK:
synchronized (pendingNacks) {
pendingNacks.add(ackId);
}
destination = pendingNacks;
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
}
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
onBoth(destination);
}
}

Expand Down Expand Up @@ -254,8 +248,6 @@ void sendAckOperations(
this.flowController = flowController;
this.outstandingMessageBatches = outstandingMessageBatches;
outstandingAckHandlers = new PriorityQueue<>();
pendingAcks = new HashSet<>();
pendingNacks = new HashSet<>();
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
this.ackLatencyDistribution = ackLatencyDistribution;
alarmsLock = new ReentrantLock();
Expand All @@ -264,6 +256,25 @@ void sendAckOperations(
this.clock = clock;
}

public void start() {
pendingAcksAlarm =
systemExecutor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
try {
processOutstandingAckOperations();
} catch (Throwable t) {
// Catch everything so that one run failing doesn't prevent subsequent runs.
logger.log(Level.WARNING, "failed to send acks/nacks", t);
}
}
},
PENDING_ACKS_SEND_DELAY.toMillis(),
PENDING_ACKS_SEND_DELAY.toMillis(),
TimeUnit.MILLISECONDS);
}

public void stop() {
messagesWaiter.waitNoMessages();
alarmsLock.lock();
Expand All @@ -272,6 +283,10 @@ public void stop() {
ackDeadlineExtensionAlarm.cancel(true);
ackDeadlineExtensionAlarm = null;
}
if (pendingAcksAlarm != null) {
pendingAcksAlarm.cancel(false);
pendingAcksAlarm = null;
}
} finally {
alarmsLock.unlock();
}
Expand Down Expand Up @@ -328,6 +343,9 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
return;
}
messagesWaiter.incrementPendingMessages(messages.size());
for (ReceivedMessage message : messages) {
pendingReceipts.add(message.getAckId());
}

OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
Expand Down Expand Up @@ -421,32 +439,6 @@ public void run() {
}
}

private void setupPendingAcksAlarm() {
alarmsLock.lock();
try {
if (pendingAcksAlarm == null) {
pendingAcksAlarm =
systemExecutor.schedule(
new Runnable() {
@Override
public void run() {
alarmsLock.lock();
try {
pendingAcksAlarm = null;
} finally {
alarmsLock.unlock();
}
processOutstandingAckOperations();
}
},
PENDING_ACKS_SEND_DELAY.toMillis(),
TimeUnit.MILLISECONDS);
}
} finally {
alarmsLock.unlock();
}
}

private class AckDeadlineAlarm implements Runnable {
@Override
public void run() {
Expand Down Expand Up @@ -574,31 +566,26 @@ private void processOutstandingAckOperations(
List<PendingModifyAckDeadline> ackDeadlineExtensions) {
List<PendingModifyAckDeadline> modifyAckDeadlinesToSend =
Lists.newArrayList(ackDeadlineExtensions);
List<String> acksToSend = new ArrayList<>(pendingAcks.size());
synchronized (pendingAcks) {
if (!pendingAcks.isEmpty()) {
try {
acksToSend = new ArrayList<>(pendingAcks);
logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());
} finally {
pendingAcks.clear();
}
}
}

List<String> acksToSend = new ArrayList<>();
pendingAcks.drainTo(acksToSend);
logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());

PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0);
synchronized (pendingNacks) {
if (!pendingNacks.isEmpty()) {
try {
for (String ackId : pendingNacks) {
nacksToSend.addAckId(ackId);
}
logger.log(Level.FINER, "Sending {0} nacks", pendingNacks.size());
} finally {
pendingNacks.clear();
}
modifyAckDeadlinesToSend.add(nacksToSend);
}
pendingNacks.drainTo(nacksToSend.ackIds);
logger.log(Level.FINER, "Sending {0} nacks", nacksToSend.ackIds.size());
if (!nacksToSend.ackIds.isEmpty()) {
modifyAckDeadlinesToSend.add(nacksToSend);
}

PendingModifyAckDeadline receiptsToSend =
new PendingModifyAckDeadline(getMessageDeadlineSeconds());
pendingReceipts.drainTo(receiptsToSend.ackIds);
logger.log(Level.FINER, "Sending {0} receipts", receiptsToSend.ackIds.size());
if (!receiptsToSend.ackIds.isEmpty()) {
modifyAckDeadlinesToSend.add(receiptsToSend);
}

ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public PollingSubscriberConnection(
@Override
protected void doStart() {
logger.config("Starting subscriber.");
messageDispatcher.start();
pullMessages(INITIAL_BACKOFF);
notifyStarted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public StreamingSubscriberConnection(
@Override
protected void doStart() {
logger.config("Starting subscriber.");
messageDispatcher.start();
initialize();
notifyStarted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,46 +295,52 @@ public String waitForRegistedSubscription() throws InterruptedException {

public List<String> waitAndConsumeReceivedAcks(int expectedCount) throws InterruptedException {
synchronized (acks) {
while (acks.size() < expectedCount) {
acks.wait();
}
waitAtLeast(acks, expectedCount);
List<String> receivedAcksCopy = ImmutableList.copyOf(acks.subList(0, expectedCount));
acks.removeAll(receivedAcksCopy);
acks.subList(0, expectedCount).clear();
return receivedAcksCopy;
}
}

public List<ModifyAckDeadline> waitAndConsumeModifyAckDeadlines(int expectedCount)
throws InterruptedException {
synchronized (modAckDeadlines) {
while (modAckDeadlines.size() < expectedCount) {
modAckDeadlines.wait();
}
waitAtLeast(modAckDeadlines, expectedCount);
List<ModifyAckDeadline> modAckDeadlinesCopy =
ImmutableList.copyOf(modAckDeadlines.subList(0, expectedCount));
modAckDeadlines.removeAll(modAckDeadlinesCopy);
modAckDeadlines.subList(0, expectedCount).clear();
return modAckDeadlinesCopy;
}
}

public int waitForClosedStreams(int expectedCount) throws InterruptedException {
synchronized (closedStreams) {
while (closedStreams.size() < expectedCount) {
closedStreams.wait();
}
waitAtLeast(closedStreams, expectedCount);
return closedStreams.size();
}
}

public int waitForOpenedStreams(int expectedCount) throws InterruptedException {
synchronized (openedStreams) {
while (openedStreams.size() < expectedCount) {
openedStreams.wait();
}
waitAtLeast(openedStreams, expectedCount);
return openedStreams.size();
}
}

// wait until the collection has at least target number of elements.
// caller MUST hold the monitor for the collection.
private static void waitAtLeast(Collection<?> collection, int target)
throws InterruptedException {
long untilMillis = System.currentTimeMillis() + 20_000;
while (collection.size() < target) {
long now = System.currentTimeMillis();
if (now >= untilMillis) {
throw new IllegalStateException("timed out, last state: " + collection);
}
collection.wait(untilMillis - now);
}
}

public void waitForStreamAckDeadline(int expectedValue) throws InterruptedException {
synchronized (messageAckDeadline) {
while (messageAckDeadline.get() != expectedValue) {
Expand Down
Loading