Skip to content

Commit

Permalink
Immediately notify listener on close
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Aug 13, 2018
1 parent dc5d326 commit ea3c198
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,16 @@ public interface GlobalCheckpointListener {

/**
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
* listener will be asynchronously fired on the executor used to construct this collection of global checkpoint listeners.
* listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
*/
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) {
if (closed) {
throw new IllegalStateException("can not listen for global checkpoint changes on a closed shard [" + shardId + "]");
executor.execute(() -> listener.accept(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)));
}
if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) {
// notify directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -206,15 +207,24 @@ public void testClose() throws IOException {
}
}

public void testAddAfterClose() throws IOException {
public void testAddAfterClose() throws InterruptedException, IOException {
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger);
globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED);
globalCheckpointListeners.close();
final IllegalStateException expected =
expectThrows(IllegalStateException.class, () -> globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {}));
assertThat(
expected,
hasToString(containsString("can not listen for global checkpoint changes on a closed shard [" + shardId + "]")));
final AtomicBoolean invoked = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final GlobalCheckpointListeners.GlobalCheckpointListener listener = (g, e) -> {
assert g == UNASSIGNED_SEQ_NO;
assert e != null;
if (invoked.compareAndSet(false, true) == false) {
latch.countDown();
throw new IllegalStateException("listener invoked twice");
}
latch.countDown();
};
globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener);
latch.await();
assertTrue(invoked.get());
}

public void testFailingListenerOnUpdate() {
Expand Down

0 comments on commit ea3c198

Please sign in to comment.