Skip to content

Commit

Permalink
Fix TransportSnapshotsStatusAction ThreadPool Use (#45824) (#45883)
Browse files Browse the repository at this point in the history
In case of an in-progress snapshot this endpoint was broken because
it tried to execute repository operations in the callback on a
transport thread which is not allowed (only generic or snapshot
pool are allowed here).
  • Loading branch information
original-brownbear authored Aug 23, 2019
1 parent f94e4a9 commit ba6d72e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -115,15 +116,13 @@ protected void masterOperation(final SnapshotsStatusRequest request,
for (int i = 0; i < currentSnapshots.size(); i++) {
snapshots[i] = currentSnapshots.get(i).snapshot();
}

TransportNodesSnapshotsStatus.Request nodesRequest =
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
.snapshots(snapshots).timeout(request.masterNodeTimeout());
transportNodesSnapshotsStatus.execute(nodesRequest,
ActionListener.map(
listener, nodeSnapshotStatuses ->
buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
nodeSnapshotStatuses)));
transportNodesSnapshotsStatus.execute(
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY))
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
ActionListener.wrap(
nodeSnapshotStatuses -> threadPool.executor(ThreadPool.Names.GENERIC).execute(
ActionRunnable.wrap(listener, l -> l.onResponse(buildResponse(request, snapshotsService.currentSnapshots(
request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses)))), listener::onFailure));
} else {
// We don't have any in-progress shards, just return current stats
listener.onResponse(buildResponse(request, currentSnapshots, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.elasticsearch.snapshots;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import java.util.List;

Expand Down Expand Up @@ -71,4 +74,38 @@ public void testStatusApiConsistency() {
assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime());
assertEquals(snStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
}

public void testStatusAPICallInProgressSnapshot() throws InterruptedException {
Client client = client();

logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("mock").setSettings(
Settings.builder().put("location", randomRepoPath()).put("block_on_data", true)));

createIndex("test-idx-1");
ensureGreen();

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

logger.info("--> snapshot");
ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture =
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute();

logger.info("--> wait for data nodes to get blocked");
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));

final List<SnapshotStatus> snapshotStatus = client.admin().cluster().snapshotsStatus(
new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots();
assertEquals(snapshotStatus.get(0).getState(), SnapshotsInProgress.State.STARTED);

logger.info("--> unblock all data nodes");
unblockAllDataNodes("test-repo");

logger.info("--> wait for snapshot to finish");
createSnapshotResponseActionFuture.actionGet();
}
}

0 comments on commit ba6d72e

Please sign in to comment.