From e8bdc7f0338f73c630106262cd17d78587f6e7c8 Mon Sep 17 00:00:00 2001 From: INDRAJIT BANERJEE Date: Tue, 8 Aug 2023 10:18:35 +0530 Subject: [PATCH] Moving get snapshot requests to listener based async calls (#8377) * Moving get snapshot requests to listener based async calls --------- Signed-off-by: INDRAJIT BANERJEE Signed-off-by: Indrajit Banerjee Signed-off-by: Ivan Brusic --- CHANGELOG.md | 1 + .../get/TransportGetSnapshotsAction.java | 87 +++++++------- .../snapshots/SnapshotResiliencyTests.java | 112 +++++++++++++++++- 3 files changed, 155 insertions(+), 45 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bec6f9339eb5..a3df2760f1b7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add support to restore only unassigned shards of an index ([#8792](https://github.com/opensearch-project/OpenSearch/pull/8792)) - Replace the deprecated IndexReader APIs with new storedFields() & termVectors() ([#7792](https://github.com/opensearch-project/OpenSearch/pull/7792)) - [Remote Store] Restrict user override for remote store index level settings ([#8812](https://github.com/opensearch-project/OpenSearch/pull/8812)) +- Removed blocking wait in TransportGetSnapshotsAction which was exhausting generic threadpool ([#8377](https://github.com/opensearch-project/OpenSearch/pull/8377)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index 11ea4112f6e67..dbf47f2b121e3 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -37,8 +37,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.CollectionUtil; import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.SnapshotsInProgress; @@ -138,57 +138,64 @@ protected void clusterManagerOperation( currentSnapshots.add(snapshotInfo); } - final RepositoryData repositoryData; + final StepListener repositoryDataListener = new StepListener<>(); if (isCurrentSnapshotsOnly(request.snapshots()) == false) { - repositoryData = PlainActionFuture.get(fut -> repositoriesService.getRepositoryData(repository, fut)); - for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { - allSnapshotIds.put(snapshotId.getName(), snapshotId); - } + repositoriesService.getRepositoryData(repository, repositoryDataListener); } else { - repositoryData = null; + // Setting repositoryDataListener response to be null if the request has only current snapshot + repositoryDataListener.onResponse(null); } + repositoryDataListener.whenComplete(repositoryData -> { + if (repositoryData != null) { + for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { + allSnapshotIds.put(snapshotId.getName(), snapshotId); + } + } - final Set toResolve = new HashSet<>(); - if (isAllSnapshots(request.snapshots())) { - toResolve.addAll(allSnapshotIds.values()); - } else { - for (String snapshotOrPattern : request.snapshots()) { - if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { - toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList())); - } else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) { - if (allSnapshotIds.containsKey(snapshotOrPattern)) { - toResolve.add(allSnapshotIds.get(snapshotOrPattern)); - } else if (request.ignoreUnavailable() == false) { - throw new SnapshotMissingException(repository, snapshotOrPattern); - } - } else { - for (Map.Entry entry : allSnapshotIds.entrySet()) { - if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { - toResolve.add(entry.getValue()); + final Set toResolve = new HashSet<>(); + if (isAllSnapshots(request.snapshots())) { + toResolve.addAll(allSnapshotIds.values()); + } else { + for (String snapshotOrPattern : request.snapshots()) { + if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { + toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList())); + } else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) { + if (allSnapshotIds.containsKey(snapshotOrPattern)) { + toResolve.add(allSnapshotIds.get(snapshotOrPattern)); + } else if (request.ignoreUnavailable() == false) { + throw new SnapshotMissingException(repository, snapshotOrPattern); + } + } else { + for (Map.Entry entry : allSnapshotIds.entrySet()) { + if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { + toResolve.add(entry.getValue()); + } } } } - } - if (toResolve.isEmpty() && request.ignoreUnavailable() == false && isCurrentSnapshotsOnly(request.snapshots()) == false) { - throw new SnapshotMissingException(repository, request.snapshots()[0]); + if (toResolve.isEmpty() + && request.ignoreUnavailable() == false + && isCurrentSnapshotsOnly(request.snapshots()) == false) { + throw new SnapshotMissingException(repository, request.snapshots()[0]); + } } - } - final List snapshotInfos; - if (request.verbose()) { - snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable()); - } else { - if (repositoryData != null) { - // want non-current snapshots as well, which are found in the repository data - snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots); + final List snapshotInfos; + if (request.verbose()) { + snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable()); } else { - // only want current snapshots - snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList()); - CollectionUtil.timSort(snapshotInfos); + if (repositoryData != null) { + // want non-current snapshots as well, which are found in the repository data + snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots); + } else { + // only want current snapshots + snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList()); + CollectionUtil.timSort(snapshotInfos); + } } - } - listener.onResponse(new GetSnapshotsResponse(snapshotInfos)); + listener.onResponse(new GetSnapshotsResponse(snapshotInfos)); + }, listener::onFailure); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 92781cf160afc..c04e2821d7931 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -57,6 +57,9 @@ import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsAction; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; +import org.opensearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; @@ -743,7 +746,6 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { ); final StepListener createOtherSnapshotResponseStepListener = new StepListener<>(); - continueOrDie( createSnapshotResponseStepListener, createSnapshotResponse -> client().admin() @@ -751,7 +753,6 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { .prepareCreateSnapshot(repoName, "snapshot-2") .execute(createOtherSnapshotResponseStepListener) ); - final StepListener deleteSnapshotStepListener = new StepListener<>(); continueOrDie( @@ -784,7 +785,6 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); // We end up with two snapshots no matter if the delete worked out or not assertThat(snapshotIds, hasSize(2)); - for (SnapshotId snapshotId : snapshotIds) { final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); @@ -794,6 +794,96 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { } } + public void testTransportGetSnapshotsAction() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + final String[] snapshotsList = { "snapshot-1", "snapshot-2" }; + final String[] indexList = { "index-1", "index-2" }; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager( + testClusterNodes.nodes.values().iterator().next().clusterService.state() + ); + + for (int i = 0; i < snapshotsList.length; i++) { + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + final String snapshot = snapshotsList[i]; + final String index = indexList[i]; + continueOrDie( + createRepoAndIndex(repoName, index, shards), + createSnapshotResponse -> client().admin() + .cluster() + .prepareCreateSnapshot(repoName, snapshot) + .setWaitForCompletion(true) + .execute(createSnapshotResponseStepListener) + ); + } + deterministicTaskQueue.runAllRunnableTasks(); + + TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE); + TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; + GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName).snapshots(snapshotsList); + + transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("Snapshot list should not be null", repoSnapshotResponse.getSnapshots()); + assertThat(repoSnapshotResponse.getSnapshots(), hasSize(2)); + List snapshotInfos = repoSnapshotResponse.getSnapshots(); + for (SnapshotInfo snapshotInfo : snapshotInfos) { + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(0, snapshotInfo.failedShards()); + assertTrue(Arrays.stream(snapshotsList).anyMatch(snapshotInfo.snapshotId().getName()::equals)); + } + }, exception -> { throw new AssertionError(exception); })); + } + + public void testTransportGetCurrentSnapshotsAction() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + final String index = "index-1"; + final String[] snapshotsList = { GetSnapshotsRequest.CURRENT_SNAPSHOT }; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager( + testClusterNodes.nodes.values().iterator().next().clusterService.state() + ); + + final StepListener createSnapshotResponseListener = new StepListener<>(); + clusterManagerNode.clusterService.addListener(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.state().custom(SnapshotsInProgress.TYPE) != null) { + TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE); + TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; + GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) + .snapshots(snapshotsList) + .ignoreUnavailable(false) + .verbose(false); + transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("Snapshot list should not be null", repoSnapshotResponse.getSnapshots()); + List snapshotInfos = repoSnapshotResponse.getSnapshots(); + assertThat(repoSnapshotResponse.getSnapshots(), hasSize(snapshotsList.length)); + for (SnapshotInfo snapshotInfo : snapshotInfos) { + assertEquals(SnapshotState.IN_PROGRESS, snapshotInfo.state()); + assertEquals(0, snapshotInfo.failedShards()); + assertTrue(snapshotInfo.snapshotId().getName().contains("last-snapshot")); + } + }, exception -> { throw new AssertionError(exception); })); + clusterManagerNode.clusterService.removeListener(this); + } + } + }); + continueOrDie( + createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin() + .cluster() + .prepareCreateSnapshot(repoName, GetSnapshotsRequest.CURRENT_SNAPSHOT) + .execute(createSnapshotResponseListener) + ); + deterministicTaskQueue.runAllRunnableTasks(); + } + public void testBulkSnapshotDeleteWithAbort() { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); @@ -1756,7 +1846,6 @@ public TestClusterNode currentClusterManager(ClusterState state) { } private final class TestClusterNode { - private final Logger logger = LogManager.getLogger(TestClusterNode.class); private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry( @@ -1802,6 +1891,8 @@ private final class TestClusterNode { private Coordinator coordinator; + private Map actions = new HashMap<>(); + TestClusterNode(DiscoveryNode node) throws IOException { this.node = node; final Environment environment = createEnvironment(node.getName()); @@ -2038,7 +2129,7 @@ public void onFailure(final Exception e) { SegmentReplicationCheckpointPublisher.EMPTY, mock(RemoteRefreshSegmentPressureService.class) ); - Map actions = new HashMap<>(); + final SystemIndices systemIndices = new SystemIndices(emptyMap()); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices); final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService( @@ -2270,6 +2361,17 @@ public void onFailure(final Exception e) { indexNameExpressionResolver ) ); + actions.put( + GetSnapshotsAction.INSTANCE, + new TransportGetSnapshotsAction( + transportService, + clusterService, + threadPool, + repositoriesService, + actionFilters, + indexNameExpressionResolver + ) + ); actions.put( ClusterStateAction.INSTANCE, new TransportClusterStateAction(