diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 33829e88b4d15..26ac9075b3dc2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1158,11 +1158,9 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { synchronized (engineMutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. - synchronized (mutex) { - final Engine engine = getEngineOrNull(); - if (engine != null) { - indexCommit = engine.acquireLastIndexCommit(false); - } + final Engine engine = getEngineOrNull(); + if (engine != null) { + indexCommit = engine.acquireLastIndexCommit(false); } if (indexCommit == null) { return store.getMetadata(null, true); @@ -1286,9 +1284,11 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine) throws IOException { - synchronized (mutex) { + synchronized (engineMutex) { try { - changeState(IndexShardState.CLOSED, reason); + synchronized (mutex) { + changeState(IndexShardState.CLOSED, reason); + } } finally { final Engine engine = this.currentEngineReference.getAndSet(null); try { @@ -1343,6 +1343,7 @@ public void prepareForIndexRecovery() { * This is the first operation after the local checkpoint of the safe commit if exists. */ public long recoverLocallyUpToGlobalCheckpoint() { + assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -1394,7 +1395,7 @@ public long recoverLocallyUpToGlobalCheckpoint() { getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint); logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint)); } finally { - synchronized (mutex) { + synchronized (engineMutex) { IOUtils.close(currentEngineReference.getAndSet(null)); } } @@ -1569,23 +1570,15 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); synchronized (engineMutex) { + assert currentEngineReference.get() == null : "engine is running"; + verifyNotClosed(); // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = engineFactory.newReadWriteEngine(config); - synchronized (mutex) { - try { - verifyNotClosed(); - assert currentEngineReference.get() == null : "engine is running"; - onNewEngine(newEngine); - currentEngineReference.set(newEngine); - // We set active because we are now writing operations to the engine; this way, - // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. - active.set(true); - } finally { - if (currentEngineReference.get() != newEngine) { - newEngine.close(); - } - } - } + onNewEngine(newEngine); + currentEngineReference.set(newEngine); + // We set active because we are now writing operations to the engine; this way, + // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. + active.set(true); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. @@ -1616,7 +1609,8 @@ private void onNewEngine(Engine newEngine) { * called if recovery has to be restarted after network error / delay ** */ public void performRecoveryRestart() throws IOException { - synchronized (mutex) { + assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; + synchronized (engineMutex) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; IOUtils.close(currentEngineReference.getAndSet(null)); resetRecoveryStage(); @@ -3288,7 +3282,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ void resetEngineToGlobalCheckpoint() throws IOException { - assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex"; + assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']'; sync(); // persist the global checkpoint to disk @@ -3301,6 +3295,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); synchronized (engineMutex) { + verifyNotClosed(); // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. final Engine readOnlyEngine = @@ -3328,7 +3323,7 @@ public IndexCommitRef acquireSafeIndexCommit() { @Override public void close() throws IOException { - assert Thread.holdsLock(mutex); + assert Thread.holdsLock(engineMutex); Engine newEngine = newEngineReference.get(); if (newEngine == currentEngineReference.get()) { @@ -3338,28 +3333,9 @@ public void close() throws IOException { IOUtils.close(super::close, newEngine); } }; - synchronized (mutex) { - try { - verifyNotClosed(); - IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); - } finally { - if (currentEngineReference.get() != readOnlyEngine) { - readOnlyEngine.close(); - } - } - } - final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)); - synchronized (mutex) { - try { - verifyNotClosed(); - newEngineReference.set(newReadWriteEngine); - onNewEngine(newReadWriteEngine); - } finally { - if (newEngineReference.get() != newReadWriteEngine) { - newReadWriteEngine.close(); // shard was closed - } - } - } + IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); + onNewEngine(newEngineReference.get()); } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { @@ -3367,7 +3343,7 @@ public void close() throws IOException { }); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); newEngineReference.get().refresh("reset_engine"); - synchronized (mutex) { + synchronized (engineMutex) { verifyNotClosed(); IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); // We set active because we are now writing operations to the engine; this way, diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 5a50abd3a6fc9..feb8f9e4cde79 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -133,6 +133,7 @@ import java.util.Random; import java.util.Set; import java.util.TimeZone; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -511,15 +512,21 @@ public final void ensureAllSearchContextsReleased() throws Exception { // TODO: can we do this cleaner??? /** MockFSDirectoryService sets this: */ - public static boolean checkIndexFailed; + public static final List checkIndexFailures = new CopyOnWriteArrayList<>(); @Before public final void resetCheckIndexStatus() throws Exception { - checkIndexFailed = false; + checkIndexFailures.clear(); } public final void ensureCheckIndexPassed() { - assertFalse("at least one shard failed CheckIndex", checkIndexFailed); + if (checkIndexFailures.isEmpty() == false) { + final AssertionError e = new AssertionError("at least one shard failed CheckIndex"); + for (Exception failure : checkIndexFailures) { + e.addSuppressed(failure); + } + throw e; + } } // ----------------------------------------------------------------- diff --git a/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java b/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java index 58e881b296a7d..89fc8877fcf28 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java @@ -83,17 +83,19 @@ public static void checkIndex(Logger logger, Store store, ShardId shardId) { CheckIndex.Status status = store.checkIndex(out); out.flush(); if (!status.clean) { - ESTestCase.checkIndexFailed = true; - logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString()); - throw new IOException("index check failure"); + IOException failure = new IOException("failed to check index for shard " + shardId + + ";index files [" + Arrays.toString(dir.listAll()) + "] os [" + os.bytes().utf8ToString() + "]"); + ESTestCase.checkIndexFailures.add(failure); + throw failure; } else { if (logger.isDebugEnabled()) { logger.debug("check index [success]\n{}", os.bytes().utf8ToString()); } } } catch (LockObtainFailedException e) { - ESTestCase.checkIndexFailed = true; - throw new IllegalStateException("IndexWriter is still open on shard " + shardId, e); + IllegalStateException failure = new IllegalStateException("IndexWriter is still open on shard " + shardId, e); + ESTestCase.checkIndexFailures.add(failure); + throw failure; } } catch (Exception e) { logger.warn("failed to check index", e);