Skip to content

Commit

Permalink
[ENGINE] Mark store as corrupted before sending failed shard
Browse files Browse the repository at this point in the history
We have to mark a shard as corrupted if necessary before the
shard failed event is fired ie. before we call the corresponding
listener in the engine. Otherwise the shard might be re-allocated
on the same node and just started up without being marked as corrupted.

Relates to #5924
  • Loading branch information
s1monw committed Jul 14, 2014
1 parent de1e7fe commit 4a89c0d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 20 deletions.
5 changes: 4 additions & 1 deletion src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.*;
import org.apache.lucene.search.*;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
Expand Down Expand Up @@ -396,6 +395,10 @@ public static final boolean indexExists(final Directory directory) throws IOExce
return DirectoryReader.indexExists(directory);
}

/**
* Returns <tt>true</tt> iff the given exception or
* one of it's causes is an instance of {@link CorruptIndexException} otherwise <tt>false</tt>.
*/
public static boolean isCorruptionException(Throwable t) {
return ExceptionsHelper.unwrap(t, CorruptIndexException.class) != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1275,34 +1275,35 @@ public void onFailedMerge(MergePolicy.MergeException e) {
public void failEngine(String reason, Throwable failure) {
assert failure != null;
if (failEngineLock.tryLock()) {

assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine";
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
return;
}
try {
logger.warn("failed engine [{}]", reason, failure);
// we must set a failure exception, generate one if not supplied
failedEngine = failure;
for (FailedEngineListener listener : failedEngineListeners) {
listener.onFailedEngine(shardId, reason, failure);
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrap(failure, CorruptIndexException.class));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
}
} finally {
assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine";
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
return;
}
try {
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrap(failure, CorruptIndexException.class));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
logger.warn("failed engine [{}]", reason, failure);
// we must set a failure exception, generate one if not supplied
failedEngine = failure;
for (FailedEngineListener listener : failedEngineListeners) {
listener.onFailedEngine(shardId, reason, failure);
}
} finally {
// close the engine whatever happens...
close();
}
}

} else {
logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
}
Expand Down

0 comments on commit 4a89c0d

Please sign in to comment.