diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index 06f11ff4e43db..d9240602d8519 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -349,10 +349,12 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get()); + // we are going to synchronize the actions of three threads: the updating thread, the listener thread, and the main test thread final CyclicBarrier barrier = new CyclicBarrier(3); final int numberOfIterations = randomIntBetween(1, 1024); final AtomicBoolean closed = new AtomicBoolean(); final Thread updatingThread = new Thread(() -> { + // synchronize starting with the listener thread and the main test thread awaitQuietly(barrier); for (int i = 0; i < numberOfIterations; i++) { if (rarely() && closed.get() == false) { @@ -367,11 +369,13 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); } } + // synchronize ending with the listener thread and the main test thread awaitQuietly(barrier); }); final List invocations = new CopyOnWriteArrayList<>(); final Thread listenersThread = new Thread(() -> { + // synchronize starting with the updating thread and the main test thread awaitQuietly(barrier); for (int i = 0; i < numberOfIterations; i++) { final AtomicBoolean invocation = new AtomicBoolean(); @@ -385,11 +389,14 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio } }); } + // synchronize ending with the updating thread and the main test thread awaitQuietly(barrier); }); updatingThread.start(); listenersThread.start(); + // synchronize starting with the updating thread and the listener thread barrier.await(); + // synchronize ending with the updating thread and the listener thread barrier.await(); // one last update to ensure all listeners are notified if (closed.get() == false) {