diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/features/TransportNodesFeaturesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/features/TransportNodesFeaturesAction.java index 83d1356e5ef62..d20eee96809e8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/features/TransportNodesFeaturesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/features/TransportNodesFeaturesAction.java @@ -16,7 +16,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.core.UpdateForV9; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.features.FeatureService; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; @@ -27,8 +27,7 @@ import java.io.IOException; import java.util.List; -@UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) -// @UpdateForV10 // this can be removed in v10. It may be called by v8 nodes to v9 nodes. +@UpdateForV10(owner = UpdateForV10.Owner.CORE_INFRA) // this can be removed in v10. It may be called by v8 nodes to v9 nodes. 
public class TransportNodesFeaturesAction extends TransportNodesAction< NodesFeaturesRequest, NodesFeaturesResponse, diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index c051f0ca7a6f5..09fb70fb06ba4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -299,7 +299,7 @@ protected void performPhaseOnShard(final int shardIndex, final SearchShardIterat } private void doPerformPhaseOnShard(int shardIndex, SearchShardIterator shardIt, SearchShardTarget shard, Releasable releasable) { - executePhaseOnShard(shardIt, shard, new SearchActionListener<>(shard, shardIndex) { + var shardListener = new SearchActionListener(shard, shardIndex) { @Override public void innerOnResponse(Result result) { try { @@ -315,7 +315,15 @@ public void onFailure(Exception e) { releasable.close(); onShardFailure(shardIndex, shard, shardIt, e); } - }); + }; + final Transport.Connection connection; + try { + connection = getConnection(shard.getClusterAlias(), shard.getNodeId()); + } catch (Exception e) { + shardListener.onFailure(e); + return; + } + executePhaseOnShard(shardIt, connection, shardListener); } private void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) { @@ -327,12 +335,12 @@ private void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) { /** * Sends the request to the actual shard. 
* @param shardIt the shards iterator - * @param shard the shard routing to send the request for + * @param connection the connection to the node that the shard is located on * @param listener the listener to notify on response */ protected abstract void executePhaseOnShard( SearchShardIterator shardIt, - SearchShardTarget shard, + Transport.Connection connection, SearchActionListener listener ); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 69ca1569a7c07..25d59a06664da 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -84,16 +84,9 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction @Override protected void executePhaseOnShard( final SearchShardIterator shardIt, - final SearchShardTarget shard, + final Transport.Connection connection, final SearchActionListener listener ) { - final Transport.Connection connection; - try { - connection = getConnection(shard.getClusterAlias(), shard.getNodeId()); - } catch (Exception e) { - listener.onFailure(e); - return; - } getSearchTransport().sendExecuteDfs(connection, buildShardSearchRequest(shardIt, listener.requestIndex), getTask(), listener); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java index d91ea85e2fa97..986f7210c0d1b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java @@ -79,7 +79,7 @@ protected static void doCheckNoMissingShards( /** * Releases shard targets that are not used in the docsIdsToLoad. 
*/ - protected void releaseIrrelevantSearchContext(SearchPhaseResult searchPhaseResult, AbstractSearchAsyncAction context) { + protected static void releaseIrrelevantSearchContext(SearchPhaseResult searchPhaseResult, AbstractSearchAsyncAction context) { // we only release search context that we did not fetch from, if we are not scrolling // or using a PIT and if it has at least one hit that didn't make it to the global topDocs if (searchPhaseResult == null) { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 84e0e2adea612..f75b84abc2f0f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -91,16 +91,9 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener ) { - final Transport.Connection connection; - try { - connection = getConnection(shard.getClusterAlias(), shard.getNodeId()); - } catch (Exception e) { - listener.onFailure(e); - return; - } ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt, listener.requestIndex)); getSearchTransport().sendExecuteQuery(connection, request, getTask(), listener); } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 7ba4a7ce59869..9e60eedbad6a2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -35,7 +35,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; -import 
org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; @@ -252,16 +251,9 @@ protected String missingShardsErrorMessage(StringBuilder missingShards) { @Override protected void executePhaseOnShard( SearchShardIterator shardIt, - SearchShardTarget shard, + Transport.Connection connection, SearchActionListener phaseListener ) { - final Transport.Connection connection; - try { - connection = connectionLookup.apply(shardIt.getClusterAlias(), shard.getNodeId()); - } catch (Exception e) { - phaseListener.onFailure(e); - return; - } transportService.sendChildRequest( connection, OPEN_SHARD_READER_CONTEXT_NAME, diff --git a/server/src/main/java/org/elasticsearch/cluster/features/NodeFeaturesFixupListener.java b/server/src/main/java/org/elasticsearch/cluster/features/NodeFeaturesFixupListener.java deleted file mode 100644 index 4d9074be15695..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/features/NodeFeaturesFixupListener.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". 
- */ - -package org.elasticsearch.cluster.features; - -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.admin.cluster.node.features.NodeFeatures; -import org.elasticsearch.action.admin.cluster.node.features.NodesFeaturesRequest; -import org.elasticsearch.action.admin.cluster.node.features.NodesFeaturesResponse; -import org.elasticsearch.action.admin.cluster.node.features.TransportNodesFeaturesAction; -import org.elasticsearch.client.internal.ClusterAdminClient; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterFeatures; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateTaskExecutor; -import org.elasticsearch.cluster.ClusterStateTaskListener; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.cluster.service.MasterServiceTaskQueue; -import org.elasticsearch.common.Priority; -import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.UpdateForV9; -import org.elasticsearch.logging.LogManager; -import org.elasticsearch.logging.Logger; -import org.elasticsearch.threadpool.Scheduler; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.stream.Collectors; - -@UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // this can be removed in v9 -public class NodeFeaturesFixupListener implements ClusterStateListener { - - private static final Logger logger = LogManager.getLogger(NodeFeaturesFixupListener.class); - - private static final TimeValue RETRY_TIME = 
TimeValue.timeValueSeconds(30); - - private final MasterServiceTaskQueue taskQueue; - private final ClusterAdminClient client; - private final Scheduler scheduler; - private final Executor executor; - private final Set pendingNodes = Collections.synchronizedSet(new HashSet<>()); - - public NodeFeaturesFixupListener(ClusterService service, ClusterAdminClient client, ThreadPool threadPool) { - // there tends to be a lot of state operations on an upgrade - this one is not time-critical, - // so use LOW priority. It just needs to be run at some point after upgrade. - this( - service.createTaskQueue("fix-node-features", Priority.LOW, new NodesFeaturesUpdater()), - client, - threadPool, - threadPool.executor(ThreadPool.Names.CLUSTER_COORDINATION) - ); - } - - NodeFeaturesFixupListener( - MasterServiceTaskQueue taskQueue, - ClusterAdminClient client, - Scheduler scheduler, - Executor executor - ) { - this.taskQueue = taskQueue; - this.client = client; - this.scheduler = scheduler; - this.executor = executor; - } - - class NodesFeaturesTask implements ClusterStateTaskListener { - private final Map> results; - private final int retryNum; - - NodesFeaturesTask(Map> results, int retryNum) { - this.results = results; - this.retryNum = retryNum; - } - - @Override - public void onFailure(Exception e) { - logger.error("Could not apply features for nodes {} to cluster state", results.keySet(), e); - scheduleRetry(results.keySet(), retryNum); - } - - public Map> results() { - return results; - } - } - - static class NodesFeaturesUpdater implements ClusterStateTaskExecutor { - @Override - public ClusterState execute(BatchExecutionContext context) { - ClusterState.Builder builder = ClusterState.builder(context.initialState()); - var existingFeatures = builder.nodeFeatures(); - - boolean modified = false; - for (var c : context.taskContexts()) { - for (var e : c.getTask().results().entrySet()) { - // double check there are still no features for the node - if 
(existingFeatures.getOrDefault(e.getKey(), Set.of()).isEmpty()) { - builder.putNodeFeatures(e.getKey(), e.getValue()); - modified = true; - } - } - c.success(() -> {}); - } - return modified ? builder.build() : context.initialState(); - } - } - - @Override - public void clusterChanged(ClusterChangedEvent event) { - if (event.nodesDelta().masterNodeChanged() && event.localNodeMaster()) { - /* - * Execute this if we have just become master. - * Check if there are any nodes that should have features in cluster state, but don't. - * This can happen if the master was upgraded from before 8.13, and one or more non-master nodes - * were already upgraded. They don't re-join the cluster with the new master, so never get their features - * (which the master now understands) added to cluster state. - * So we need to do a separate transport call to get the node features and add them to cluster state. - * We can't use features to determine when this should happen, as the features are incorrect. - * We also can't use transport version, as that is unreliable for upgrades - * from versions before 8.8 (see TransportVersionFixupListener). - * So the only thing we can use is release version. - * This is ok here, as Serverless will never hit this case, so the node feature fetch action will never be called on Serverless. - * This whole class will be removed in ES v9. 
- */ - ClusterFeatures nodeFeatures = event.state().clusterFeatures(); - Set queryNodes = event.state() - .nodes() - .stream() - .filter(n -> n.getVersion().onOrAfter(Version.V_8_15_0)) - .map(DiscoveryNode::getId) - .filter(n -> getNodeFeatures(nodeFeatures, n).isEmpty()) - .collect(Collectors.toSet()); - - if (queryNodes.isEmpty() == false) { - logger.debug("Fetching actual node features for nodes {}", queryNodes); - queryNodesFeatures(queryNodes, 0); - } - } - } - - @SuppressForbidden(reason = "Need to access a specific node's features") - private static Set getNodeFeatures(ClusterFeatures features, String nodeId) { - return features.nodeFeatures().getOrDefault(nodeId, Set.of()); - } - - private void scheduleRetry(Set nodes, int thisRetryNum) { - // just keep retrying until this succeeds - logger.debug("Scheduling retry {} for nodes {}", thisRetryNum + 1, nodes); - scheduler.schedule(() -> queryNodesFeatures(nodes, thisRetryNum + 1), RETRY_TIME, executor); - } - - private void queryNodesFeatures(Set nodes, int retryNum) { - // some might already be in-progress - Set outstandingNodes = Sets.newHashSetWithExpectedSize(nodes.size()); - synchronized (pendingNodes) { - for (String n : nodes) { - if (pendingNodes.add(n)) { - outstandingNodes.add(n); - } - } - } - if (outstandingNodes.isEmpty()) { - // all nodes already have in-progress requests - return; - } - - NodesFeaturesRequest request = new NodesFeaturesRequest(outstandingNodes.toArray(String[]::new)); - client.execute(TransportNodesFeaturesAction.TYPE, request, new ActionListener<>() { - @Override - public void onResponse(NodesFeaturesResponse response) { - pendingNodes.removeAll(outstandingNodes); - handleResponse(response, retryNum); - } - - @Override - public void onFailure(Exception e) { - pendingNodes.removeAll(outstandingNodes); - logger.warn("Could not read features for nodes {}", outstandingNodes, e); - scheduleRetry(outstandingNodes, retryNum); - } - }); - } - - private void 
handleResponse(NodesFeaturesResponse response, int retryNum) { - if (response.hasFailures()) { - Set failedNodes = new HashSet<>(); - for (FailedNodeException fne : response.failures()) { - logger.warn("Failed to read features from node {}", fne.nodeId(), fne); - failedNodes.add(fne.nodeId()); - } - scheduleRetry(failedNodes, retryNum); - } - // carry on and read what we can - - Map> results = response.getNodes() - .stream() - .collect(Collectors.toUnmodifiableMap(n -> n.getNode().getId(), NodeFeatures::nodeFeatures)); - - if (results.isEmpty() == false) { - taskQueue.submitTask("fix-node-features", new NodesFeaturesTask(results, retryNum), null); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index c2471a9a6bb2f..caf65c05cf27d 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -42,7 +42,6 @@ import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.MasterHistoryService; import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService; -import org.elasticsearch.cluster.features.NodeFeaturesFixupListener; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings; import org.elasticsearch.cluster.metadata.IndexMetadataVerifier; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -787,7 +786,6 @@ private void construct( if (DiscoveryNode.isMasterNode(settings)) { clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client)); - clusterService.addListener(new NodeFeaturesFixupListener(clusterService, client.admin().cluster(), threadPool)); } SourceFieldMetrics sourceFieldMetrics = new SourceFieldMetrics( diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java 
b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index f8ecdbd062054..725a4583d104a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -101,7 +101,7 @@ protected SearchPhase getNextPhase() { @Override protected void executePhaseOnShard( final SearchShardIterator shardIt, - final SearchShardTarget shard, + final Transport.Connection shard, final SearchActionListener listener ) {} diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 03c5d0a06f6fb..484b3c6b386fd 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -147,7 +147,7 @@ public void executeNextPhase(SearchPhase currentPhase, Supplier nex @Override protected void executePhaseOnShard( SearchShardIterator shardIt, - SearchShardTarget shard, + Transport.Connection shard, SearchActionListener listener ) { onShardResult(new SearchPhaseResult() { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index f655136cd4ba4..b4ddd48172d01 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; -import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; import 
org.elasticsearch.test.ESTestCase; @@ -119,16 +118,15 @@ public void testSkipSearchShards() throws InterruptedException { @Override protected void executePhaseOnShard( SearchShardIterator shardIt, - SearchShardTarget shard, + Transport.Connection connection, SearchActionListener listener ) { - seenShard.computeIfAbsent(shard.getShardId(), (i) -> { + seenShard.computeIfAbsent(shardIt.shardId(), (i) -> { numRequests.incrementAndGet(); // only count this once per replica return Boolean.TRUE; }); new Thread(() -> { - Transport.Connection connection = getConnection(null, shard.getNodeId()); TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult( new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode() @@ -227,23 +225,22 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { @Override protected void executePhaseOnShard( SearchShardIterator shardIt, - SearchShardTarget shard, + Transport.Connection connection, SearchActionListener listener ) { - seenShard.computeIfAbsent(shard.getShardId(), (i) -> { + seenShard.computeIfAbsent(shardIt.shardId(), (i) -> { numRequests.incrementAndGet(); // only count this once per shard copy return Boolean.TRUE; }); new Thread(() -> { safeAwait(awaitInitialRequests); - Transport.Connection connection = getConnection(null, shard.getNodeId()); TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult( new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode() ); try { - if (shardFailures[shard.getShardId().id()]) { + if (shardFailures[shardIt.shardId().id()]) { listener.onFailure(new RuntimeException()); } else { listener.onResponse(testSearchPhaseResult); @@ -340,11 +337,11 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI @Override protected void executePhaseOnShard( SearchShardIterator shardIt, - SearchShardTarget shard, + Transport.Connection 
connection, SearchActionListener listener ) { - assertTrue("shard: " + shard.getShardId() + " has been queried twice", testResponse.queried.add(shard.getShardId())); - Transport.Connection connection = getConnection(null, shard.getNodeId()); + var shardId = shardIt.shardId(); + assertTrue("shard: " + shardId + " has been queried twice", testResponse.queried.add(shardId)); TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult( new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode() @@ -464,13 +461,13 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI @Override protected void executePhaseOnShard( SearchShardIterator shardIt, - SearchShardTarget shard, + Transport.Connection connection, SearchActionListener listener ) { - assertTrue("shard: " + shard.getShardId() + " has been queried twice", response.queried.add(shard.getShardId())); - Transport.Connection connection = getConnection(null, shard.getNodeId()); + var shardId = shardIt.shardId(); + assertTrue("shard: " + shardId + " has been queried twice", response.queried.add(shardId)); final TestSearchPhaseResult testSearchPhaseResult; - if (shard.getShardId().id() == 0) { + if (shardId.id() == 0) { testSearchPhaseResult = new TestSearchPhaseResult(null, connection.getNode()); } else { testSearchPhaseResult = new TestSearchPhaseResult( @@ -573,15 +570,14 @@ public void testAllowPartialResults() throws InterruptedException { @Override protected void executePhaseOnShard( SearchShardIterator shardIt, - SearchShardTarget shard, + Transport.Connection connection, SearchActionListener listener ) { - seenShard.computeIfAbsent(shard.getShardId(), (i) -> { + seenShard.computeIfAbsent(shardIt.shardId(), (i) -> { numRequests.incrementAndGet(); // only count this once per shard copy return Boolean.TRUE; }); new Thread(() -> { - Transport.Connection connection = getConnection(null, shard.getNodeId()); TestSearchPhaseResult 
testSearchPhaseResult = new TestSearchPhaseResult( new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode() @@ -673,7 +669,7 @@ public void testSkipUnavailableSearchShards() throws InterruptedException { @Override protected void executePhaseOnShard( SearchShardIterator shardIt, - SearchShardTarget shard, + Transport.Connection connection, SearchActionListener listener ) { assert false : "Expected to skip all shards"; diff --git a/server/src/test/java/org/elasticsearch/cluster/features/NodeFeaturesFixupListenerTests.java b/server/src/test/java/org/elasticsearch/cluster/features/NodeFeaturesFixupListenerTests.java deleted file mode 100644 index 00cfac7248da6..0000000000000 --- a/server/src/test/java/org/elasticsearch/cluster/features/NodeFeaturesFixupListenerTests.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". 
- */ - -package org.elasticsearch.cluster.features; - -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.node.features.NodeFeatures; -import org.elasticsearch.action.admin.cluster.node.features.NodesFeaturesRequest; -import org.elasticsearch.action.admin.cluster.node.features.NodesFeaturesResponse; -import org.elasticsearch.action.admin.cluster.node.features.TransportNodesFeaturesAction; -import org.elasticsearch.client.internal.ClusterAdminClient; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.features.NodeFeaturesFixupListener.NodesFeaturesTask; -import org.elasticsearch.cluster.features.NodeFeaturesFixupListener.NodesFeaturesUpdater; -import org.elasticsearch.cluster.node.DiscoveryNodeUtils; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.node.VersionInformation; -import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; -import org.elasticsearch.cluster.service.MasterServiceTaskQueue; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.Scheduler; -import org.mockito.ArgumentCaptor; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executor; - -import static org.elasticsearch.test.LambdaMatchers.transformedMatch; -import static org.hamcrest.Matchers.arrayContainingInAnyOrder; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.mock; -import static 
org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.hamcrest.MockitoHamcrest.argThat; - -public class NodeFeaturesFixupListenerTests extends ESTestCase { - - @SuppressWarnings("unchecked") - private static MasterServiceTaskQueue newMockTaskQueue() { - return mock(MasterServiceTaskQueue.class); - } - - private static DiscoveryNodes nodes(Version... versions) { - var builder = DiscoveryNodes.builder(); - for (int i = 0; i < versions.length; i++) { - builder.add(DiscoveryNodeUtils.create("node" + i, new TransportAddress(TransportAddress.META_ADDRESS, 9200 + i), versions[i])); - } - builder.localNodeId("node0").masterNodeId("node0"); - return builder.build(); - } - - private static DiscoveryNodes nodes(VersionInformation... versions) { - var builder = DiscoveryNodes.builder(); - for (int i = 0; i < versions.length; i++) { - builder.add( - DiscoveryNodeUtils.builder("node" + i) - .address(new TransportAddress(TransportAddress.META_ADDRESS, 9200 + i)) - .version(versions[i]) - .build() - ); - } - builder.localNodeId("node0").masterNodeId("node0"); - return builder.build(); - } - - @SafeVarargs - private static Map> features(Set... 
nodeFeatures) { - Map> features = new HashMap<>(); - for (int i = 0; i < nodeFeatures.length; i++) { - features.put("node" + i, nodeFeatures[i]); - } - return features; - } - - private static NodesFeaturesResponse getResponse(Map> responseData) { - return new NodesFeaturesResponse( - ClusterName.DEFAULT, - responseData.entrySet() - .stream() - .map( - e -> new NodeFeatures( - e.getValue(), - DiscoveryNodeUtils.create(e.getKey(), new TransportAddress(TransportAddress.META_ADDRESS, 9200)) - ) - ) - .toList(), - List.of() - ); - } - - public void testNothingDoneWhenNothingToFix() { - MasterServiceTaskQueue taskQueue = newMockTaskQueue(); - ClusterAdminClient client = mock(ClusterAdminClient.class); - - ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE) - .nodes(nodes(Version.CURRENT, Version.CURRENT)) - .nodeFeatures(features(Set.of("f1", "f2"), Set.of("f1", "f2"))) - .build(); - - NodeFeaturesFixupListener listener = new NodeFeaturesFixupListener(taskQueue, client, null, null); - listener.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE)); - - verify(taskQueue, never()).submitTask(anyString(), any(), any()); - } - - public void testFeaturesFixedAfterNewMaster() throws Exception { - MasterServiceTaskQueue taskQueue = newMockTaskQueue(); - ClusterAdminClient client = mock(ClusterAdminClient.class); - Set features = Set.of("f1", "f2"); - - ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE) - .nodes(nodes(Version.CURRENT, Version.CURRENT, Version.CURRENT)) - .nodeFeatures(features(features, Set.of(), Set.of())) - .build(); - - ArgumentCaptor> action = ArgumentCaptor.captor(); - ArgumentCaptor task = ArgumentCaptor.captor(); - - NodeFeaturesFixupListener listener = new NodeFeaturesFixupListener(taskQueue, client, null, null); - listener.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE)); - verify(client).execute( - eq(TransportNodesFeaturesAction.TYPE), - 
argThat(transformedMatch(NodesFeaturesRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))), - action.capture() - ); - - action.getValue().onResponse(getResponse(Map.of("node1", features, "node2", features))); - verify(taskQueue).submitTask(anyString(), task.capture(), any()); - - ClusterState newState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( - testState, - new NodesFeaturesUpdater(), - List.of(task.getValue()) - ); - - assertThat(newState.clusterFeatures().allNodeFeatures(), containsInAnyOrder("f1", "f2")); - } - - public void testFeaturesFetchedOnlyForUpdatedNodes() { - MasterServiceTaskQueue taskQueue = newMockTaskQueue(); - ClusterAdminClient client = mock(ClusterAdminClient.class); - - ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE) - .nodes( - nodes( - VersionInformation.CURRENT, - VersionInformation.CURRENT, - new VersionInformation(Version.V_8_12_0, IndexVersion.current(), IndexVersion.current()) - ) - ) - .nodeFeatures(features(Set.of("f1", "f2"), Set.of(), Set.of())) - .build(); - - ArgumentCaptor> action = ArgumentCaptor.captor(); - - NodeFeaturesFixupListener listener = new NodeFeaturesFixupListener(taskQueue, client, null, null); - listener.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE)); - verify(client).execute( - eq(TransportNodesFeaturesAction.TYPE), - argThat(transformedMatch(NodesFeaturesRequest::nodesIds, arrayContainingInAnyOrder("node1"))), - action.capture() - ); - } - - public void testConcurrentChangesDoNotOverlap() { - MasterServiceTaskQueue taskQueue = newMockTaskQueue(); - ClusterAdminClient client = mock(ClusterAdminClient.class); - Set features = Set.of("f1", "f2"); - - ClusterState testState1 = ClusterState.builder(ClusterState.EMPTY_STATE) - .nodes(nodes(Version.CURRENT, Version.CURRENT, Version.CURRENT)) - .nodeFeatures(features(features, Set.of(), Set.of())) - .build(); - - NodeFeaturesFixupListener listeners = new 
NodeFeaturesFixupListener(taskQueue, client, null, null); - listeners.clusterChanged(new ClusterChangedEvent("test", testState1, ClusterState.EMPTY_STATE)); - verify(client).execute( - eq(TransportNodesFeaturesAction.TYPE), - argThat(transformedMatch(NodesFeaturesRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))), - any() - ); - // don't send back the response yet - - ClusterState testState2 = ClusterState.builder(ClusterState.EMPTY_STATE) - .nodes(nodes(Version.CURRENT, Version.CURRENT, Version.CURRENT)) - .nodeFeatures(features(features, features, Set.of())) - .build(); - // should not send any requests - listeners.clusterChanged(new ClusterChangedEvent("test", testState2, testState1)); - verifyNoMoreInteractions(client); - } - - public void testFailedRequestsAreRetried() { - MasterServiceTaskQueue taskQueue = newMockTaskQueue(); - ClusterAdminClient client = mock(ClusterAdminClient.class); - Scheduler scheduler = mock(Scheduler.class); - Executor executor = mock(Executor.class); - Set features = Set.of("f1", "f2"); - - ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE) - .nodes(nodes(Version.CURRENT, Version.CURRENT, Version.CURRENT)) - .nodeFeatures(features(features, Set.of(), Set.of())) - .build(); - - ArgumentCaptor> action = ArgumentCaptor.captor(); - ArgumentCaptor retry = ArgumentCaptor.forClass(Runnable.class); - - NodeFeaturesFixupListener listener = new NodeFeaturesFixupListener(taskQueue, client, scheduler, executor); - listener.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE)); - verify(client).execute( - eq(TransportNodesFeaturesAction.TYPE), - argThat(transformedMatch(NodesFeaturesRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))), - action.capture() - ); - - action.getValue().onFailure(new RuntimeException("failure")); - verify(scheduler).schedule(retry.capture(), any(), same(executor)); - - // running the retry should cause another call - retry.getValue().run(); - 
verify(client, times(2)).execute( - eq(TransportNodesFeaturesAction.TYPE), - argThat(transformedMatch(NodesFeaturesRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))), - action.capture() - ); - } -}