diff --git a/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/src/main/java/org/elasticsearch/common/lucene/Lucene.java index b219bbffe780c..0d64a49bcc9e6 100644 --- a/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -24,7 +24,6 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.*; import org.apache.lucene.search.*; -import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -396,6 +395,10 @@ public static final boolean indexExists(final Directory directory) throws IOExce return DirectoryReader.indexExists(directory); } + /** + * Returns true iff the given exception or + * one of it's causes is an instance of {@link CorruptIndexException} otherwise false. + */ public static boolean isCorruptionException(Throwable t) { return ExceptionsHelper.unwrap(t, CorruptIndexException.class) != null; } diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index 8eec7fced303b..45179ff80e0b9 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -1275,34 +1275,35 @@ public void onFailedMerge(MergePolicy.MergeException e) { public void failEngine(String reason, Throwable failure) { assert failure != null; if (failEngineLock.tryLock()) { - - assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine"; - if (failedEngine != null) { - logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure); - return; - } try { - logger.warn("failed engine [{}]", reason, failure); - // we must set a failure exception, generate one if not supplied - failedEngine = failure; - for (FailedEngineListener listener : failedEngineListeners) { - listener.onFailedEngine(shardId, reason, failure); + // we first mark the store as corrupted before we notify any listeners + // this must happen first otherwise we might try to reallocate so quickly + // on the same node that we don't see the corrupted marker file when + // the shard is initializing + if (Lucene.isCorruptionException(failure)) { + try { + store.markStoreCorrupted(ExceptionsHelper.unwrap(failure, CorruptIndexException.class)); + } catch (IOException e) { + logger.warn("Couldn't marks store corrupted", e); + } } } finally { + assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine"; + if (failedEngine != null) { + logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure); + return; + } try { - if (Lucene.isCorruptionException(failure)) { - try { - store.markStoreCorrupted(ExceptionsHelper.unwrap(failure, CorruptIndexException.class)); - } catch (IOException e) { - logger.warn("Couldn't marks store corrupted", e); - } + logger.warn("failed engine [{}]", reason, failure); + // we must set a failure exception, generate one if not supplied + failedEngine = failure; + for (FailedEngineListener listener : failedEngineListeners) { + listener.onFailedEngine(shardId, reason, failure); } } finally { - // close the engine whatever happens... close(); } } - } else { logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure); }