Skip to content

Commit

Permalink
Do not release safe commit with CancellableThreads (#59182)
Browse files Browse the repository at this point in the history
We are leaking a FileChannel in #39585 if we release a safe commit with 
CancellableThreads. Although it is a bug in Lucene where we do not close
a FileChannel if we failed to create a NIOFSIndexInput, I think it's
safer if we release a safe commit using the generic thread pool instead.

Closes #39585
Relates #45409
  • Loading branch information
dnhatn committed Jul 8, 2020
1 parent cc3c8be commit 6a0f741
Showing 1 changed file with 28 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -84,6 +85,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand Down Expand Up @@ -230,7 +232,7 @@ && isTargetSameHistory()
} else {
final Engine.IndexCommitRef safeCommitRef;
try {
safeCommitRef = shard.acquireSafeIndexCommit();
safeCommitRef = acquireSafeCommit(shard);
resources.add(safeCommitRef);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
Expand Down Expand Up @@ -401,17 +403,34 @@ public void onFailure(Exception e) {
*/
private Releasable acquireStore(Store store) {
store.incRef();
return Releasables.releaseOnce(() -> {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
assert threadPool.generic().isShutdown() == false;
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
threadPool.generic().execute(ActionRunnable.run(future, store::decRef));
FutureUtils.get(future);
return Releasables.releaseOnce(() -> runWithGenericThreadPool(store::decRef));
}

/**
* Releasing a safe commit can access some commit files. It's better not to use {@link CancellableThreads} to interact
* with the file systems due to interrupt (see {@link org.apache.lucene.store.NIOFSDirectory} javadocs for more detail).
* This method acquires a safe commit and wraps it to make sure that it will be released using the generic thread pool.
*/
private Engine.IndexCommitRef acquireSafeCommit(IndexShard shard) {
final Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit();
final AtomicBoolean closed = new AtomicBoolean(false);
return new Engine.IndexCommitRef(commitRef.getIndexCommit(), () -> {
if (closed.compareAndSet(false, true)) {
runWithGenericThreadPool(commitRef::close);
}
});
}

private void runWithGenericThreadPool(CheckedRunnable<Exception> task) {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
assert threadPool.generic().isShutdown() == false;
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
threadPool.generic().execute(ActionRunnable.run(future, task));
FutureUtils.get(future);
}

static final class SendFileResult {
final List<String> phase1FileNames;
final List<Long> phase1FileSizes;
Expand Down

0 comments on commit 6a0f741

Please sign in to comment.