Skip to content

Commit

Permalink
Die with dignity while merging
Browse files Browse the repository at this point in the history
If an out of memory error is thrown while merging, today we quietly
rewrap it into a merge exception and the out of memory error is
lost. Instead, we need to rethrow out of memory errors, and in fact any
fatal error here, and let those go uncaught so that the node is torn
down. This commit causes this to be the case.

Relates #27265
  • Loading branch information
jasontedor committed Nov 6, 2017
1 parent fc09a09 commit 0251ff4
Show file tree
Hide file tree
Showing 8 changed files with 592 additions and 332 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.logging.Loggers;

Expand Down Expand Up @@ -68,11 +67,7 @@ public void uncaughtException(Thread t, Throwable e) {

// visible for testing
static boolean isFatalUncaught(Throwable e) {
return isFatalCause(e) || (e instanceof MergePolicy.MergeException && isFatalCause(e.getCause()));
}

private static boolean isFatalCause(Throwable cause) {
return cause instanceof Error;
return e instanceof Error;
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,23 +1224,15 @@ public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineExc
}
}

@SuppressWarnings("finally")
private boolean failOnTragicEvent(AlreadyClosedException ex) {
final boolean engineFailed;
// if we are already closed due to some tragic exception
// we need to fail the engine. it might have already been failed before
// but we are double-checking it's failed and closed
if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) {
if (indexWriter.getTragicException() instanceof Error) {
try {
logger.error("tragic event in index writer", ex);
} finally {
throw (Error) indexWriter.getTragicException();
}
} else {
failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException());
engineFailed = true;
}
maybeDie("tragic event in index writer", indexWriter.getTragicException());
failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException());
engineFailed = true;
} else if (translog.isOpen() == false && translog.getTragicException() != null) {
failEngine("already closed by tragic event on the translog", translog.getTragicException());
engineFailed = true;
Expand Down Expand Up @@ -1381,34 +1373,43 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException {
// pkg-private for testing
IndexWriter createWriter(boolean create) throws IOException {
try {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(deletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
verbose = Boolean.parseBoolean(System.getProperty("tests.verbose"));
} catch (Exception ignore) {
}
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler);
MergePolicy mergePolicy = config().getMergePolicy();
// Give us the opportunity to upgrade old segments while performing
// background merges
mergePolicy = new ElasticsearchMergePolicy(mergePolicy);
iwc.setMergePolicy(mergePolicy);
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
iwc.setCodec(engineConfig.getCodec());
iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
return new IndexWriter(store.directory(), iwc);
final IndexWriterConfig iwc = getIndexWriterConfig(create);
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
throw ex;
}
}

IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return new IndexWriter(directory, iwc);
}

private IndexWriterConfig getIndexWriterConfig(boolean create) {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(deletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
verbose = Boolean.parseBoolean(System.getProperty("tests.verbose"));
} catch (Exception ignore) {
}
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler);
MergePolicy mergePolicy = config().getMergePolicy();
// Give us the opportunity to upgrade old segments while performing
// background merges
mergePolicy = new ElasticsearchMergePolicy(mergePolicy);
iwc.setMergePolicy(mergePolicy);
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
iwc.setCodec(engineConfig.getCodec());
iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
return iwc;
}

/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
static final class SearchFactory extends EngineSearcherFactory {
private final Engine.Warmer warmer;
Expand Down Expand Up @@ -1539,7 +1540,6 @@ protected void doRun() throws Exception {

@Override
protected void handleMergeException(final Directory dir, final Throwable exc) {
logger.error("failed to merge", exc);
engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
Expand All @@ -1548,13 +1548,39 @@ public void onFailure(Exception e) {

@Override
protected void doRun() throws Exception {
MergePolicy.MergeException e = new MergePolicy.MergeException(exc, dir);
failEngine("merge failed", e);
/*
* We do this on another thread rather than the merge thread that we are initially called on so that we have complete
* confidence that the call stack does not contain catch statements that would cause the error that might be thrown
* here from being caught and never reaching the uncaught exception handler.
*/
maybeDie("fatal error while merging", exc);
logger.error("failed to merge", exc);
failEngine("merge failed", new MergePolicy.MergeException(exc, dir));
}
});
}
}

/**
* If the specified throwable is a fatal error, this throwable will be thrown. Callers should ensure that there are no catch statements
* that would catch an error in the stack as the fatal error here should go uncaught and be handled by the uncaught exception handler
* that we install during bootstrap. If the specified throwable is indeed a fatal error, the specified message will attempt to be logged
* before throwing the fatal error. If the specified throwable is not a fatal error, this method is a no-op.
*
* @param maybeMessage the message to maybe log
* @param maybeFatal the throwable that is maybe fatal
*/
@SuppressWarnings("finally")
private void maybeDie(final String maybeMessage, final Throwable maybeFatal) {
if (maybeFatal instanceof Error) {
try {
logger.error(maybeMessage, maybeFatal);
} finally {
throw (Error) maybeFatal;
}
}
}

private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
ensureCanFlush();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.bootstrap;

import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

Expand Down Expand Up @@ -131,7 +130,6 @@ void onNonFatalUncaught(String threadName, Throwable t) {
}

public void testIsFatalCause() {
assertFatal(new MergePolicy.MergeException(new OutOfMemoryError(), null));
assertFatal(new OutOfMemoryError());
assertFatal(new StackOverflowError());
assertFatal(new InternalError());
Expand Down
Loading

0 comments on commit 0251ff4

Please sign in to comment.