From d0295d6710c5e91d391574d020a86d2e69f057b5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 26 Feb 2022 10:58:51 -0500 Subject: [PATCH] Group field-caps node requests by index mapping hash --- .../search/fieldcaps/FieldCapabilitiesIT.java | 52 +- .../action/fieldcaps/RequestDispatcher.java | 165 +++--- .../fieldcaps/RequestDispatcherTests.java | 468 +++++++++++++----- 3 files changed, 494 insertions(+), 191 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java index a52940ae9a413..978d3b5341c05 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java @@ -56,8 +56,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.IntStream; import static java.util.Collections.singletonList; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -65,6 +67,7 @@ import static org.hamcrest.Matchers.array; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; @@ -470,10 +473,18 @@ public void testNoActiveCopy() throws Exception { .put("index.routing.allocation.require._id", "unknown") ).setWaitForActiveShards(ActiveShardCount.NONE).setMapping("timestamp", "type=date", "field1", "type=keyword") ); + assertAcked( + prepareCreate("log-index-another").setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.require._id", "unknown") + ).setWaitForActiveShards(ActiveShardCount.NONE).setMapping("timestamp", "type=date", "another_field", "type=keyword") + ); { final ElasticsearchException ex = expectThrows( ElasticsearchException.class, - () -> client().prepareFieldCaps("log-index-*").setFields("*").get() + () -> client().prepareFieldCaps("log-index-in*").setFields("*").get() ); assertThat(ex.getMessage(), equalTo("index [log-index-inactive] has no active shard copy")); } @@ -486,15 +497,48 @@ public void testNoActiveCopy() throws Exception { request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01")); } final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet(); - assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2")); + // log-index-inactive can be resolved by either log-index-1 + assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2", "log-index-inactive")); assertThat(response.getField("field1"), aMapWithSize(2)); assertThat(response.getField("field1"), hasKey("long")); assertThat(response.getField("field1"), hasKey("long")); assertThat(response.getFailures(), hasSize(1)); final FieldCapabilitiesFailure failure = response.getFailures().get(0); - assertThat(failure.getIndices(), arrayContainingInAnyOrder("log-index-inactive")); - assertThat(failure.getException().getMessage(), equalTo("index [log-index-inactive] has no active shard copy")); + assertThat(failure.getIndices(), arrayContainingInAnyOrder("log-index-another")); + assertThat(failure.getException().getMessage(), equalTo("index [log-index-another] has no active shard copy")); + } + } + + public void testSingleNodeRequest() { + String[] indices = IntStream.range(0, randomIntBetween(1, 5)).mapToObj(n -> "event_index_" + n).toArray(String[]::new); + for (String index : indices) { + assertAcked(prepareCreate(index).setMapping("timestamp", "type=date", "message", "type=text")); + } + FieldCapabilitiesRequest fieldCapRequest = new FieldCapabilitiesRequest(); + fieldCapRequest.indices("event_index_*"); + fieldCapRequest.fields("*"); + + AtomicInteger receivedRequests = new AtomicInteger(); + for (String node : internalCluster().getNodeNames()) { + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node); + transportService.addRequestHandlingBehavior( + TransportFieldCapabilitiesAction.ACTION_NODE_NAME, + (handler, request, channel, task) -> { + receivedRequests.incrementAndGet(); + handler.messageReceived(request, channel, task); + } + ); + } + final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, fieldCapRequest).actionGet(); + assertThat(response.getIndices(), equalTo(indices)); + assertThat(response.getField("message"), aMapWithSize(1)); + assertThat(response.getField("message"), hasKey("text")); + assertThat(response.getFailures(), empty()); + assertThat(receivedRequests.get(), equalTo(1)); + for (String node : internalCluster().getNodeNames()) { + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node); + transportService.clearAllRules(); } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java index f9d5cff2471b4..fa461b32a45c5 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; @@ -34,11 +35,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -56,7 +59,7 @@ final class RequestDispatcher { private final OriginalIndices originalIndices; private final long nowInMillis; - private final boolean hasFilter; + private final boolean withIndexFilter; private final Executor executor; private final Consumer onIndexResponse; private final BiConsumer onIndexFailure; @@ -64,7 +67,7 @@ final class RequestDispatcher { private final AtomicInteger pendingRequests = new AtomicInteger(); private final AtomicInteger executionRound = new AtomicInteger(); - private final Map indexSelectors; + private final Map groups = ConcurrentCollections.newConcurrentMap(); RequestDispatcher( ClusterService clusterService, @@ -85,20 +88,25 @@ final class RequestDispatcher { this.originalIndices = originalIndices; this.nowInMillis = nowInMillis; this.clusterState = clusterService.state(); - this.hasFilter = fieldCapsRequest.indexFilter() != null && fieldCapsRequest.indexFilter() instanceof MatchAllQueryBuilder == false; + this.withIndexFilter = fieldCapsRequest.indexFilter() != null + && fieldCapsRequest.indexFilter() instanceof MatchAllQueryBuilder == false; this.executor = executor; this.onIndexResponse = onIndexResponse; this.onIndexFailure = onIndexFailure; this.onComplete = new RunOnce(onComplete); - this.indexSelectors = ConcurrentCollections.newConcurrentMap(); - for (String index : indices) { - final GroupShardsIterator shardIts = clusterService.operationRouting() - .searchShards(clusterState, new String[] { index }, null, null, null, null); - final IndexSelector indexResult = new IndexSelector(shardIts); - if (indexResult.nodeToShards.isEmpty()) { - onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy")); + final List groupedIndices = groupIndicesByMappingHash(clusterService, clusterState, withIndexFilter, indices); + for (Group group : groupedIndices) { + if (group.nodeToShards.isEmpty()) { + for (String index : group.indices) { + onIndexFailure.accept( + index, + new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy") + ); + } } else { - this.indexSelectors.put(index, indexResult); + for (String index : group.indices) { + this.groups.put(index, group); + } } } } @@ -108,11 +116,10 @@ void execute() { @Override public void onFailure(Exception e) { // If we get rejected, mark pending indices as failed and complete - final List failedIndices = new ArrayList<>(indexSelectors.keySet()); - for (String failedIndex : failedIndices) { - final IndexSelector removed = indexSelectors.remove(failedIndex); - assert removed != null; - onIndexFailure.accept(failedIndex, e); + for (Group g : groups.values()) { + if (g.completed.compareAndSet(false, true)) { + g.indices.forEach(index -> onIndexFailure.accept(index, e)); + } } onComplete.run(); } @@ -127,31 +134,27 @@ protected void doRun() { private void innerExecute() { final Map> nodeToSelectedShards = new HashMap<>(); assert pendingRequests.get() == 0 : "pending requests = " + pendingRequests; - final List failedIndices = new ArrayList<>(); - for (Map.Entry e : indexSelectors.entrySet()) { - final String index = e.getKey(); - final IndexSelector indexSelector = e.getValue(); - final List selectedShards = indexSelector.nextTarget(hasFilter); - if (selectedShards.isEmpty()) { - failedIndices.add(index); - } else { - pendingRequests.addAndGet(selectedShards.size()); - for (ShardRouting shard : selectedShards) { - nodeToSelectedShards.computeIfAbsent(shard.currentNodeId(), n -> new ArrayList<>()).add(shard.shardId()); + groups.values().removeIf(g -> g.completed.get()); + final Set visited = Collections.newSetFromMap(new IdentityHashMap<>()); + for (Group group : groups.values()) { + if (visited.add(group)) { + final List selectedShards = group.nextTarget(withIndexFilter); + if (selectedShards.isEmpty()) { + if (group.completed.compareAndSet(false, true)) { + group.getFailures().forEach(onIndexFailure); + } + } else { + for (ShardRouting shard : selectedShards) { + nodeToSelectedShards.computeIfAbsent(shard.currentNodeId(), n -> new ArrayList<>()).add(shard.shardId()); + } } } } - for (String failedIndex : failedIndices) { - final IndexSelector indexSelector = indexSelectors.remove(failedIndex); - assert indexSelector != null; - final Exception failure = indexSelector.getFailure(); - if (failure != null) { - onIndexFailure.accept(failedIndex, failure); - } - } if (nodeToSelectedShards.isEmpty()) { + assert groups.values().stream().allMatch(g -> g.completed.get()) : "Some groups aren't completed yet"; onComplete.run(); } else { + pendingRequests.addAndGet(nodeToSelectedShards.size()); for (Map.Entry> e : nodeToSelectedShards.entrySet()) { sendRequestToNode(e.getKey(), e.getValue()); } @@ -168,7 +171,7 @@ private void sendRequestToNode(String nodeId, List shardIds) { assert node != null; LOGGER.debug("round {} sends field caps node request to node {} for shardIds {}", executionRound, node, shardIds); final ActionListener listener = ActionListener.wrap( - r -> onRequestResponse(shardIds, r), + this::onRequestResponse, failure -> onRequestFailure(shardIds, failure) ); final FieldCapabilitiesNodeRequest nodeRequest = new FieldCapabilitiesNodeRequest( @@ -191,8 +194,8 @@ private void sendRequestToNode(String nodeId, List shardIds) { ); } - private void afterRequestsCompleted(int numRequests) { - if (pendingRequests.addAndGet(-numRequests) == 0) { + private void afterRequestsCompleted() { + if (pendingRequests.decrementAndGet() == 0) { // Here we only retry after all pending requests have responded to avoid exploding network requests // when the cluster is unstable or overloaded as an eager retry approach can add more load to the cluster. executionRound.incrementAndGet(); @@ -200,45 +203,89 @@ private void afterRequestsCompleted(int numRequests) { } } - private void onRequestResponse(List shardIds, FieldCapabilitiesNodeResponse nodeResponse) { + private void onRequestResponse(FieldCapabilitiesNodeResponse nodeResponse) { for (FieldCapabilitiesIndexResponse indexResponse : nodeResponse.getIndexResponses()) { if (indexResponse.canMatch()) { - if (indexSelectors.remove(indexResponse.getIndexName()) != null) { - onIndexResponse.accept(indexResponse); + final Group group = groups.remove(indexResponse.getIndexName()); + if (group == null) { + continue; + } + if (group.completed.compareAndSet(false, true)) { + final String mappingHash = group.mappingHash != null ? group.mappingHash : indexResponse.getIndexMappingHash(); + for (String index : group.indices) { + onIndexResponse.accept(new FieldCapabilitiesIndexResponse(index, mappingHash, indexResponse.get(), true)); + } } } } for (ShardId unmatchedShardId : nodeResponse.getUnmatchedShardIds()) { - final IndexSelector indexSelector = indexSelectors.get(unmatchedShardId.getIndexName()); - if (indexSelector != null) { - indexSelector.addUnmatchedShardId(unmatchedShardId); + final Group group = groups.get(unmatchedShardId.getIndexName()); + if (group != null) { + group.addUnmatchedShardId(unmatchedShardId); } } for (Map.Entry e : nodeResponse.getFailures().entrySet()) { - final IndexSelector indexSelector = indexSelectors.get(e.getKey().getIndexName()); - if (indexSelector != null) { - indexSelector.setFailure(e.getKey(), e.getValue()); + final Group group = groups.get(e.getKey().getIndexName()); + if (group != null) { + group.setFailure(e.getKey(), e.getValue()); } } - afterRequestsCompleted(shardIds.size()); + afterRequestsCompleted(); } private void onRequestFailure(List shardIds, Exception e) { for (ShardId shardId : shardIds) { - final IndexSelector indexSelector = indexSelectors.get(shardId.getIndexName()); - if (indexSelector != null) { - indexSelector.setFailure(shardId, e); + final Group group = groups.get(shardId.getIndexName()); + if (group != null) { + group.setFailure(shardId, e); } } - afterRequestsCompleted(shardIds.size()); + afterRequestsCompleted(); + } + + private List groupIndicesByMappingHash( + ClusterService clusterService, + ClusterState clusterState, + boolean withIndexFilter, + String[] indices + ) { + final Map> withMappingHashes = new HashMap<>(); + final List groups = new ArrayList<>(); + for (String index : indices) { + final IndexMetadata indexMetadata = clusterState.metadata().index(index); + if (withIndexFilter == false + && indexMetadata != null + && indexMetadata.mapping() != null + && indexMetadata.mapping().getSha256() != null) { + withMappingHashes.computeIfAbsent(indexMetadata.mapping().getSha256(), k -> new ArrayList<>()).add(index); + } else { + final GroupShardsIterator shardIts = clusterService.operationRouting() + .searchShards(clusterState, new String[] { index }, null, null, null, null); + groups.add(new Group(List.of(index), null, shardIts)); + } + } + for (Map.Entry> e : withMappingHashes.entrySet()) { + final GroupShardsIterator shardIts = clusterService.operationRouting() + .searchShards(clusterState, e.getValue().toArray(String[]::new), null, null, null, null); + groups.add(new Group(e.getValue(), e.getKey(), shardIts)); + } + return groups; } - private static class IndexSelector { + /** + * A group of indices that have the same mapping hash + */ + private static class Group { + private final List indices; + private final String mappingHash; private final Map> nodeToShards = new HashMap<>(); private final Set unmatchedShardIds = new HashSet<>(); private final Map failures = new HashMap<>(); + private final AtomicBoolean completed = new AtomicBoolean(); - IndexSelector(GroupShardsIterator shardIts) { + Group(List indices, String mappingHash, GroupShardsIterator shardIts) { + this.indices = indices; + this.mappingHash = mappingHash; for (ShardIterator shardIt : shardIts) { for (ShardRouting shard : shardIt) { nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard); @@ -246,12 +293,12 @@ private static class IndexSelector { } } - synchronized Exception getFailure() { - Exception first = null; - for (Exception e : failures.values()) { - first = ExceptionsHelper.useOrSuppress(first, e); + synchronized Map getFailures() { + final Map perIndex = new HashMap<>(); + for (Map.Entry e : failures.entrySet()) { + perIndex.compute(e.getKey().getIndexName(), (unused, curr) -> ExceptionsHelper.useOrSuppress(curr, e.getValue())); } - return first; + return perIndex; } synchronized void setFailure(ShardId shardId, Exception failure) { diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java index db53854cf9667..cb1e1dda94f33 100644 --- a/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -46,6 +47,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.shard.ShardId; @@ -80,7 +82,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.hamcrest.Matchers.anEmptyMap; @@ -103,34 +104,17 @@ public class RequestDispatcherTests extends ESAllocationTestCase { static final Logger logger = LogManager.getLogger(RequestDispatcherTests.class); public void testHappyCluster() throws Exception { - final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).toList(); - final ClusterState clusterState; - { - DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); - int numNodes = randomIntBetween(1, 10); - for (int i = 0; i < numNodes; i++) { - discoNodes.add(newNode("node_" + i, randomVersion(random()))); - } - Metadata.Builder metadata = Metadata.builder(); - for (String index : allIndices) { - final Settings.Builder settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 10)) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 2)) - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); - metadata.put(IndexMetadata.builder(index).settings(settings)); - } - clusterState = newClusterState(metadata.build(), discoNodes.build()); - } + final boolean withIndexFilter = randomBoolean(); + final ClusterState clusterState = randomClusterState(withIndexFilter && randomBoolean(), 1, 0); try (TestTransportService transportService = TestTransportService.newTestTransportService()) { - final List indices = randomSubsetOf(between(1, allIndices.size()), allIndices); + final List indices = randomIndices(clusterState); logger.debug("--> test with indices {}", indices); - final boolean withFilter = randomBoolean(); final ResponseCollector responseCollector = new ResponseCollector(); final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, newRandomParentTask(), - randomFieldCapRequest(withFilter), + randomFieldCapRequest(withIndexFilter), OriginalIndices.NONE, randomNonNegativeLong(), indices.toArray(new String[0]), @@ -139,7 +123,7 @@ public void testHappyCluster() throws Exception { responseCollector::addIndexFailure, responseCollector::onComplete ); - final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withIndexFilter); transportService.requestTracker.set(requestTracker); dispatcher.execute(); responseCollector.awaitCompletion(); @@ -151,7 +135,7 @@ public void testHappyCluster() throws Exception { } for (String index : indices) { final List nodeRequests = requestTracker.nodeRequests(index); - if (withFilter) { + if (withIndexFilter) { Set requestedShardIds = new HashSet<>(); for (NodeRequest nodeRequest : nodeRequests) { for (ShardId shardId : nodeRequest.requestedShardIds(index)) { @@ -174,34 +158,17 @@ public void testHappyCluster() throws Exception { } public void testRetryThenOk() throws Exception { - final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).toList(); - final ClusterState clusterState; - { - DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); - int numNodes = randomIntBetween(2, 10); - for (int i = 0; i < numNodes; i++) { - discoNodes.add(newNode("node_" + i, randomVersion(random()))); - } - Metadata.Builder metadata = Metadata.builder(); - for (String index : allIndices) { - final Settings.Builder settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 10)) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(1, 3)) - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); - metadata.put(IndexMetadata.builder(index).settings(settings)); - } - clusterState = newClusterState(metadata.build(), discoNodes.build()); - } + final boolean withIndexFilter = randomBoolean(); + final ClusterState clusterState = randomClusterState(withIndexFilter && randomBoolean(), 1, 1); try (TestTransportService transportService = TestTransportService.newTestTransportService()) { - final List indices = randomSubsetOf(between(1, allIndices.size()), allIndices); + final List indices = randomIndices(clusterState); logger.debug("--> test with indices {}", indices); - final boolean withFilter = randomBoolean(); final ResponseCollector responseCollector = new ResponseCollector(); final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, newRandomParentTask(), - randomFieldCapRequest(withFilter), + randomFieldCapRequest(withIndexFilter), OriginalIndices.NONE, randomNonNegativeLong(), indices.toArray(new String[0]), @@ -210,12 +177,12 @@ public void testRetryThenOk() throws Exception { responseCollector::addIndexFailure, responseCollector::onComplete ); - final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withIndexFilter); transportService.requestTracker.set(requestTracker); final Map maxFailedRounds = new HashMap<>(); for (String index : randomSubsetOf(between(1, indices.size()), indices)) { - maxFailedRounds.put(index, randomIntBetween(1, maxPossibleRounds(clusterState, index, withFilter) - 1)); + maxFailedRounds.put(index, randomIntBetween(1, maxPossibleRounds(clusterState, index, withIndexFilter) - 1)); } final AtomicInteger failedTimes = new AtomicInteger(); @@ -260,7 +227,7 @@ public void sendRequest( int maxRound = maxFailedRounds.values().stream().mapToInt(n -> n).max().getAsInt(); assertThat(dispatcher.executionRound(), equalTo(maxRound + 1)); for (String index : indices) { - if (withFilter) { + if (withIndexFilter) { ObjectIntMap copies = new ObjectIntHashMap<>(); for (ShardRouting shardRouting : clusterState.routingTable().index(index).randomAllActiveShardsIt()) { copies.addTo(shardRouting.shardId(), 1); @@ -296,34 +263,17 @@ public void sendRequest( } public void testRetryButFails() throws Exception { - final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).toList(); - final ClusterState clusterState; - { - DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); - int numNodes = randomIntBetween(1, 10); - for (int i = 0; i < numNodes; i++) { - discoNodes.add(newNode("node_" + i, randomVersion(random()))); - } - Metadata.Builder metadata = Metadata.builder(); - for (String index : allIndices) { - final Settings.Builder settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 10)) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 3)) - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); - metadata.put(IndexMetadata.builder(index).settings(settings)); - } - clusterState = newClusterState(metadata.build(), discoNodes.build()); - } + final boolean withIndexFilter = randomBoolean(); + final ClusterState clusterState = randomClusterState(withIndexFilter && randomBoolean(), 1, 1); try (TestTransportService transportService = TestTransportService.newTestTransportService()) { - final List indices = randomSubsetOf(between(1, allIndices.size()), allIndices); + final List indices = randomIndices(clusterState); logger.debug("--> test with indices {}", indices); - final boolean withFilter = randomBoolean(); final ResponseCollector responseCollector = new ResponseCollector(); final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, newRandomParentTask(), - randomFieldCapRequest(withFilter), + randomFieldCapRequest(withIndexFilter), OriginalIndices.NONE, randomNonNegativeLong(), indices.toArray(new String[0]), @@ -332,7 +282,7 @@ public void testRetryButFails() throws Exception { responseCollector::addIndexFailure, responseCollector::onComplete ); - final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withIndexFilter); transportService.requestTracker.set(requestTracker); List failedIndices = randomSubsetOf(between(1, indices.size()), indices); @@ -377,10 +327,13 @@ public void sendRequest( ); assertThat(responseCollector.failures.keySet(), equalTo(Sets.newHashSet(failedIndices))); - int maxRound = failedIndices.stream().mapToInt(index -> maxPossibleRounds(clusterState, index, withFilter)).max().getAsInt(); + int maxRound = failedIndices.stream() + .mapToInt(index -> maxPossibleRounds(clusterState, index, withIndexFilter)) + .max() + .getAsInt(); assertThat(dispatcher.executionRound(), equalTo(maxRound)); for (String index : indices) { - if (withFilter) { + if (withIndexFilter) { ObjectIntMap copies = new ObjectIntHashMap<>(); for (ShardRouting shardRouting : clusterState.routingTable().index(index).randomAllActiveShardsIt()) { copies.addTo(shardRouting.shardId(), 1); @@ -420,34 +373,16 @@ public void sendRequest( } public void testSuccessWithAnyMatch() throws Exception { - final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).toList(); - final ClusterState clusterState; - { - DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); - int numNodes = randomIntBetween(1, 10); - for (int i = 0; i < numNodes; i++) { - discoNodes.add(newNode("node_" + i, randomVersion(random()))); - } - Metadata.Builder metadata = Metadata.builder(); - for (String index : allIndices) { - final Settings.Builder settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(2, 10)) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 2)) - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); - metadata.put(IndexMetadata.builder(index).settings(settings)); - } - clusterState = newClusterState(metadata.build(), discoNodes.build()); - } + final ClusterState clusterState = randomClusterState(randomBoolean(), 2, 0); try (TestTransportService transportService = TestTransportService.newTestTransportService()) { - final List indices = randomSubsetOf(between(1, allIndices.size()), allIndices); + final List indices = randomIndices(clusterState); logger.debug("--> test with indices {}", indices); - final boolean withFilter = true; final ResponseCollector responseCollector = new ResponseCollector(); final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, newRandomParentTask(), - randomFieldCapRequest(withFilter), + randomFieldCapRequest(true), OriginalIndices.NONE, randomNonNegativeLong(), indices.toArray(new String[0]), @@ -456,9 +391,8 @@ public void testSuccessWithAnyMatch() throws Exception { responseCollector::addIndexFailure, responseCollector::onComplete ); - final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), true); transportService.requestTracker.set(requestTracker); - final AtomicInteger failedTimes = new AtomicInteger(); final Set allUnmatchedShardIds = new HashSet<>(); for (String index : indices) { final Set shardIds = new HashSet<>(); @@ -519,35 +453,16 @@ public void sendRequest( } public void testStopAfterAllShardsUnmatched() throws Exception { - final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).toList(); - final ClusterState clusterState; - final boolean newVersionOnly = randomBoolean(); - { - DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); - int numNodes = randomIntBetween(1, 10); - for (int i = 0; i < numNodes; i++) { - discoNodes.add(newNode("node_" + i, randomVersion(random()))); - } - Metadata.Builder metadata = Metadata.builder(); - for (String index : allIndices) { - final Settings.Builder settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 10)) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 2)) - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); - metadata.put(IndexMetadata.builder(index).settings(settings)); - } - clusterState = newClusterState(metadata.build(), discoNodes.build()); - } + final ClusterState clusterState = randomClusterState(randomBoolean(), 1, 1); try (TestTransportService transportService = TestTransportService.newTestTransportService()) { - final List indices = randomSubsetOf(between(1, allIndices.size()), allIndices); + final List indices = randomIndices(clusterState); logger.debug("--> test with indices {}", indices); - final boolean withFilter = true; final ResponseCollector responseCollector = new ResponseCollector(); final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, newRandomParentTask(), - randomFieldCapRequest(withFilter), + randomFieldCapRequest(true), OriginalIndices.NONE, randomNonNegativeLong(), indices.toArray(new String[0]), @@ -556,9 +471,8 @@ public void testStopAfterAllShardsUnmatched() throws Exception { responseCollector::addIndexFailure, responseCollector::onComplete ); - final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), true); transportService.requestTracker.set(requestTracker); - final AtomicInteger failedTimes = new AtomicInteger(); final List unmatchedIndices = randomSubsetOf(between(1, indices.size()), indices); transportService.setTransportInterceptor(new TransportInterceptor.AsyncSender() { @Override @@ -613,6 +527,210 @@ public void sendRequest( } } + public void testSingleRoundWithGroup() throws Exception { + final ClusterState clusterState = randomClusterState(true, 1, 0); + try (TestTransportService transportService = TestTransportService.newTestTransportService()) { + final List testGroups = randomSubsetOf(between(1, INDEX_GROUPS.size()), INDEX_GROUPS); + final List testIndices = clusterState.metadata().indices().keySet().stream().filter(index -> { + String g = getIndexGroup(index); + return g != null && testGroups.contains(g); + }).toList(); + logger.debug("--> test with indices {}", testIndices); + final ResponseCollector responseCollector = new ResponseCollector(); + final RequestDispatcher dispatcher = new RequestDispatcher( + mockClusterService(clusterState), + transportService, + newRandomParentTask(), + randomFieldCapRequest(false), + OriginalIndices.NONE, + randomNonNegativeLong(), + testIndices.toArray(new String[0]), + transportService.threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION), + responseCollector::addIndexResponse, + responseCollector::addIndexFailure, + responseCollector::onComplete + ); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), true); + transportService.requestTracker.set(requestTracker); + dispatcher.execute(); + responseCollector.awaitCompletion(); + assertThat(responseCollector.responses.keySet(), equalTo(Sets.newHashSet(testIndices))); + assertThat(responseCollector.failures, anEmptyMap()); + assertThat("Happy case should complete after one round", dispatcher.executionRound(), equalTo(1)); + for (NodeRequest nodeRequest : requestTracker.sentNodeRequests) { + assertThat("All requests occur in round 0", nodeRequest.round, equalTo(0)); + } + for (String group : testGroups) { + Set requests = requestTracker.nodeRequestsPerGroup(group); + assertThat("Group sent more than one node request", requests, hasSize(1)); + } + } + } + + public void testGroupRetryAndOk() throws Exception { + final ClusterState clusterState = randomClusterState(true, 1, 0); + try (TestTransportService transportService = TestTransportService.newTestTransportService()) { + final List testGroups = randomSubsetOf(between(1, INDEX_GROUPS.size()), INDEX_GROUPS); + final List testIndices = clusterState.metadata().indices().keySet().stream().filter(index -> { + String g = getIndexGroup(index); + return g != null && testGroups.contains(g); + }).toList(); + logger.debug("--> test with indices {}", testIndices); + final ResponseCollector responseCollector = new ResponseCollector(); + final RequestDispatcher dispatcher = new RequestDispatcher( + mockClusterService(clusterState), + transportService, + newRandomParentTask(), + randomFieldCapRequest(false), + OriginalIndices.NONE, + randomNonNegativeLong(), + testIndices.toArray(new String[0]), + transportService.threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION), + responseCollector::addIndexResponse, + responseCollector::addIndexFailure, + responseCollector::onComplete + ); + final Map toFailRounds = new HashMap<>(); + for (String group : randomSubsetOf(between(1, testGroups.size()), testGroups)) { + toFailRounds.put(group, randomIntBetween(1, assignedNodes(clusterState, group).size() - 1)); + } + transportService.setTransportInterceptor(new TransportInterceptor.AsyncSender() { + @Override + public void sendRequest( + Transport.Connection connection, + String action, + TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler handler + ) { + final int currentRound = dispatcher.executionRound(); + FieldCapabilitiesNodeRequest nodeRequest = (FieldCapabilitiesNodeRequest) request; + Set requestedGroups = nodeRequest.shardIds() + .stream() + .map(shr -> getIndexGroup(shr.getIndexName())) + .collect(Collectors.toSet()); + if (currentRound > 0) { + assertThat( + "Only failed groups are retried after the first found", + requestedGroups, + everyItem(in(toFailRounds.keySet())) + ); + } + Set successIndices = new HashSet<>(); + List failedShards = new ArrayList<>(); + for (ShardId shardId : nodeRequest.shardIds()) { + final Integer maxRound = toFailRounds.get(getIndexGroup(shardId.getIndexName())); + if (maxRound == null || currentRound >= maxRound) { + successIndices.add(shardId.getIndexName()); + } else { + failedShards.add(shardId); + } + } + transportService.sendResponse(handler, randomNodeResponse(successIndices, failedShards, Collections.emptySet())); + } + }); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), true); + transportService.requestTracker.set(requestTracker); + dispatcher.execute(); + responseCollector.awaitCompletion(); + assertThat(responseCollector.responses.keySet(), equalTo(Sets.newHashSet(testIndices))); + assertThat(responseCollector.failures, anEmptyMap()); + int maxRound = toFailRounds.values().stream().mapToInt(n -> n).max().orElseThrow(); + assertThat(dispatcher.executionRound(), equalTo(maxRound + 1)); + for (String group : testGroups) { + int expectedRequests = toFailRounds.getOrDefault(group, 0) + 1; + assertThat(requestTracker.nodeRequestsPerGroup(group), hasSize(expectedRequests)); + } + } + } + + public void testGroupRetryButFail() throws Exception { + final ClusterState clusterState = randomClusterState(true, 1, 0); + try (TestTransportService transportService = TestTransportService.newTestTransportService()) { + final List testGroups = randomSubsetOf(between(1, INDEX_GROUPS.size()), INDEX_GROUPS); + final List testIndices = clusterState.metadata().indices().keySet().stream().filter(index -> { + String g = getIndexGroup(index); + return g != null && testGroups.contains(g); + }).toList(); + logger.debug("--> test with indices {}", testIndices); + final ResponseCollector responseCollector = new ResponseCollector(); + final RequestDispatcher dispatcher = new RequestDispatcher( + mockClusterService(clusterState), + transportService, + newRandomParentTask(), + randomFieldCapRequest(false), + OriginalIndices.NONE, + randomNonNegativeLong(), + testIndices.toArray(new String[0]), + transportService.threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION), + responseCollector::addIndexResponse, + responseCollector::addIndexFailure, + responseCollector::onComplete + ); + final List toFailGroups = randomSubsetOf(between(1, testGroups.size()), testGroups); + transportService.setTransportInterceptor(new TransportInterceptor.AsyncSender() { + @Override + public void sendRequest( + Transport.Connection connection, + String action, + TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler handler + ) { + final int currentRound = dispatcher.executionRound(); + FieldCapabilitiesNodeRequest nodeRequest = (FieldCapabilitiesNodeRequest) request; + Set requestedGroups = nodeRequest.shardIds() + .stream() + .map(shr -> getIndexGroup(shr.getIndexName())) + .collect(Collectors.toSet()); + if (currentRound > 0) { + assertThat("Only failed groups are retried after the first found", requestedGroups, everyItem(in(toFailGroups))); + } + Set successIndices = new HashSet<>(); + List failedShards = new ArrayList<>(); + for (ShardId shardId : nodeRequest.shardIds()) { + if (toFailGroups.contains(getIndexGroup(shardId.getIndexName()))) { + failedShards.add(shardId); + } else { + successIndices.add(shardId.getIndexName()); + } + } + transportService.sendResponse(handler, randomNodeResponse(successIndices, failedShards, Collections.emptySet())); + } + }); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), true); + transportService.requestTracker.set(requestTracker); + dispatcher.execute(); + responseCollector.awaitCompletion(); + final Set successfulIndices = new HashSet<>(); + final Set failedIndices = new HashSet<>(); + for (String index : testIndices) { + if (toFailGroups.contains(getIndexGroup(index))) { + failedIndices.add(index); + } else { + successfulIndices.add(index); + } + } + assertThat(responseCollector.responses.keySet(), equalTo(successfulIndices)); + assertThat(responseCollector.failures.keySet(), equalTo(failedIndices)); + int maxRound = 0; + for (String group : testGroups) { + if (toFailGroups.contains(group)) { + Set assignedNodes = assignedNodes(clusterState, group); + Set sentNodes = requestTracker.nodeRequestsPerGroup(group) + .stream() + .map(r -> r.node.getId()) + .collect(Collectors.toSet()); + assertThat(sentNodes, equalTo(assignedNodes)); + maxRound = Math.max(maxRound, assignedNodes.size() - 1); + } else { + assertThat(requestTracker.nodeRequestsPerGroup(group), hasSize(1)); + } + } + assertThat(dispatcher.executionRound(), equalTo(maxRound + 1)); + } + } + private static class NodeRequest { final int round; final DiscoveryNode node; @@ -631,19 +749,24 @@ Set indices() { Set requestedShardIds(String index) { return request.shardIds().stream().filter(s -> s.getIndexName().equals(index)).collect(Collectors.toSet()); } + + @Override + public String toString() { + return "NodeRequest{" + "round=" + round + ", node=" + node + ", indices=" + indices() + '}'; + } } private static class RequestTracker { private final RequestDispatcher dispatcher; private final RoutingTable routingTable; - private final boolean withFilter; + private final boolean withIndexFilter; private final AtomicInteger currentRound = new AtomicInteger(); final List sentNodeRequests = new CopyOnWriteArrayList<>(); - RequestTracker(RequestDispatcher dispatcher, RoutingTable routingTable, boolean withFilter) { + RequestTracker(RequestDispatcher dispatcher, RoutingTable routingTable, boolean withIndexFilter) { this.dispatcher = dispatcher; this.routingTable = routingTable; - this.withFilter = withFilter; + this.withIndexFilter = withIndexFilter; } void verifyAfterComplete() { @@ -655,7 +778,7 @@ void verifyAfterComplete() { for (int i = 0; i < lastRound; i++) { int round = i; List nodeRequests = sentNodeRequests.stream().filter(r -> r.round == round).toList(); - if (withFilter == false) { + if (withIndexFilter == false) { // Without filter, each index is requested once in each round. ObjectIntMap requestsPerIndex = new ObjectIntHashMap<>(); nodeRequests.forEach(r -> r.indices().forEach(index -> requestsPerIndex.addTo(index, 1))); @@ -717,6 +840,18 @@ List nodeRequests(String index, int round) { List nodeRequests(String index) { return sentNodeRequests.stream().filter(r -> r.indices().contains(index)).toList(); } + + Set nodeRequestsPerGroup(String group) { + Set requests = new HashSet<>(); + for (NodeRequest r : sentNodeRequests) { + for (String index : r.indices()) { + if (group.equals(getIndexGroup(index))) { + requests.add(r); + } + } + } + return requests; + } } private static class TestTransportService extends TransportService { @@ -825,9 +960,9 @@ protected void doRun() { } } - static FieldCapabilitiesRequest randomFieldCapRequest(boolean withFilter) { - final QueryBuilder filter = withFilter ? new RangeQueryBuilder("timestamp").from(randomNonNegativeLong()) : null; - return new FieldCapabilitiesRequest().fields("*").indexFilter(filter); + static FieldCapabilitiesRequest randomFieldCapRequest(boolean withIndexFilter) { + final QueryBuilder indexFilter = withIndexFilter ? new RangeQueryBuilder("timestamp").from(randomNonNegativeLong()) : null; + return new FieldCapabilitiesRequest().fields("*").indexFilter(indexFilter); } static FieldCapabilitiesNodeResponse randomNodeResponse( @@ -897,13 +1032,74 @@ static Task newRandomParentTask() { return new Task(0, "type", "action", randomAlphaOfLength(10), TaskId.EMPTY_TASK_ID, Collections.emptyMap()); } - private ClusterState newClusterState(Metadata metadata, DiscoveryNodes discoveryNodes) { + private static List randomIndices(ClusterState clusterState) { + Set indices = clusterState.metadata().indices().keySet(); + return randomSubsetOf(randomIntBetween(1, indices.size()), indices); + } + + private static final List INDEX_GROUPS = List.of("red", "yellow", "green"); + + private static String getIndexGroup(String index) { + for (String group : INDEX_GROUPS) { + if (index.startsWith(group)) { + return group; + } + } + return null; + } + + private static Map> requestsPerGroupIndex(List requests) { + final Map> groups = new HashMap<>(); + for (NodeRequest r : requests) { + for (String index : r.indices()) { + String group = getIndexGroup(index); + groups.computeIfAbsent(group, k -> new HashSet<>()).add(r); + } + } + return groups; + } + + private ClusterState randomClusterState(boolean includeGroupMappingHash, int minNumberOfShards, int minNumberOfReplicas) { + final DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + final int numNodes = randomIntBetween(2, 10); + for (int i = 0; i < numNodes; i++) { + discoNodes.add(newNode("node_" + i, randomVersion(random()))); + } + final Metadata.Builder metadataBuilder = Metadata.builder(); + if (includeGroupMappingHash) { + for (String group : INDEX_GROUPS) { + MappingMetadata mapping = new MappingMetadata(MapperService.SINGLE_MAPPING_NAME, Map.of("mapping", group)); + int numIndices = between(1, 5); + for (int i = 0; i < numIndices; i++) { + final String index = group + "_" + i; + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(minNumberOfShards, 10)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(minNumberOfReplicas, 3)) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); + metadataBuilder.put(IndexMetadata.builder(index).settings(settings).putMapping(mapping)); + } + } + } + // indices without mapping hash + { + int oldIndices = randomIntBetween(includeGroupMappingHash ? 0 : 1, 5); + for (int i = 0; i < oldIndices; i++) { + final String index = "index_" + i; + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(minNumberOfShards, 10)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(minNumberOfReplicas, 3)) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); + metadataBuilder.put(IndexMetadata.builder(index).settings(settings)); + } + } + + Metadata metadata = metadataBuilder.build(); final RoutingTable.Builder routingTable = RoutingTable.builder(); for (IndexMetadata imd : metadata) { routingTable.addAsNew(metadata.index(imd.getIndex())); } final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .nodes(discoveryNodes) + .nodes(discoNodes) .metadata(metadata) .routingTable(routingTable.build()) .build(); @@ -952,6 +1148,22 @@ static int maxPossibleRounds(ClusterState clusterState, String index, boolean wi } } + static Set assignedNodes(ClusterState clusterState, String indexGroup) { + List indices = clusterState.metadata() + .indices() + .keySet() + .stream() + .filter(index -> indexGroup.equals(getIndexGroup(index))) + .toList(); + Set assignedNodes = new HashSet<>(); + for (String index : indices) { + for (ShardRouting shard : clusterState.routingTable().index(index).randomAllActiveShardsIt()) { + assignedNodes.add(shard.currentNodeId()); + } + } + return assignedNodes; + } + static ClusterService mockClusterService(ClusterState clusterState) { final ClusterService clusterService = mock(ClusterService.class); when(clusterService.state()).thenReturn(clusterState);