From 7078b1be46baef98dade203552ed37c24ef096f0 Mon Sep 17 00:00:00 2001 From: SwethaGuptha <156877431+SwethaGuptha@users.noreply.github.com> Date: Wed, 4 Sep 2024 14:04:13 +0530 Subject: [PATCH] Reset discovery nodes in most transport node actions request. (#15131) Signed-off-by: Swetha Guptha Co-authored-by: Swetha Guptha --- CHANGELOG.md | 1 + .../hotthreads/NodesHotThreadsRequest.java | 2 +- .../cluster/node/info/NodesInfoRequest.java | 2 +- .../cluster/node/stats/NodesStatsRequest.java | 4 +- .../cluster/node/usage/NodesUsageRequest.java | 2 +- .../status/TransportNodesSnapshotsStatus.java | 2 +- .../cluster/stats/ClusterStatsRequest.java | 2 +- .../find/FindDanglingIndexRequest.java | 2 +- .../list/ListDanglingIndicesRequest.java | 4 +- .../action/search/GetAllPitNodesRequest.java | 2 +- .../support/nodes/BaseNodesRequest.java | 16 ++++-- .../support/nodes/TransportNodesAction.java | 12 ++-- .../TransportNodesListGatewayMetaState.java | 2 +- ...ransportNodesListGatewayStartedShards.java | 2 +- ...ortNodesListGatewayStartedShardsBatch.java | 2 +- .../TransportNodesListShardStoreMetadata.java | 2 +- ...sportNodesListShardStoreMetadataBatch.java | 2 +- .../admin/cluster/RestClusterStatsAction.java | 1 - .../admin/cluster/RestNodesInfoAction.java | 1 - .../admin/cluster/RestNodesStatsAction.java | 1 - .../rest/action/cat/RestNodesAction.java | 2 - .../TransportClusterStatsActionTests.java | 37 ------------- .../nodes/TransportNodesActionTests.java | 55 ++++++++++++++++++- .../nodes/TransportNodesInfoActionTests.java | 20 ------- .../nodes/TransportNodesStatsActionTests.java | 24 +------- 25 files changed, 89 insertions(+), 113 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb29199d52465..d283a79c61343 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363)) - Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))[SnapshotV2] Snapshot Status API changes (#15409)) - [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218)) +- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java index 9e52b90f7bd38..b7b60732fcbfc 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java @@ -70,7 +70,7 @@ public NodesHotThreadsRequest(StreamInput in) throws IOException { * threads for all nodes is used. */ public NodesHotThreadsRequest(String... nodesIds) { - super(nodesIds); + super(false, nodesIds); } public int threads() { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java index 1524e284db937..7039267eb1443 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java @@ -88,7 +88,7 @@ public NodesInfoRequest(StreamInput in) throws IOException { * for all nodes will be returned. */ public NodesInfoRequest(String... nodesIds) { - super(nodesIds); + super(false, nodesIds); all(); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 56711b6ec5fb8..ec4d0b82f3f8f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -59,7 +59,7 @@ public class NodesStatsRequest extends BaseNodesRequest { private final Set requestedMetrics = new HashSet<>(); public NodesStatsRequest() { - super((String[]) null); + super(false, (String[]) null); } public NodesStatsRequest(StreamInput in) throws IOException { @@ -90,7 +90,7 @@ public NodesStatsRequest(StreamInput in) throws IOException { * for all nodes will be returned. */ public NodesStatsRequest(String... nodesIds) { - super(nodesIds); + super(false, nodesIds); } /** diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/usage/NodesUsageRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/usage/NodesUsageRequest.java index 56ef366e055ae..f538e76f7dfa2 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/usage/NodesUsageRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/usage/NodesUsageRequest.java @@ -64,7 +64,7 @@ public NodesUsageRequest(StreamInput in) throws IOException { * passed, usage for all nodes will be returned. */ public NodesUsageRequest(String... nodesIds) { - super(nodesIds); + super(false, nodesIds); } /** diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java index 26b8be0f5bb82..d58c021ef0961 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java @@ -161,7 +161,7 @@ public Request(StreamInput in) throws IOException { } public Request(String[] nodesIds) { - super(nodesIds); + super(false, nodesIds); } public Request snapshots(Snapshot[] snapshots) { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsRequest.java index bd75b2210e474..b82a9d256a134 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsRequest.java @@ -62,7 +62,7 @@ public ClusterStatsRequest(StreamInput in) throws IOException { * based on all nodes will be returned. */ public ClusterStatsRequest(String... nodesIds) { - super(nodesIds); + super(false, nodesIds); } public boolean useAggregatedNodeLevelResponses() { diff --git a/server/src/main/java/org/opensearch/action/admin/indices/dangling/find/FindDanglingIndexRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/dangling/find/FindDanglingIndexRequest.java index f853a47b3c2bf..b38cb46b23ace 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/dangling/find/FindDanglingIndexRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/dangling/find/FindDanglingIndexRequest.java @@ -53,7 +53,7 @@ public FindDanglingIndexRequest(StreamInput in) throws IOException { } public FindDanglingIndexRequest(String indexUUID) { - super(Strings.EMPTY_ARRAY); + super(false, Strings.EMPTY_ARRAY); this.indexUUID = indexUUID; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java index 119c4acbf4160..0076c9e294b69 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/dangling/list/ListDanglingIndicesRequest.java @@ -58,12 +58,12 @@ public ListDanglingIndicesRequest(StreamInput in) throws IOException { } public ListDanglingIndicesRequest() { - super(Strings.EMPTY_ARRAY); + super(false, Strings.EMPTY_ARRAY); this.indexUUID = null; } public ListDanglingIndicesRequest(String indexUUID) { - super(Strings.EMPTY_ARRAY); + super(false, Strings.EMPTY_ARRAY); this.indexUUID = indexUUID; } diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java index 336c8139561e9..97c0d66f4b8e1 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java @@ -27,7 +27,7 @@ public class GetAllPitNodesRequest extends BaseNodesRequest * Setting default behavior as `true` but can be explicitly changed in requests that do not require. */ private boolean includeDiscoveryNodes = true; + private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30); private TimeValue timeout; @@ -88,11 +89,22 @@ protected BaseNodesRequest(String... nodesIds) { this.nodesIds = nodesIds; } + protected BaseNodesRequest(boolean includeDiscoveryNodes, String... nodesIds) { + this.nodesIds = nodesIds; + this.includeDiscoveryNodes = includeDiscoveryNodes; + } + protected BaseNodesRequest(DiscoveryNode... concreteNodes) { this.nodesIds = null; this.concreteNodes = concreteNodes; } + protected BaseNodesRequest(boolean includeDiscoveryNodes, DiscoveryNode... concreteNodes) { + this.nodesIds = null; + this.concreteNodes = concreteNodes; + this.includeDiscoveryNodes = includeDiscoveryNodes; + } + public final String[] nodesIds() { return nodesIds; } @@ -127,10 +139,6 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) { this.concreteNodes = concreteNodes; } - public void setIncludeDiscoveryNodes(boolean value) { - includeDiscoveryNodes = value; - } - public boolean getIncludeDiscoveryNodes() { return includeDiscoveryNodes; } diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index 3a356a6bba69a..037427892a550 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -240,18 +240,16 @@ class AsyncAction { } this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); this.concreteNodes = request.concreteNodes(); - if (request.getIncludeDiscoveryNodes() == false) { - // As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we - // remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves + // As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, + // we remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves // the number of concrete nodes in the memory. request.setConcreteNodes(null); } } void start() { - final DiscoveryNode[] nodes = this.concreteNodes; - if (nodes.length == 0) { + if (this.concreteNodes.length == 0) { // nothing to notify threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses))); return; @@ -260,9 +258,9 @@ void start() { if (request.timeout() != null) { builder.withTimeout(request.timeout()); } - for (int i = 0; i < nodes.length; i++) { + for (int i = 0; i < this.concreteNodes.length; i++) { final int idx = i; - final DiscoveryNode node = nodes[i]; + final DiscoveryNode node = this.concreteNodes[i]; final String nodeId = node.getId(); try { TransportRequest nodeRequest = newNodeRequest(request); diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayMetaState.java index bbbc8be534a96..553aa6d9280f3 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayMetaState.java @@ -133,7 +133,7 @@ public Request(StreamInput in) throws IOException { } public Request(String... nodesIds) { - super(nodesIds); + super(false, nodesIds); } } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 939051e962f5e..c9eeed0d4fd0b 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -197,7 +197,7 @@ public Request(StreamInput in) throws IOException { } public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) { - super(nodes); + super(false, nodes); this.shardId = Objects.requireNonNull(shardId); this.customDataPath = Objects.requireNonNull(customDataPath); } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java index 44aa49a8defa6..74c57c1be765d 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java @@ -182,7 +182,7 @@ public Request(StreamInput in) throws IOException { } public Request(DiscoveryNode[] nodes, Map shardAttributes) { - super(nodes); + super(false, nodes); this.shardAttributes = Objects.requireNonNull(shardAttributes); } diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java index 9052aae9b7af1..8d4654b70768b 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -176,7 +176,7 @@ public Request(StreamInput in) throws IOException { } public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) { - super(nodes); + super(false, nodes); this.shardId = Objects.requireNonNull(shardId); this.customDataPath = Objects.requireNonNull(customDataPath); } diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java index d8282182787b5..d10bc25b04104 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java @@ -188,7 +188,7 @@ public Request(StreamInput in) throws IOException { } public Request(Map shardAttributes, DiscoveryNode[] nodes) { - super(nodes); + super(false, nodes); this.shardAttributes = Objects.requireNonNull(shardAttributes); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java index d4426a004af8e..ee33bd18db05d 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java @@ -66,7 +66,6 @@ public String getName() { public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null)); clusterStatsRequest.timeout(request.param("timeout")); - clusterStatsRequest.setIncludeDiscoveryNodes(false); clusterStatsRequest.useAggregatedNodeLevelResponses(true); return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java index 4ac51933ea382..37e4c8783d0df 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java @@ -88,7 +88,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final NodesInfoRequest nodesInfoRequest = prepareRequest(request); nodesInfoRequest.timeout(request.param("timeout")); settingsFilter.addFilterSettingParams(request); - nodesInfoRequest.setIncludeDiscoveryNodes(false); return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java index ed9c0b171aa56..267bfde576dec 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java @@ -232,7 +232,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // If no levels are passed in this results in an empty array. String[] levels = Strings.splitStringByCommaToArray(request.param("level")); nodesStatsRequest.indices().setLevels(levels); - nodesStatsRequest.setIncludeDiscoveryNodes(false); return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index 4d2a3c93ea63e..dcb815b430d27 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -125,7 +125,6 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); nodesInfoRequest.timeout(request.param("timeout")); - nodesInfoRequest.setIncludeDiscoveryNodes(false); nodesInfoRequest.clear() .addMetrics( NodesInfoRequest.Metric.JVM.metricName(), @@ -138,7 +137,6 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) { public void processResponse(final NodesInfoResponse nodesInfoResponse) { NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); nodesStatsRequest.timeout(request.param("timeout")); - nodesStatsRequest.setIncludeDiscoveryNodes(false); nodesStatsRequest.clear() .indices(true) .addMetrics( diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java index f8e14b477b8ef..fc920b847cef9 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java @@ -33,48 +33,12 @@ public class TransportClusterStatsActionTests extends TransportNodesActionTests { - /** - * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This - * behavior is asserted in this test. - */ - public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() { - ClusterStatsRequest request = new ClusterStatsRequest(); - request.setIncludeDiscoveryNodes(true); - Map> combinedSentRequest = performNodesInfoAction(request); - - assertNotNull(combinedSentRequest); - combinedSentRequest.forEach((node, capturedRequestList) -> { - assertNotNull(capturedRequestList); - capturedRequestList.forEach(sentRequest -> { - assertNotNull(sentRequest.getDiscoveryNodes()); - assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize()); - }); - }); - } - - public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDiscoveryNodesList() { - ClusterStatsRequest request = new ClusterStatsRequest(); - Collection discoveryNodes = clusterService.state().getNodes().getNodes().values(); - request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new)); - Map> combinedSentRequest = performNodesInfoAction(request); - - assertNotNull(combinedSentRequest); - combinedSentRequest.forEach((node, capturedRequestList) -> { - assertNotNull(capturedRequestList); - capturedRequestList.forEach(sentRequest -> { - assertNotNull(sentRequest.getDiscoveryNodes()); - assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize()); - }); - }); - } - /** * In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is * asserted in this test. */ public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() { ClusterStatsRequest request = new ClusterStatsRequest(); - request.setIncludeDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); @@ -88,7 +52,6 @@ public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionO ClusterStatsRequest request = new ClusterStatsRequest(); Collection discoveryNodes = clusterService.state().getNodes().getNodes().values(); request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new)); - request.setIncludeDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java index 799c5dd9482b6..c654704979bb9 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java @@ -53,6 +53,7 @@ import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; import org.junit.After; import org.junit.AfterClass; @@ -61,6 +62,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -70,6 +72,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -165,6 +168,36 @@ public void testCustomResolving() throws Exception { assertEquals(clusterService.state().nodes().getDataNodes().size(), capturedRequests.size()); } + public void testTransportNodesActionWithDiscoveryNodesIncluded() { + String[] nodeIds = clusterService.state().nodes().getNodes().keySet().toArray(new String[0]); + TestNodesRequest request = new TestNodesRequest(true, nodeIds); + getTestTransportNodesAction().new AsyncAction(null, request, new PlainActionFuture<>()).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + List capturedTransportNodeRequestList = capturedRequests.values() + .stream() + .flatMap(Collection::stream) + .map(capturedRequest -> (TestNodeRequest) capturedRequest.request) + .collect(Collectors.toList()); + assertEquals(nodeIds.length, capturedTransportNodeRequestList.size()); + capturedTransportNodeRequestList.forEach( + capturedRequest -> assertEquals(nodeIds.length, capturedRequest.testNodesRequest.concreteNodes().length) + ); + } + + public void testTransportNodesActionWithDiscoveryNodesReset() { + String[] nodeIds = clusterService.state().nodes().getNodes().keySet().toArray(new String[0]); + TestNodesRequest request = new TestNodesRequest(false, nodeIds); + getTestTransportNodesAction().new AsyncAction(null, request, new PlainActionFuture<>()).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + List capturedTransportNodeRequestList = capturedRequests.values() + .stream() + .flatMap(Collection::stream) + .map(capturedRequest -> (TestNodeRequest) capturedRequest.request) + .collect(Collectors.toList()); + assertEquals(nodeIds.length, capturedTransportNodeRequestList.size()); + capturedTransportNodeRequestList.forEach(capturedRequest -> assertNull(capturedRequest.testNodesRequest.concreteNodes())); + } + private List mockList(Supplier supplier, int size) { List failures = new ArrayList<>(size); for (int i = 0; i < size; ++i) { @@ -313,7 +346,7 @@ protected TestNodesResponse newResponse( @Override protected TestNodeRequest newNodeRequest(TestNodesRequest request) { - return new TestNodeRequest(); + return new TestNodeRequest(request); } @Override @@ -356,6 +389,10 @@ private static class TestNodesRequest extends BaseNodesRequest TestNodesRequest(String... nodesIds) { super(nodesIds); } + + TestNodesRequest(boolean includeDiscoveryNodes, String... nodesIds) { + super(includeDiscoveryNodes, nodesIds); + } } private static class TestNodesResponse extends BaseNodesResponse { @@ -383,11 +420,25 @@ protected void writeNodesTo(StreamOutput out, List nodes) thro } } - private static class TestNodeRequest extends BaseNodeRequest { + private static class TestNodeRequest extends TransportRequest { + + protected TestNodesRequest testNodesRequest; + TestNodeRequest() {} + TestNodeRequest(TestNodesRequest testNodesRequest) { + this.testNodesRequest = testNodesRequest; + } + TestNodeRequest(StreamInput in) throws IOException { super(in); + testNodesRequest = new TestNodesRequest(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + testNodesRequest.writeTo(out); } } diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java index e9e09d0dbbbf9..8277dcd363a8d 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java @@ -31,32 +31,12 @@ public class TransportNodesInfoActionTests extends TransportNodesActionTests { - /** - * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This - * behavior is asserted in this test. - */ - public void testNodesInfoActionWithRetentionOfDiscoveryNodesList() { - NodesInfoRequest request = new NodesInfoRequest(); - request.setIncludeDiscoveryNodes(true); - Map> combinedSentRequest = performNodesInfoAction(request); - - assertNotNull(combinedSentRequest); - combinedSentRequest.forEach((node, capturedRequestList) -> { - assertNotNull(capturedRequestList); - capturedRequestList.forEach(sentRequest -> { - assertNotNull(sentRequest.getDiscoveryNodes()); - assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize()); - }); - }); - } - /** * In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is * asserted in this test. */ public void testNodesInfoActionWithoutRetentionOfDiscoveryNodesList() { NodesInfoRequest request = new NodesInfoRequest(); - request.setIncludeDiscoveryNodes(false); Map> combinedSentRequest = performNodesInfoAction(request); assertNotNull(combinedSentRequest); diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java index c7c420e353e1a..5e74dcdbc4953 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java @@ -31,31 +31,11 @@ public class TransportNodesStatsActionTests extends TransportNodesActionTests { /** - * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This - * behavior is asserted in this test. - */ - public void testNodesStatsActionWithRetentionOfDiscoveryNodesList() { - NodesStatsRequest request = new NodesStatsRequest(); - request.setIncludeDiscoveryNodes(true); - Map> combinedSentRequest = performNodesStatsAction(request); - - assertNotNull(combinedSentRequest); - combinedSentRequest.forEach((node, capturedRequestList) -> { - assertNotNull(capturedRequestList); - capturedRequestList.forEach(sentRequest -> { - assertNotNull(sentRequest.getDiscoveryNodes()); - assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize()); - }); - }); - } - - /** - * By default, we send discovery nodes list to each request that is sent across from the coordinator node. This - * behavior is asserted in this test. + * We don't want to send discovery nodes list to each request that is sent across from the coordinator node. + * This behavior is asserted in this test. */ public void testNodesStatsActionWithoutRetentionOfDiscoveryNodesList() { NodesStatsRequest request = new NodesStatsRequest(); - request.setIncludeDiscoveryNodes(false); Map> combinedSentRequest = performNodesStatsAction(request); assertNotNull(combinedSentRequest);