Skip to content

Commit

Permalink
Fix SnapshotStatus Transport Action Doing IO on Transport Thread
Browse files Browse the repository at this point in the history
There is a small chance here that elastic#67947 would cause the callback
for the repository data to run on a transport or CS updater thread
and do a lot of IO to fetch `SnapshotInfo`.

Fixed by always forking to the generic pool for the callback.
Added test that triggers lots of deserializing repository data from
cache on the transport thread concurrently which triggers this bug
relatively reliable (more than half the runs) but is still reasonably
fast (under 5s).
  • Loading branch information
original-brownbear committed Jan 26, 2021
1 parent 80107e8 commit b06dcd5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {

Expand Down Expand Up @@ -514,6 +516,38 @@ public void testGetSnapshotsRequest() throws Exception {
awaitNoMoreRunningOperations();
}

public void testConcurrentCreateAndStatusAPICalls() throws Exception {
for (int i = 0; i < randomIntBetween(1, 10); i++) {
createIndexWithContent("test-idx-" + i);
}
final String repoName = "test-repo";
createRepository(repoName, "fs");
final int snapshots = randomIntBetween(10, 20);
final List<ActionFuture<SnapshotsStatusResponse>> statuses = new ArrayList<>(snapshots);
final List<ActionFuture<GetSnapshotsResponse>> gets = new ArrayList<>(snapshots);
final Client dataNodeClient = dataNodeClient();
final String[] snapshotNames = createNSnapshots(repoName, snapshots).toArray(Strings.EMPTY_ARRAY);

for (int i = 0; i < snapshots; i++) {
statuses.add(dataNodeClient.admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotNames).execute());
gets.add(dataNodeClient.admin().cluster().prepareGetSnapshots(repoName).setSnapshots(snapshotNames).execute());
}

for (ActionFuture<SnapshotsStatusResponse> status : statuses) {
assertThat(status.get().getSnapshots(), hasSize(snapshots));
for (SnapshotStatus snapshot : status.get().getSnapshots()) {
assertThat(snapshot.getState(), allOf(not(SnapshotsInProgress.State.FAILED), not(SnapshotsInProgress.State.ABORTED)));
}
}
for (ActionFuture<GetSnapshotsResponse> get : gets) {
final List<SnapshotInfo> snapshotInfos = get.get().getSnapshots(repoName);
assertThat(snapshotInfos, hasSize(snapshots));
for (SnapshotInfo snapshotInfo : snapshotInfos) {
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
}
}
}

private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
return snapshotStatus.getIndices().get(indexName).getShards().get(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.node.NodeClient;
Expand All @@ -39,6 +38,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
Expand Down Expand Up @@ -238,9 +238,9 @@ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, Snapsho
List<SnapshotStatus> builder, Set<String> currentSnapshotNames, String repositoryName,
ActionListener<SnapshotsStatusResponse> listener) {
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
final ListenableFuture<RepositoryData> repositoryDataListener = new ListenableFuture<>();
repositoriesService.getRepositoryData(repositoryName, repositoryDataListener);
repositoryDataListener.whenComplete(repositoryData -> {
repositoryDataListener.addListener(ActionListener.wrap(repositoryData -> {
final Map<String, SnapshotId> matchedSnapshotIds = repositoryData.getSnapshotIds().stream()
.filter(s -> requestedSnapshotNames.contains(s.getName()))
.collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
Expand Down Expand Up @@ -295,7 +295,7 @@ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, Snapsho
}
}
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}, listener::onFailure);
}, listener::onFailure), threadPool.generic());
}

/**
Expand Down

0 comments on commit b06dcd5

Please sign in to comment.