From ea3c198ca0b66981248df597074f39f3f7719e59 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 13 Aug 2018 08:12:30 -0400 Subject: [PATCH] Immediately notify listener on close --- .../shard/GlobalCheckpointListeners.java | 6 +++-- .../shard/GlobalCheckpointListenersTests.java | 22 ++++++++++++++----- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index d8f7e4cbdab37..ee30b9d85056c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -81,14 +81,16 @@ public interface GlobalCheckpointListener { /** * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the - * listener will be asynchronously fired on the executor used to construct this collection of global checkpoint listeners. + * listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the + * shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global + * checkpoint listeners. * * @param currentGlobalCheckpoint the current global checkpoint known to the listener * @param listener the listener */ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) { if (closed) { - throw new IllegalStateException("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"); + executor.execute(() -> listener.accept(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); } if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) { // notify directly diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index 51459209009fa..b64f6c2293e6c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -206,15 +207,24 @@ public void testClose() throws IOException { } } - public void testAddAfterClose() throws IOException { + public void testAddAfterClose() throws InterruptedException, IOException { final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); globalCheckpointListeners.close(); - final IllegalStateException expected = - expectThrows(IllegalStateException.class, () -> globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> {})); - assertThat( - expected, - hasToString(containsString("can not listen for global checkpoint changes on a closed shard [" + shardId + "]"))); + final AtomicBoolean invoked = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + final GlobalCheckpointListeners.GlobalCheckpointListener listener = (g, e) -> { + assert g == UNASSIGNED_SEQ_NO; + assert e != null; + if (invoked.compareAndSet(false, true) == false) { + latch.countDown(); + throw new IllegalStateException("listener invoked twice"); + } + latch.countDown(); + }; + globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener); + latch.await(); + assertTrue(invoked.get()); } public void testFailingListenerOnUpdate() {