From 4fd6eb668a80919e7692fed3f3aaa4e6082b833a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 22 Aug 2019 23:35:12 +0200 Subject: [PATCH] Fix TransportSnapshotsStatusAction ThreadPool Use (#45824) In case of an in-progress snapshot this endpoint was broken because it tried to execute repository operations in the callback on a transport thread which is not allowed (only generic or snapshot pool are allowed here). --- .../TransportSnapshotsStatusAction.java | 17 ++++----- .../snapshots/SnapshotStatusApisIT.java | 37 +++++++++++++++++++ 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index cf5dfe80cef12..063f051b13668 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; @@ -115,15 +116,13 @@ protected void masterOperation(final SnapshotsStatusRequest request, for (int i = 0; i < currentSnapshots.size(); i++) { snapshots[i] = currentSnapshots.get(i).snapshot(); } - - TransportNodesSnapshotsStatus.Request nodesRequest = - new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()])) - .snapshots(snapshots).timeout(request.masterNodeTimeout()); - transportNodesSnapshotsStatus.execute(nodesRequest, - ActionListener.map( - listener, nodeSnapshotStatuses -> - buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())), - nodeSnapshotStatuses))); + transportNodesSnapshotsStatus.execute( + new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY)) + .snapshots(snapshots).timeout(request.masterNodeTimeout()), + ActionListener.wrap( + nodeSnapshotStatuses -> threadPool.executor(ThreadPool.Names.GENERIC).execute( + ActionRunnable.wrap(listener, l -> l.onResponse(buildResponse(request, snapshotsService.currentSnapshots( + request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses)))), listener::onFailure)); } else { // We don't have any in-progress shards, just return current stats listener.onResponse(buildResponse(request, currentSnapshots, null)); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java index 73864cd75e71d..c29eb9ad86bea 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java @@ -19,11 +19,14 @@ package org.elasticsearch.snapshots; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import java.util.List; @@ -71,4 +74,38 @@ public void testStatusApiConsistency() { assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime()); assertEquals(snStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime()); } + + public void testStatusAPICallInProgressSnapshot() throws InterruptedException { + Client client = client(); + + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("mock").setSettings( + Settings.builder().put("location", randomRepoPath()).put("block_on_data", true))); + + createIndex("test-idx-1"); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx-1", "_doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + + logger.info("--> snapshot"); + ActionFuture createSnapshotResponseActionFuture = + client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute(); + + logger.info("--> wait for data nodes to get blocked"); + waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); + + final List snapshotStatus = client.admin().cluster().snapshotsStatus( + new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots(); + assertEquals(snapshotStatus.get(0).getState(), SnapshotsInProgress.State.STARTED); + + logger.info("--> unblock all data nodes"); + unblockAllDataNodes("test-repo"); + + logger.info("--> wait for snapshot to finish"); + createSnapshotResponseActionFuture.actionGet(); + } }