From 18f3f52d72a53841a372dfb12912b2608abdb464 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 26 Sep 2019 14:36:16 -0400 Subject: [PATCH 1/4] Use only engineMutex to protect engine reference --- .../elasticsearch/index/shard/IndexShard.java | 98 +++++++------------ .../index/shard/IndexShardTests.java | 93 +++++++++++++----- .../org/elasticsearch/test/ESTestCase.java | 13 ++- .../test/store/MockFSDirectoryFactory.java | 12 ++- 4 files changed, 119 insertions(+), 97 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index fa452270fab6b..8cd1858a995fe 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1190,11 +1190,9 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { synchronized (engineMutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. - synchronized (mutex) { - final Engine engine = getEngineOrNull(); - if (engine != null) { - indexCommit = engine.acquireLastIndexCommit(false); - } + final Engine engine = getEngineOrNull(); + if (engine != null) { + indexCommit = engine.acquireLastIndexCommit(false); } if (indexCommit == null) { return store.getMetadata(null, true); @@ -1309,20 +1307,22 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine) throws IOException { - synchronized (mutex) { - try { - changeState(IndexShardState.CLOSED, reason); - } finally { - final Engine engine = this.currentEngineReference.getAndSet(null); + synchronized (engineMutex) { + synchronized (mutex) { try { - if (engine != null && flushEngine) { - engine.flushAndClose(); - } + changeState(IndexShardState.CLOSED, reason); } finally { - // playing safe here and close the engine even if the above succeeds - close can be called multiple times - // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close(engine, globalCheckpointListeners, refreshListeners); - indexShardOperationPermits.close(); + final Engine engine = this.currentEngineReference.getAndSet(null); + try { + if (engine != null && flushEngine) { + engine.flushAndClose(); + } + } finally { + // playing safe here and close the engine even if the above succeeds - close can be called multiple times + // Also closing refreshListeners to prevent us from accumulating any more listeners + IOUtils.close(engine, globalCheckpointListeners, refreshListeners); + indexShardOperationPermits.close(); + } } } } @@ -1417,7 +1417,7 @@ public long recoverLocallyUpToGlobalCheckpoint() { getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint); logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint)); } finally { - synchronized (mutex) { + synchronized (engineMutex) { IOUtils.close(currentEngineReference.getAndSet(null)); } } @@ -1592,23 +1592,15 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); synchronized (engineMutex) { + assert currentEngineReference.get() == null : "engine is running"; + verifyNotClosed(); // we must create a new engine under mutex 
(see IndexShard#snapshotStoreMetadata). final Engine newEngine = engineFactory.newReadWriteEngine(config); - synchronized (mutex) { - try { - verifyNotClosed(); - assert currentEngineReference.get() == null : "engine is running"; - onNewEngine(newEngine); - currentEngineReference.set(newEngine); - // We set active because we are now writing operations to the engine; this way, - // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. - active.set(true); - } finally { - if (currentEngineReference.get() != newEngine) { - newEngine.close(); - } - } - } + onNewEngine(newEngine); + currentEngineReference.set(newEngine); + // We set active because we are now writing operations to the engine; this way, + // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. + active.set(true); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. @@ -1639,7 +1631,7 @@ private void onNewEngine(Engine newEngine) { * called if recovery has to be restarted after network error / delay ** */ public void performRecoveryRestart() throws IOException { - synchronized (mutex) { + synchronized (engineMutex) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; IOUtils.close(currentEngineReference.getAndSet(null)); resetRecoveryStage(); @@ -3316,7 +3308,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ void resetEngineToGlobalCheckpoint() throws IOException { - assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex"; + assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']'; sync(); // persist the global checkpoint to disk @@ -3329,6 +3321,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); synchronized (engineMutex) { + verifyNotClosed(); // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. 
final Engine readOnlyEngine = @@ -3336,9 +3329,6 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { synchronized (engineMutex) { - if (newEngineReference.get() == null) { - throw new AlreadyClosedException("engine was closed"); - } // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay return newEngineReference.get().acquireLastIndexCommit(false); } @@ -3347,16 +3337,13 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { @Override public IndexCommitRef acquireSafeIndexCommit() { synchronized (engineMutex) { - if (newEngineReference.get() == null) { - throw new AlreadyClosedException("engine was closed"); - } return newEngineReference.get().acquireSafeIndexCommit(); } } @Override public void close() throws IOException { - assert Thread.holdsLock(mutex); + assert Thread.holdsLock(engineMutex); Engine newEngine = newEngineReference.get(); if (newEngine == currentEngineReference.get()) { @@ -3366,35 +3353,16 @@ public void close() throws IOException { IOUtils.close(super::close, newEngine); } }; - synchronized (mutex) { - try { - verifyNotClosed(); - IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); - } finally { - if (currentEngineReference.get() != readOnlyEngine) { - readOnlyEngine.close(); - } - } - } - final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)); - synchronized (mutex) { - try { - verifyNotClosed(); - newEngineReference.set(newReadWriteEngine); - onNewEngine(newReadWriteEngine); - } finally { - if (newEngineReference.get() != newReadWriteEngine) { - newReadWriteEngine.close(); // shard was closed - } - } - } + IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); + onNewEngine(newEngineReference.get()); } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { // TODO: add a dedicate recovery stats for the reset translog }); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); - synchronized (mutex) { + synchronized (engineMutex) { verifyNotClosed(); IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); // We set active because we are now writing operations to the engine; this way, diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 90ec2ece394e9..7778faaffb024 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3818,18 +3818,42 @@ public void testResetEngine() throws Exception { public void testCloseShardWhileResettingEngine() throws Exception { CountDownLatch readyToCloseLatch = new CountDownLatch(1); CountDownLatch closeDoneLatch = new CountDownLatch(1); - IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { - @Override - public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, - long recoverUpToSeqNo) throws IOException { + AtomicBoolean resetWasStarted = new AtomicBoolean(); + IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> { + if (resetWasStarted.get() && randomBoolean()) { 
readyToCloseLatch.countDown(); - try { - closeDoneLatch.await(); - } catch (InterruptedException e) { - throw new AssertionError(e); + } + final InternalEngine engine = new InternalEngine(config) { + @Override + public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, + long recoverUpToSeqNo) throws IOException { + readyToCloseLatch.countDown(); + try { + if (randomBoolean()) { + closeDoneLatch.await(); + } + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); + } + }; + if (resetWasStarted.get() && randomBoolean()) { + readyToCloseLatch.countDown(); + int numDocs = randomIntBetween(0, 100); + for (int i = 0; i < numDocs; i++) { + try { + ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(i), null); + engine.index(new Engine.Index(EngineTestCase.newUid(doc), config.getPrimaryTermSupplier().getAsLong(), doc)); + if (rarely()) { + engine.syncTranslog(); + } + } catch (IOException e) { + throw new AssertionError(e); + } } - return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); } + return engine; }); Thread closeShardThread = new Thread(() -> { @@ -3851,6 +3875,7 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getLastKnownGlobalCheckpoint(), 0L, ActionListener.wrap(r -> { try (r) { + resetWasStarted.set(true); shard.resetEngineToGlobalCheckpoint(); } finally { engineResetLatch.countDown(); @@ -3872,19 +3897,29 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover public void testSnapshotWhileResettingEngine() throws Exception { CountDownLatch readyToSnapshotLatch = new CountDownLatch(1); CountDownLatch snapshotDoneLatch = new CountDownLatch(1); - IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { - @Override - public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, - long recoverUpToSeqNo) throws IOException { - InternalEngine internalEngine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); + AtomicBoolean resetWasStarted = new AtomicBoolean(); + IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> { + if (resetWasStarted.get() && randomBoolean()) { readyToSnapshotLatch.countDown(); - try { - snapshotDoneLatch.await(); - } catch (InterruptedException e) { - throw new AssertionError(e); + } + final InternalEngine engine = new InternalEngine(config) { + @Override + public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, + long recoverUpToSeqNo) throws IOException { + InternalEngine internalEngine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); + readyToSnapshotLatch.countDown(); + try { + snapshotDoneLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return internalEngine; } - return internalEngine; + }; + if (resetWasStarted.get() && randomBoolean()) { + readyToSnapshotLatch.countDown(); } + return engine; }); indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint())); @@ -3914,6 +3949,7 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getLastKnownGlobalCheckpoint(), 0L, ActionListener.wrap(r -> { try (r) { + resetWasStarted.set(true); 
shard.resetEngineToGlobalCheckpoint(); } finally { engineResetLatch.countDown(); @@ -4126,7 +4162,7 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { closeShards(readonlyShard); } - public void testCloseShardWhileEngineIsWarming() throws Exception { + public void testEngineWarmingDoesNotBlockClusterUpdate() throws Exception { CountDownLatch warmerStarted = new CountDownLatch(1); CountDownLatch warmerBlocking = new CountDownLatch(1); IndexShard shard = newShard(true, Settings.EMPTY, config -> { @@ -4148,16 +4184,25 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); return new InternalEngine(configWithWarmer); }); - Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); + Thread recoveryThread = new Thread(() -> { + try { + recoverShardFromStore(shard); + } catch (IOException e) { + throw new AssertionError(e); + } + }); recoveryThread.start(); try { warmerStarted.await(); - shard.close("testing", false); - assertThat(shard.state, equalTo(IndexShardState.CLOSED)); + assertThat(shard.state, equalTo(IndexShardState.RECOVERING)); + shard.updateShardState(shard.routingEntry(), shard.getOperationPrimaryTerm(), (indexShard, listener) -> {}, + 0L, Collections.singleton(shard.routingEntry().allocationId().getId()), + new IndexShardRoutingTable.Builder(shard.routingEntry().shardId()).addShard(shard.routingEntry()).build()); } finally { warmerBlocking.countDown(); } recoveryThread.join(); - shard.store().close(); + assertThat(shard.state, equalTo(IndexShardState.STARTED)); + closeShards(shard); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 5a50abd3a6fc9..feb8f9e4cde79 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -133,6 +133,7 @@ import java.util.Random; import java.util.Set; import java.util.TimeZone; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -511,15 +512,21 @@ public final void ensureAllSearchContextsReleased() throws Exception { // TODO: can we do this cleaner??? 
/** MockFSDirectoryService sets this: */
- public static boolean checkIndexFailed;
+ public static final List<Exception> checkIndexFailures = new CopyOnWriteArrayList<>();
@Before
public final void resetCheckIndexStatus() throws Exception {
- checkIndexFailed = false;
+ checkIndexFailures.clear();
}
public final void ensureCheckIndexPassed() {
- assertFalse("at least one shard failed CheckIndex", checkIndexFailed);
+ if (checkIndexFailures.isEmpty() == false) {
+ final AssertionError e = new AssertionError("at least one shard failed CheckIndex");
+ for (Exception failure : checkIndexFailures) {
+ e.addSuppressed(failure);
+ }
+ throw e;
+ }
}
// -----------------------------------------------------------------
diff --git a/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java b/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java
index 58e881b296a7d..89fc8877fcf28 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java
@@ -83,17 +83,19 @@ public static void checkIndex(Logger logger, Store store, ShardId shardId) {
CheckIndex.Status status = store.checkIndex(out);
out.flush();
if (!status.clean) {
- ESTestCase.checkIndexFailed = true;
- logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString());
- throw new IOException("index check failure");
+ IOException failure = new IOException("failed to check index for shard " + shardId +
+ ";index files [" + Arrays.toString(dir.listAll()) + "] os [" + os.bytes().utf8ToString() + "]");
+ ESTestCase.checkIndexFailures.add(failure);
+ throw failure;
} else {
if (logger.isDebugEnabled()) {
logger.debug("check index [success]\n{}", os.bytes().utf8ToString());
}
}
} catch (LockObtainFailedException e) {
- ESTestCase.checkIndexFailed = true;
- throw new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+ IllegalStateException failure = new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+ ESTestCase.checkIndexFailures.add(failure);
+ throw failure;
}
} catch (Exception e) {
logger.warn("failed to check index", e);

From a56d9ff2a6cfac17a728bc0bc4d6f32f2d6aaf90 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Fri, 1 Nov 2019 11:57:58 -0400
Subject: [PATCH 2/4] add tests

---
 .../index/shard/IndexShardTests.java | 73 +++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index d818bd49aa083..1c14d43e0e4a1 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -150,6 +150,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -3925,6 +3926,78 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
closeShard(shard, false);
}
+ /**
+ * Verifies that after closing shard is returned, we should have released the engine, and won't open a new engine.
+ */ + public void testCloseShardWhileOpeningEngineDuringRecovery() throws Exception { + CountDownLatch readyToCloseLatch = new CountDownLatch(1); + CountDownLatch closeDoneLatch = new CountDownLatch(1); + IndexShard shard = newShard(false, Settings.EMPTY, config -> { + InternalEngine engine = new InternalEngine(config); + readyToCloseLatch.countDown(); + try { + closeDoneLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return engine; + }); + + Thread closeShardThread = new Thread(() -> { + try { + readyToCloseLatch.await(); + shard.close("testing", false); + // in integration tests, this is done as a listener on IndexService. + MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId); + } catch (InterruptedException | IOException e) { + throw new AssertionError(e); + } finally { + closeDoneLatch.countDown(); + } + }); + closeShardThread.start(); + recoveryEmptyReplica(shard, true); + closeShardThread.join(); + closeShard(shard, false); + } + + /** + * Similar to {@link #testCloseShardWhileOpeningEngineDuringRecovery()} but verifies a scenario where a shard is being reset. + */ + public void testCloseShardWhileOpeningEngineDuringReset() throws Exception { + CountDownLatch readyToClose = new CountDownLatch(2); // close shard after we have created two engines in recovery and reset. + Phaser readyToReturnEngine = new Phaser(1); + IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> { + InternalEngine engine = new InternalEngine(config); + readyToClose.countDown(); + readyToReturnEngine.arriveAndAwaitAdvance(); + return engine; + }); + + readyToReturnEngine.register(); // for close thread + Thread closeShardThread = new Thread(() -> { + try { + readyToClose.await(); + shard.close("testing", false); + // in integration tests, this is done as a listener on IndexService. + MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId); + } catch (InterruptedException | IOException e) { + throw new AssertionError(e); + } finally { + readyToReturnEngine.arrive(); + } + }); + closeShardThread.start(); + shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getLastKnownGlobalCheckpoint(), 0L, + ActionListener.wrap(r -> { + try (r) { + shard.resetEngineToGlobalCheckpoint(); + } + }, Assert::assertNotNull), TimeValue.timeValueMinutes(1L)); + closeShardThread.join(); + closeShard(shard, false); + } + public void testResetEngineWithBrokenTranslog() throws Exception { IndexShard shard = newStartedShard(false); updateMappings(shard, IndexMetaData.builder(shard.indexSettings.getIndexMetaData()) From 56739d570f199349b3acc6131c9e5296bc4f0542 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 1 Nov 2019 11:58:05 -0400 Subject: [PATCH 3/4] Revert "add tests" This reverts commit a56d9ff2a6cfac17a728bc0bc4d6f32f2d6aaf90. 
--- .../index/shard/IndexShardTests.java | 73 ------------------- 1 file changed, 73 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 1c14d43e0e4a1..d818bd49aa083 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -150,7 +150,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Phaser; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -3926,78 +3925,6 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover closeShard(shard, false); } - /** - * Verifies that after closing shard is returned, we should have released the engine, and won't open a new engine. - */ - public void testCloseShardWhileOpeningEngineDuringRecovery() throws Exception { - CountDownLatch readyToCloseLatch = new CountDownLatch(1); - CountDownLatch closeDoneLatch = new CountDownLatch(1); - IndexShard shard = newShard(false, Settings.EMPTY, config -> { - InternalEngine engine = new InternalEngine(config); - readyToCloseLatch.countDown(); - try { - closeDoneLatch.await(); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - return engine; - }); - - Thread closeShardThread = new Thread(() -> { - try { - readyToCloseLatch.await(); - shard.close("testing", false); - // in integration tests, this is done as a listener on IndexService. - MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId); - } catch (InterruptedException | IOException e) { - throw new AssertionError(e); - } finally { - closeDoneLatch.countDown(); - } - }); - closeShardThread.start(); - recoveryEmptyReplica(shard, true); - closeShardThread.join(); - closeShard(shard, false); - } - - /** - * Similar to {@link #testCloseShardWhileOpeningEngineDuringRecovery()} but verifies a scenario where a shard is being reset. - */ - public void testCloseShardWhileOpeningEngineDuringReset() throws Exception { - CountDownLatch readyToClose = new CountDownLatch(2); // close shard after we have created two engines in recovery and reset. - Phaser readyToReturnEngine = new Phaser(1); - IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> { - InternalEngine engine = new InternalEngine(config); - readyToClose.countDown(); - readyToReturnEngine.arriveAndAwaitAdvance(); - return engine; - }); - - readyToReturnEngine.register(); // for close thread - Thread closeShardThread = new Thread(() -> { - try { - readyToClose.await(); - shard.close("testing", false); - // in integration tests, this is done as a listener on IndexService. 
- MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId); - } catch (InterruptedException | IOException e) { - throw new AssertionError(e); - } finally { - readyToReturnEngine.arrive(); - } - }); - closeShardThread.start(); - shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getLastKnownGlobalCheckpoint(), 0L, - ActionListener.wrap(r -> { - try (r) { - shard.resetEngineToGlobalCheckpoint(); - } - }, Assert::assertNotNull), TimeValue.timeValueMinutes(1L)); - closeShardThread.join(); - closeShard(shard, false); - } - public void testResetEngineWithBrokenTranslog() throws Exception { IndexShard shard = newStartedShard(false); updateMappings(shard, IndexMetaData.builder(shard.indexSettings.getIndexMetaData()) From 54f052b0c8490fb8f5dc0b7bf603e253a6ec4622 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 7 Nov 2019 14:39:20 -0500 Subject: [PATCH 4/4] add assertions --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b7137d402b2ee..26ac9075b3dc2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1343,6 +1343,7 @@ public void prepareForIndexRecovery() { * This is the first operation after the local checkpoint of the safe commit if exists. */ public long recoverLocallyUpToGlobalCheckpoint() { + assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -1608,6 +1609,7 @@ private void onNewEngine(Engine newEngine) { * called if recovery has to be restarted after network error / delay ** */ public void performRecoveryRestart() throws IOException { + assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; synchronized (engineMutex) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; IOUtils.close(currentEngineReference.getAndSet(null));
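Taken together, the four patches converge on one locking rule: the engine reference is published and swapped only under engineMutex, the shard-state mutex is acquired only inside engineMutex (never around engine lifecycle work), and the recovery, restart, and reset paths assert that the state mutex is not held when they run. Below is a minimal, self-contained sketch of that discipline in plain Java; ShardEngineHolder, stateMutex, and the stripped-down Engine interface are hypothetical names used for illustration and are not part of the Elasticsearch code changed above.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch only -- hypothetical names, not the real IndexShard API.
// It condenses the rule this series enforces: swaps of the engine reference
// happen only under a dedicated engineMutex, the shard-state mutex is only
// ever taken inside it, and reset paths assert the state mutex is not held.
final class ShardEngineHolder {

    interface Engine extends Closeable {
    }

    private final Object engineMutex = new Object();   // guards all writes of currentEngine
    private final Object stateMutex = new Object();    // guards shard state only, never engine lifecycle
    private final AtomicReference<Engine> currentEngine = new AtomicReference<>();

    Engine getEngineOrNull() {
        return currentEngine.get();                    // lock-free read of the reference
    }

    void openEngine(Engine newEngine) {
        synchronized (engineMutex) {
            assert currentEngine.get() == null : "engine is running";
            currentEngine.set(newEngine);
        }
    }

    void close() throws IOException {
        synchronized (engineMutex) {
            synchronized (stateMutex) {
                // state transition to CLOSED would happen here, nested inside engineMutex
            }
            final Engine engine = currentEngine.getAndSet(null);
            if (engine != null) {
                engine.close();                        // Closeable allows repeated close calls
            }
        }
    }

    void resetEngine(Engine replacement) throws IOException {
        assert Thread.holdsLock(stateMutex) == false : "resetting engine under state mutex";
        synchronized (engineMutex) {
            final Engine previous = currentEngine.getAndSet(replacement);
            if (previous != null) {
                previous.close();
            }
        }
    }
}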