Skip to content

Commit

Permalink
Never release store using CancellableThreads (#45409)
Browse files Browse the repository at this point in the history
Today we can release a Store using CancellableThreads. If we are holding
the last reference, then we will verify the node lock before deleting
the store. Checking node lock performs some I/O on FileChannel. If the
current thread is interrupted, then the channel will be closed and the
node lock will also be invalid.

Closes #45237
  • Loading branch information
dnhatn authored Aug 21, 2019
1 parent 34d6913 commit 0fb695e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public void messageReceived(final StartRecoveryRequest request, final TransportC
}
}

// exposed for testing
final int numberOfOngoingRecoveries() {
return ongoingRecoveries.ongoingRecoveries.size();
}

final class OngoingRecoveries {
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -230,8 +232,7 @@ && isTargetSameHistory()

try {
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
shard.store().incRef();
final Releasable releaseStore = Releasables.releaseOnce(shard.store()::decRef);
final Releasable releaseStore = acquireStore(shard.store());
resources.add(releaseStore);
sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
try {
Expand Down Expand Up @@ -393,6 +394,25 @@ public void onFailure(Exception e) {
});
}

/**
* Increases the store reference and returns a {@link Releasable} that will decrease the store reference using the generic thread pool.
* We must never release the store using an interruptible thread as we can risk invalidating the node lock.
*/
private Releasable acquireStore(Store store) {
store.incRef();
return Releasables.releaseOnce(() -> {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
threadPool.generic().execute(new ActionRunnable<>(future) {
@Override
protected void doRun() {
store.decRef();
listener.onResponse(null);
}
});
FutureUtils.get(future);
});
}

static final class SendFileResult {
final List<String> phase1FileNames;
final List<Long> phase1FileSizes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public void testShardsAllocatedAfterDataNodesStart() {
equalTo(false));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45237")
public void testAutoExpandReplicasAdjustedWhenDataNodeJoins() {
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
client().admin().indices().create(createIndexRequest("test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -1486,4 +1487,20 @@ public void testPeerRecoveryTrimsLocalTranslog() throws Exception {
}
ensureGreen(indexName);
}

public void testCancelRecoveryWithAutoExpandReplicas() throws Exception {
internalCluster().startMasterOnlyNode();
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all"))
.setWaitForActiveShards(ActiveShardCount.NONE));
internalCluster().startNode();
internalCluster().startNode();
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertAcked(client().admin().indices().prepareDelete("test")); // cancel recoveries
assertBusy(() -> {
for (PeerRecoverySourceService recoveryService : internalCluster().getDataNodeInstances(PeerRecoverySourceService.class)) {
assertThat(recoveryService.numberOfOngoingRecoveries(), equalTo(0));
}
});
}
}

0 comments on commit 0fb695e

Please sign in to comment.