Skip to content

Commit

Permalink
Refactor renewer tests and wake up renewer only when needed
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jun 1, 2016
1 parent 01dbb40 commit d4b8f6c
Show file tree
Hide file tree
Showing 2 changed files with 248 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -41,17 +41,20 @@
*/
class AckDeadlineRenewer implements AutoCloseable {

private static final int MIN_DEADLINE_MILLISECONDS = 10_000;
private static final int RENEW_THRESHOLD_MILLISECONDS = 2_000;
private static final int MIN_DEADLINE_MILLIS = 9_000;
private static final int RENEW_THRESHOLD_MILLIS = 3_000;
private static final int NEXT_RENEWAL_THRESHOLD_MILLIS = 1_000;

private final PubSub pubsub;
private final ScheduledExecutorService executor;
private final ExecutorFactory executorFactory;
private final Clock clock;
private final Queue<Message> messageQueue;
private final Map<MessageId, Long> messageDeadlines;
private final ScheduledFuture<?> renewerFuture;
private final Object lock = new Object();
private final Object futureLock = new Object();
private Future<?> renewerFuture;
private boolean closed;

/**
* This class holds the identity of a message to renew: subscription and acknowledge id.
Expand Down Expand Up @@ -157,19 +160,38 @@ public String toString() {
this.clock = options.clock();
this.messageQueue = new LinkedList<>();
this.messageDeadlines = new HashMap<>();
this.renewerFuture = this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
renewAckDeadlines();
}

private void unsetAndScheduleNextRenewal() {
synchronized (futureLock) {
renewerFuture = null;
scheduleNextRenewal();
}
}

private void scheduleNextRenewal() {
// Schedules next renewal if there are still messages to process and no renewals scheduled that
// could handle them, otherwise does nothing
synchronized (futureLock) {
Message nextMessage = messageQueue.peek();

This comment has been minimized.

Copy link
@aozarov

aozarov Jun 1, 2016

Contributor

peek is not synchronized using the same lock that can modify messageQueue.

If you decide to apply both locks, may sure that they are always acquired in the same order.

Nit, that nextMessage may be one that was actually removed messageDeadlines.get(messageId) == null) but fixing that is merely optimization.

This comment has been minimized.

Copy link
@mziccard

mziccard Jun 1, 2016

Author Contributor

You are right, fixed in 501ba7d (no need for double locking).

if (renewerFuture == null && nextMessage != null) {
long delay =
(nextMessage.expectedDeadline() - clock.millis()) - NEXT_RENEWAL_THRESHOLD_MILLIS;
renewerFuture = executor.schedule(new Runnable() {
@Override
public void run() {
renewAckDeadlines();
}
}, delay, TimeUnit.MILLISECONDS);
}
}, 0, 1, TimeUnit.SECONDS);
}
}

private void renewAckDeadlines() {
ListMultimap<String, String> messagesToRenewNext = LinkedListMultimap.create();
// At every activation we renew all ack deadlines that will expier in the following
// RENEW_THRESHOLD_MILLISECONDS
long threshold = clock.millis() + RENEW_THRESHOLD_MILLISECONDS;
// At every activation we renew all ack deadlines that will expire in the following
// RENEW_THRESHOLD_MILLIS
long threshold = clock.millis() + RENEW_THRESHOLD_MILLIS;
Message message;
while ((message = nextMessageToRenew(threshold)) != null) {
// If the expected deadline is null the message was removed and we must ignore it, otherwise
Expand All @@ -180,9 +202,10 @@ private void renewAckDeadlines() {
}
for (Map.Entry<String, List<String>> entry : Multimaps.asMap(messagesToRenewNext).entrySet()) {
// We send all ack deadline renewals for a subscription
pubsub.modifyAckDeadlineAsync(entry.getKey(), MIN_DEADLINE_MILLISECONDS,
pubsub.modifyAckDeadlineAsync(entry.getKey(), MIN_DEADLINE_MILLIS,
TimeUnit.MILLISECONDS, entry.getValue());
}
unsetAndScheduleNextRenewal();
}

private Message nextMessageToRenew(long threshold) {
Expand Down Expand Up @@ -211,39 +234,41 @@ private Message nextMessageToRenew(long threshold) {
/**
* Adds a new message for which the acknowledge deadline should be automatically renewed. The
* message is identified by the subscription from which it was pulled and its acknowledge id.
* Auto-renewal will take place until the message is removed (see {@link #remove(String, String)}
* or {@link #remove(String, Iterable)}).
* Auto-renewal will take place until the message is removed (see
* {@link #remove(String, String)}).
*
* @param subscription the subscription from which the message has been pulled
* @param ackId the message's acknowledge id
*/
void add(String subscription, String ackId) {
synchronized (lock) {
long deadline = clock.millis() + MIN_DEADLINE_MILLISECONDS;
long deadline = clock.millis() + MIN_DEADLINE_MILLIS;
Message message = new Message(new MessageId(subscription, ackId), deadline);
messageQueue.add(message);
messageDeadlines.put(message.messageId(), deadline);
}
scheduleNextRenewal();
}

/**
* Adds new messages for which the acknowledge deadlined should be automatically renewed. The
* messages are identified by the subscription from which they were pulled and their
* acknowledge id. Auto-renewal will take place until the messages are removed (see
* {@link #remove(String, String)} or {@link #remove(String, Iterable)}).
* {@link #remove(String, String)}).
*
* @param subscription the subscription from which the messages have been pulled
* @param ackIds the acknowledge ids of the messages
*/
void add(String subscription, Iterable<String> ackIds) {
synchronized (lock) {
long deadline = clock.millis() + MIN_DEADLINE_MILLISECONDS;
long deadline = clock.millis() + MIN_DEADLINE_MILLIS;
for (String ackId : ackIds) {
Message message = new Message(new MessageId(subscription, ackId), deadline);
messageQueue.add(message);
messageDeadlines.put(message.messageId(), deadline);
}
}
scheduleNextRenewal();
}

/**
Expand All @@ -262,7 +287,19 @@ void remove(String subscription, String ackId) {

@Override
public void close() throws Exception {
renewerFuture.cancel(false);
if (closed) {
return;
}
closed = true;
synchronized (lock) {
messageDeadlines.clear();
messageQueue.clear();
}
synchronized (futureLock) {
if (renewerFuture != null) {
renewerFuture.cancel(true);
}
}
executorFactory.release(executor);
}
}
Loading

0 comments on commit d4b8f6c

Please sign in to comment.