Skip to content

Commit

Permalink
Only notify ready global checkpoint listeners (elastic#33690)
Browse files Browse the repository at this point in the history
When we add a global checkpoint listener, it is also carries along with
it a value that it thinks is the current global checkpoint. This value
can be above the actual global checkpoint on a shard if the listener
knows the global checkpoint from another shard copy (e.g., the primary),
and the current shard copy is lagging behind. Today we notify the
listener whenever the global checkpoint advances, regardless if it goes
above the current global checkpoint known to the listener. This commit
reworks this implementation. Rather than thinking of the value
associated with the listener as the current global checkpoint known to
the listener, we think of it as the value that the listener is waiting
for the global checkpoint to advance to (inclusive). Now instead of
notifying all waiting listeners when the global checkpoint advances, we
only notify those that are waiting for a value not larger than the
actual global checkpoint that we advanced to.
jasontedor authored Sep 14, 2018

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 4f68104 commit 3919133
Showing 4 changed files with 186 additions and 166 deletions.
Original file line number Diff line number Diff line change
@@ -21,11 +21,13 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
@@ -34,6 +36,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -63,7 +66,7 @@ public interface GlobalCheckpointListener {

// guarded by this
private boolean closed;
private Map<GlobalCheckpointListener, ScheduledFuture<?>> listeners;
private final Map<GlobalCheckpointListener, Tuple<Long, ScheduledFuture<?>>> listeners = new LinkedHashMap<>();
private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO;

private final ShardId shardId;
@@ -91,62 +94,56 @@ public interface GlobalCheckpointListener {
}

/**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
* listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
* is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be
* notified after if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for
* the timeout means no timeout will be associated to the listener.
* Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
* then the listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners.
* If the shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated above the
* global checkpoint the listener is waiting for, or the shard is closed. A listener must re-register after one of these events to
* receive subsequent events. Callers may add a timeout to be notified after if the timeout elapses. In this case, the listener will be
* notified with a {@link TimeoutException}. Passing null fo the timeout means no timeout will be associated to the listener.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
* @param timeout the listener timeout, or null if no timeout
* @param waitingForGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
* @param timeout the listener timeout, or null if no timeout
*/
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) {
synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) {
if (closed) {
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)));
return;
}
if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) {
if (lastKnownGlobalCheckpoint >= waitingForGlobalCheckpoint) {
// notify directly
executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null));
} else {
if (listeners == null) {
listeners = new LinkedHashMap<>();
}
if (timeout == null) {
listeners.put(listener, null);
listeners.put(listener, Tuple.tuple(waitingForGlobalCheckpoint, null));
} else {
listeners.put(
listener,
scheduler.schedule(
() -> {
final boolean removed;
synchronized (this) {
/*
* Note that the listeners map can be null if a notification nulled out the map reference when
* notifying listeners, and then our scheduled execution occurred before we could be cancelled by
* the notification. In this case, we would have blocked waiting for access to this critical
* section.
*
* What is more, we know that this listener has a timeout associated with it (otherwise we would
* not be here) so the return value from remove being null is an indication that we are not in the
* map. This can happen if a notification nulled out the listeners, and then our scheduled execution
* occurred before we could be cancelled by the notification, and then another thread added a
* listener causing the listeners map reference to be non-null again. In this case, our listener
* here would not be in the map and we should not fire the timeout logic.
*/
removed = listeners != null && listeners.remove(listener) != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
logger.trace("global checkpoint listener timed out", e);
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
}
},
timeout.nanos(),
TimeUnit.NANOSECONDS));
Tuple.tuple(
waitingForGlobalCheckpoint,
scheduler.schedule(
() -> {
final boolean removed;
synchronized (this) {
/*
* We know that this listener has a timeout associated with it (otherwise we would not be
* here) so the future component of the return value from remove being null is an indication
* that we are not in the map. This can happen if a notification collected us into listeners
* to be notified and removed us from the map, and then our scheduled execution occurred
* before we could be cancelled by the notification. In this case, our listener here would
* not be in the map and we should not fire the timeout logic.
*/
removed = listeners.remove(listener).v2() != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
logger.trace("global checkpoint listener timed out", e);
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
}
},
timeout.nanos(),
TimeUnit.NANOSECONDS)));
}
}
}
@@ -163,7 +160,7 @@ public synchronized void close() throws IOException {
* @return the number of listeners pending notification
*/
synchronized int pendingListeners() {
return listeners == null ? 0 : listeners.size();
return listeners.size();
}

/**
@@ -173,7 +170,7 @@ synchronized int pendingListeners() {
* @return a scheduled future representing the timeout future for the listener, otherwise null
*/
synchronized ScheduledFuture<?> getTimeoutFuture(final GlobalCheckpointListener listener) {
return listeners.get(listener);
return listeners.get(listener).v2();
}

/**
@@ -193,22 +190,31 @@ synchronized void globalCheckpointUpdated(final long globalCheckpoint) {
private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
assert Thread.holdsLock(this);
assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null);
if (listeners != null) {
// capture the current listeners
final Map<GlobalCheckpointListener, ScheduledFuture<?>> currentListeners = listeners;
listeners = null;
if (currentListeners != null) {
executor.execute(() -> {
for (final Map.Entry<GlobalCheckpointListener, ScheduledFuture<?>> listener : currentListeners.entrySet()) {
/*
* We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and
* not trigger the timeout.
*/
FutureUtils.cancel(listener.getValue());
notifyListener(listener.getKey(), globalCheckpoint, e);
}
});
}

final Map<GlobalCheckpointListener, Tuple<Long, ScheduledFuture<?>>> listenersToNotify;
if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
listenersToNotify =
listeners
.entrySet()
.stream()
.filter(entry -> entry.getValue().v1() <= globalCheckpoint)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
listenersToNotify.keySet().forEach(listeners::remove);
} else {
listenersToNotify = new HashMap<>(listeners);
listeners.clear();
}
if (listenersToNotify.isEmpty() == false) {
executor.execute(() ->
listenersToNotify
.forEach((listener, t) -> {
/*
* We do not want to interrupt any timeouts that fired, these will detect that the listener has been
* notified and not trigger the timeout.
*/
FutureUtils.cancel(t.v2());
notifyListener(listener, globalCheckpoint, e);
}));
}
}

Original file line number Diff line number Diff line change
@@ -1781,19 +1781,20 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long
}

/**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
* listener will fire immediately on the calling thread. If the specified timeout elapses before the listener is notified, the listener
* will be notified with an {@link TimeoutException}. A caller may pass null to specify no timeout.
* Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
* then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout
* elapses before the listener is notified, the listener will be notified with an {@link TimeoutException}. A caller may pass null to
* specify no timeout.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
* @param timeout the timeout
* @param waitingForGlobalCheckpoint the global checkpoint the listener is waiting for
* @param listener the listener
* @param timeout the timeout
*/
public void addGlobalCheckpointListener(
final long currentGlobalCheckpoint,
final long waitingForGlobalCheckpoint,
final GlobalCheckpointListeners.GlobalCheckpointListener listener,
final TimeValue timeout) {
this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener, timeout);
this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout);
}

/**
Loading

0 comments on commit 3919133

Please sign in to comment.