From 4d4a09bee5ed5919b8eeedc3c18b1a0ad93a53e6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 27 Aug 2020 16:38:22 -0400 Subject: [PATCH 1/5] Disable allocation rebalance in point in time test --- .../elasticsearch/xpack/core/search/PointInTimeIT.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java index 42a9ab211b844..b3842065508de 100644 --- a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; @@ -62,6 +63,15 @@ protected Collection> nodePlugins() { return plugins; } + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + // If shards are relocated to new nodes, then searches with point-in-time will fail + .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") + .build(); + } + public void testBasic() { createIndex("test"); int numDocs = randomIntBetween(10, 50); From 279fe909dab68980a48fad558990ed873f32b514 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 31 Aug 2020 17:25:14 -0400 Subject: [PATCH 2/5] Always include target when resolving point in time --- .../search/AbstractSearchAsyncAction.java | 29 ++-- .../search/CanMatchPreFilterSearchPhase.java | 6 +- .../SearchDfsQueryThenFetchAsyncAction.java | 6 +- .../SearchQueryThenFetchAsyncAction.java | 7 +- .../action/search/SearchShardIterator.java | 92 +++++++------ .../action/search/TransportSearchAction.java | 24 ++-- .../cluster/routing/GroupShardsIterator.java | 12 +- .../cluster/routing/PlainShardsIterator.java | 56 +------- .../cluster/routing/ShardsIterator.java | 4 +- .../elasticsearch/common/util/Countable.java | 24 ++++ .../common/util/PlainIterator.java | 69 ++++++++++ .../AbstractSearchAsyncActionTests.java | 2 +- .../CanMatchPreFilterSearchPhaseTests.java | 3 +- .../action/search/SearchAsyncActionTests.java | 37 ++--- .../search/SearchShardIteratorTests.java | 21 ++- .../search/TransportSearchActionTests.java | 31 +++-- .../routing/GroupShardsIteratorTests.java | 4 +- .../search/slice/SliceBuilderTests.java | 3 +- .../xpack/core/search/PointInTimeIT.java | 128 +++++++++++++----- 19 files changed, 344 insertions(+), 214 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/util/Countable.java create mode 100644 server/src/main/java/org/elasticsearch/common/util/PlainIterator.java diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index c5e1564b5ee2a..9a37dfc932502 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -32,7 +32,6 @@ import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -212,7 +211,7 @@ void skipShard(SearchShardIterator iterator) { successfulShardExecution(iterator); } - private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) { + private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) { /* * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the * same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we @@ -221,16 +220,16 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator * we can continue (cf. InitialSearchPhase#maybeFork). */ if (shard == null) { - fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); + fork(() -> onShardFailure(shardIndex, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); } else { final PendingExecutions pendingExecutions = throttleConcurrentRequests ? - pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode)) + pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode)) : null; Runnable r = () -> { final Thread thread = Thread.currentThread(); try { executePhaseOnShard(shardIt, shard, - new SearchActionListener(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) { + new SearchActionListener(shard, shardIndex) { @Override public void innerOnResponse(Result result) { try { @@ -243,7 +242,7 @@ public void innerOnResponse(Result result) { @Override public void onFailure(Exception t) { try { - onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t); + onShardFailure(shardIndex, shard, shardIt, t); } finally { executeNext(pendingExecutions, thread); } @@ -255,7 +254,7 @@ public void onFailure(Exception t) { * It is possible to run into connection exceptions here because we are getting the connection early and might * run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy. */ - fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e)); + fork(() -> onShardFailure(shardIndex, shard, shardIt, e)); } finally { executeNext(pendingExecutions, thread); } @@ -275,7 +274,9 @@ public void onFailure(Exception t) { * @param shard the shard routing to send the request for * @param listener the listener to notify on response */ - protected abstract void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, SearchActionListener listener); + protected abstract void executePhaseOnShard(SearchShardIterator shardIt, + SearchShardTarget shard, + SearchActionListener listener); private void fork(final Runnable runnable) { executor.execute(new AbstractRunnable() { @@ -370,18 +371,16 @@ ShardSearchFailure[] buildShardFailures() { return failures; } - private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, - final SearchShardIterator shardIt, Exception e) { + private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard - SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId); - onShardFailure(shardIndex, shardTarget, e); - final ShardRouting nextShard = shardIt.nextOrNull(); + onShardFailure(shardIndex, shard, e); + final SearchShardTarget nextShard = shardIt.nextOrNull(); final boolean lastShard = nextShard == null; logger.debug(() -> new ParameterizedMessage("{}: Failed to execute [{}] lastShard [{}]", - shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e); + shard != null ? shard : shardIt.shardId(), request, lastShard), e); if (lastShard) { - onShardGroupFailure(shardIndex, shardTarget, e); + onShardGroupFailure(shardIndex, shard, e); } final int totalOps = this.totalOps.incrementAndGet(); if (totalOps == expectedTotalOps) { diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index f594ba80f8ef5..db59c39559ed2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -23,8 +23,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.search.SearchService.CanMatchResponse; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.sort.FieldSortBuilder; @@ -77,9 +77,9 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction listener) { - getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()), + getSearchTransport().sendCanMatch(getConnection(shard.getClusterAlias(), shard.getNodeId()), buildShardSearchRequest(shardIt), getTask(), listener); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 26a91430fc913..0762d70dc5cbf 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -65,9 +65,9 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction } @Override - protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard, + protected void executePhaseOnShard(final SearchShardIterator shardIt, final SearchShardTarget shard, final SearchActionListener listener) { - getSearchTransport().sendExecuteDfs(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()), + getSearchTransport().sendExecuteDfs(getConnection(shard.getClusterAlias(), shard.getNodeId()), buildShardSearchRequest(shardIt) , getTask(), listener); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 88e3edfc8ed46..f841c6e55f44b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -75,11 +74,11 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener) { ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt)); - getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()), - request, getTask(), listener); + getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java index 50a1351c3642f..4b9a27fc975c9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java @@ -21,16 +21,19 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.routing.PlainShardIterator; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.Countable; +import org.elasticsearch.common.util.PlainIterator; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.ShardSearchContextId; +import java.util.Comparator; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices} @@ -38,32 +41,36 @@ * the cluster alias. * @see OriginalIndices */ -public final class SearchShardIterator extends PlainShardIterator { +public final class SearchShardIterator implements Comparable, Countable { private final OriginalIndices originalIndices; private final String clusterAlias; + private final ShardId shardId; private boolean skip = false; private final ShardSearchContextId searchContextId; private final TimeValue searchContextKeepAlive; + private final PlainIterator targetNodesIterator; /** * Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards * this the a given shardId. * - * @param clusterAlias the alias of the cluster where the shard is located - * @param shardId shard id of the group - * @param shards shards to iterate + * @param clusterAlias the alias of the cluster where the shard is located + * @param shardId shard id of the group + * @param shards shards to iterate * @param originalIndices the indices that the search request originally related to (before any rewriting happened) */ public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List shards, OriginalIndices originalIndices) { - this(clusterAlias, shardId, shards, originalIndices, null, null); + this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).collect(Collectors.toList()), + originalIndices, null, null); } public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, - List shards, OriginalIndices originalIndices, + List targetNodeIds, OriginalIndices originalIndices, ShardSearchContextId searchContextId, TimeValue searchContextKeepAlive) { - super(shardId, shards); + this.shardId = shardId; + this.targetNodesIterator = new PlainIterator<>(targetNodeIds); this.originalIndices = originalIndices; this.clusterAlias = clusterAlias; this.searchContextId = searchContextId; @@ -86,12 +93,16 @@ public String getClusterAlias() { return clusterAlias; } - /** - * Creates a new shard target from this iterator, pointing at the node identified by the provided identifier. - * @see SearchShardTarget - */ - SearchShardTarget newSearchShardTarget(String nodeId) { - return new SearchShardTarget(nodeId, shardId(), clusterAlias, originalIndices); + SearchShardTarget nextOrNull() { + final String nodeId = targetNodesIterator.nextOrNull(); + if (nodeId != null) { + return new SearchShardTarget(nodeId, shardId, clusterAlias, originalIndices); + } + return null; + } + + int remaining() { + return targetNodesIterator.remaining(); } /** @@ -105,6 +116,10 @@ TimeValue getSearchContextKeepAlive() { return searchContextKeepAlive; } + List getTargetNodeIds() { + return targetNodesIterator.list(); + } + /** * Reset the iterator and mark it as skippable * @see #skip() @@ -114,6 +129,10 @@ void resetAndSkip() { skip = true; } + void reset() { + targetNodesIterator.reset(); + } + /** * Returns true if the search execution should skip this shard since it can not match any documents given the query. */ @@ -121,42 +140,33 @@ boolean skip() { return skip; } + + @Override + public int size() { + return targetNodesIterator.size(); + } + + ShardId shardId() { + return shardId; + } + @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (super.equals(o) == false) { - return false; - } + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; SearchShardIterator that = (SearchShardIterator) o; - return Objects.equals(clusterAlias, that.clusterAlias); + return shardId.equals(that.shardId) && Objects.equals(clusterAlias, that.clusterAlias); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), clusterAlias); + return Objects.hash(clusterAlias, shardId); } @Override - public int compareTo(ShardIterator o) { - int superCompareTo = super.compareTo(o); - if (superCompareTo != 0 || (o instanceof SearchShardIterator == false)) { - return superCompareTo; - } - SearchShardIterator searchShardIterator = (SearchShardIterator)o; - if (clusterAlias == null && searchShardIterator.getClusterAlias() == null) { - return 0; - } - if (clusterAlias == null) { - return -1; - } - if (searchShardIterator.getClusterAlias() == null) { - return 1; - } - return clusterAlias.compareTo(searchShardIterator.getClusterAlias()); + public int compareTo(SearchShardIterator o) { + return Comparator.comparing(SearchShardIterator::shardId) + .thenComparing(SearchShardIterator::getClusterAlias, Comparator.nullsFirst(String::compareTo)) + .compare(this, o); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 33de77862857f..21553c603c034 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -245,11 +245,10 @@ public AbstractSearchAsyncAction asyncSearchAction( indexRoutings, executor, searchRequest, listener, shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()), 1, clusters) { @Override - protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, + protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, SearchActionListener listener) { - final Transport.Connection connection = getConnection(shardIt.getClusterAlias(), shard.currentNodeId()); - final SearchShardTarget searchShardTarget = shardIt.newSearchShardTarget(shard.currentNodeId()); - phaseSearchAction.executeOnShardTarget(task, searchShardTarget, connection, listener); + final Transport.Connection connection = getConnection(shard.getClusterAlias(), shard.getNodeId()); + phaseSearchAction.executeOnShardTarget(task, shard, connection, listener); } @Override @@ -596,7 +595,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false) .map(it -> new SearchShardIterator( - searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices, null, null)) + searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices)) .collect(Collectors.toList()); aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap); indexRoutings = routingMap; @@ -874,7 +873,8 @@ static Map getIndicesFromSearchContexts(SearchContextId .collect(Collectors.toMap(Map.Entry::getKey, e -> new OriginalIndices(e.getValue().toArray(String[]::new), indicesOptions))); } - static List getSearchShardsFromSearchContexts(ClusterState clusterState, OriginalIndices originalIndices, + static List getSearchShardsFromSearchContexts(ClusterState clusterState, + OriginalIndices originalIndices, String localClusterAlias, SearchContextId searchContext, TimeValue keepAlive) { @@ -882,15 +882,15 @@ static List getSearchShardsFromSearchContexts(ClusterState for (Map.Entry entry : searchContext.shards().entrySet()) { final ShardId shardId = entry.getKey(); final ShardIterator shards = OperationRouting.getShards(clusterState, shardId); - final List matchingNodeFirstRoutings = new ArrayList<>(); + final List matchingNodeFirst = new ArrayList<>(shards.size()); + final String nodeId = entry.getValue().getNode(); + matchingNodeFirst.add(nodeId); // always try the matching node first even its shard was relocated for (ShardRouting shard : shards) { - if (shard.currentNodeId().equals(entry.getValue().getNode())) { - matchingNodeFirstRoutings.add(0, shard); - } else { - matchingNodeFirstRoutings.add(shard); + if (shard.currentNodeId().equals(nodeId) == false) { + matchingNodeFirst.add(shard.currentNodeId()); } } - iterators.add(new SearchShardIterator(localClusterAlias, shardId, matchingNodeFirstRoutings, originalIndices, + iterators.add(new SearchShardIterator(localClusterAlias, shardId, matchingNodeFirst, originalIndices, entry.getValue().getSearchContextId(), keepAlive)); } return iterators; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java b/server/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java index 1cb105ac775e3..3f5040feaad10 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.routing; import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.common.util.Countable; import java.util.Iterator; import java.util.List; @@ -30,16 +31,15 @@ * ShardsIterators are always returned in ascending order independently of their order at construction * time. The incoming iterators are sorted to ensure consistent iteration behavior across Nodes / JVMs. */ -public final class GroupShardsIterator implements Iterable { +public final class GroupShardsIterator & Countable> implements Iterable { private final List iterators; /** * Constructs a new sorted GroupShardsIterator from the given list. Items are sorted based on their natural ordering. * @see PlainShardIterator#compareTo(ShardIterator) - * @see org.elasticsearch.action.search.SearchShardIterator#compareTo(ShardIterator) */ - public static GroupShardsIterator sortAndCreate(List iterators) { + public static & Countable> GroupShardsIterator sortAndCreate(List iterators) { CollectionUtil.timSort(iterators); return new GroupShardsIterator<>(iterators); } @@ -56,11 +56,7 @@ public GroupShardsIterator(List iterators) { * @return total number of shards */ public int totalSize() { - int size = 0; - for (ShardIterator shard : iterators) { - size += shard.size(); - } - return size; + return iterators.stream().mapToInt(Countable::size).sum(); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java b/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java index e9a99b7b456c4..4e71b2aeda701 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java @@ -18,70 +18,26 @@ */ package org.elasticsearch.cluster.routing; -import java.util.Collections; -import java.util.Iterator; +import org.elasticsearch.common.util.PlainIterator; + import java.util.List; /** * A simple {@link ShardsIterator} that iterates a list or sub-list of * {@link ShardRouting shard indexRoutings}. */ -public class PlainShardsIterator implements ShardsIterator { - - private final List shards; - - // Calls to nextOrNull might be performed on different threads in the transport actions so we need the volatile - // keyword in order to ensure visibility. Note that it is fine to use `volatile` for a counter in that case given - // that although nextOrNull might be called from different threads, it can never happen concurrently. - private volatile int index; - +public class PlainShardsIterator extends PlainIterator implements ShardsIterator { public PlainShardsIterator(List shards) { - this.shards = shards; - reset(); - } - - @Override - public void reset() { - index = 0; - } - - @Override - public int remaining() { - return shards.size() - index; - } - - @Override - public ShardRouting nextOrNull() { - if (index == shards.size()) { - return null; - } else { - return shards.get(index++); - } - } - - @Override - public int size() { - return shards.size(); + super(shards); } @Override public int sizeActive() { - int count = 0; - for (ShardRouting shard : shards) { - if (shard.active()) { - count++; - } - } - return count; + return Math.toIntExact(getShardRoutings().stream().filter(ShardRouting::active).count()); } @Override public List getShardRoutings() { - return Collections.unmodifiableList(shards); - } - - @Override - public Iterator iterator() { - return shards.iterator(); + return list(); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java index dacf49cb73684..9c0396394a3a2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java @@ -18,12 +18,14 @@ */ package org.elasticsearch.cluster.routing; +import org.elasticsearch.common.util.Countable; + import java.util.List; /** * Allows to iterate over unrelated shards. */ -public interface ShardsIterator extends Iterable { +public interface ShardsIterator extends Iterable, Countable { /** * Resets the iterator to its initial state. diff --git a/server/src/main/java/org/elasticsearch/common/util/Countable.java b/server/src/main/java/org/elasticsearch/common/util/Countable.java new file mode 100644 index 0000000000000..8bb1e55a890d0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/Countable.java @@ -0,0 +1,24 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util; + +public interface Countable { + int size(); +} diff --git a/server/src/main/java/org/elasticsearch/common/util/PlainIterator.java b/server/src/main/java/org/elasticsearch/common/util/PlainIterator.java new file mode 100644 index 0000000000000..4ccbf35ecd3fd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/PlainIterator.java @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +public class PlainIterator implements Iterable, Countable { + private final List elements; + + // Calls to nextOrNull might be performed on different threads in the transport actions so we need the volatile + // keyword in order to ensure visibility. Note that it is fine to use `volatile` for a counter in that case given + // that although nextOrNull might be called from different threads, it can never happen concurrently. + private volatile int index; + + public PlainIterator(List elements) { + this.elements = elements; + reset(); + } + + public void reset() { + index = 0; + } + + public int remaining() { + return elements.size() - index; + } + + public T nextOrNull() { + if (index == elements.size()) { + return null; + } else { + return elements.get(index++); + } + } + + @Override + public int size() { + return elements.size(); + } + + + public List list() { + return Collections.unmodifiableList(elements); + } + + @Override + public Iterator iterator() { + return elements.iterator(); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 5f077cd3dcb4c..c1ba0dd30dc01 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -102,7 +102,7 @@ protected SearchPhase getNextPhase(final SearchPhaseResults r } @Override - protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard, + protected void executePhaseOnShard(final SearchShardIterator shardIt, final SearchShardTarget shard, final SearchActionListener listener) { } diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 3d46a3e13a434..8c6ce44c00f9c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchRequest; @@ -266,7 +267,7 @@ public void run() { @Override protected void executePhaseOnShard( final SearchShardIterator shardIt, - final ShardRouting shard, + final SearchShardTarget shard, final SearchActionListener listener) { if (randomBoolean()) { listener.onResponse(new SearchPhaseResult() {}); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index f92902d631a1f..42da1e193279d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchContextId; @@ -117,15 +118,15 @@ public void testSkipSearchShards() throws InterruptedException { SearchResponse.Clusters.EMPTY) { @Override - protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, + protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, SearchActionListener listener) { - seenShard.computeIfAbsent(shard.shardId(), (i) -> { + seenShard.computeIfAbsent(shard.getShardId(), (i) -> { numRequests.incrementAndGet(); // only count this once per replica return Boolean.TRUE; }); new Thread(() -> { - Transport.Connection connection = getConnection(null, shard.currentNodeId()); + Transport.Connection connection = getConnection(null, shard.getNodeId()); TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult( new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode()); @@ -223,9 +224,9 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { SearchResponse.Clusters.EMPTY) { @Override - protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, + protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, SearchActionListener listener) { - seenShard.computeIfAbsent(shard.shardId(), (i) -> { + seenShard.computeIfAbsent(shard.getShardId(), (i) -> { numRequests.incrementAndGet(); // only count this once per shard copy return Boolean.TRUE; }); @@ -236,10 +237,10 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha } catch (InterruptedException e) { throw new AssertionError(e); } - Transport.Connection connection = getConnection(null, shard.currentNodeId()); + Transport.Connection connection = getConnection(null, shard.getNodeId()); TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult( new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode()); - if (shardFailures[shard.shardId().id()]) { + if (shardFailures[shard.getShardId().id()]) { listener.onFailure(new RuntimeException()); } else { listener.onResponse(testSearchPhaseResult); @@ -327,10 +328,10 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI TestSearchResponse response = new TestSearchResponse(); @Override - protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, SearchActionListener - listener) { - assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId())); - Transport.Connection connection = getConnection(null, shard.currentNodeId()); + protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, + SearchActionListener listener) { + assertTrue("shard: " + shard.getShardId() + " has been queried twice", response.queried.add(shard.getShardId())); + Transport.Connection connection = getConnection(null, shard.getNodeId()); TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult( new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode()); Set ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> newConcurrentSet()); @@ -438,12 +439,12 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI @Override protected void executePhaseOnShard(SearchShardIterator shardIt, - ShardRouting shard, + SearchShardTarget shard, SearchActionListener listener) { - assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId())); - Transport.Connection connection = getConnection(null, shard.currentNodeId()); + assertTrue("shard: " + shard.getShardId() + " has been queried twice", response.queried.add(shard.getShardId())); + Transport.Connection connection = getConnection(null, shard.getNodeId()); final TestSearchPhaseResult testSearchPhaseResult; - if (shard.shardId().id() == 0) { + if (shard.getShardId().id() == 0) { testSearchPhaseResult = new TestSearchPhaseResult(null, connection.getNode()); } else { testSearchPhaseResult = new TestSearchPhaseResult(new ShardSearchContextId(UUIDs.randomBase64UUID(), @@ -538,14 +539,14 @@ public void testAllowPartialResults() throws InterruptedException { SearchResponse.Clusters.EMPTY) { @Override - protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, + protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, SearchActionListener listener) { - seenShard.computeIfAbsent(shard.shardId(), (i) -> { + seenShard.computeIfAbsent(shard.getShardId(), (i) -> { numRequests.incrementAndGet(); // only count this once per shard copy return Boolean.TRUE; }); new Thread(() -> { - Transport.Connection connection = getConnection(null, shard.currentNodeId()); + Transport.Connection connection = getConnection(null, shard.getNodeId()); TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult( new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode()); if (shardIt.remaining() > 0) { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java index 8fdd0838e984c..4d83d40a3326c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java @@ -34,6 +34,8 @@ import java.util.Collections; import java.util.List; +import static org.hamcrest.Matchers.equalTo; + public class SearchShardIteratorTests extends ESTestCase { public void testShardId() { @@ -63,9 +65,13 @@ public void testNewSearchShardTarget() { ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); OriginalIndices originalIndices = new OriginalIndices(new String[]{randomAlphaOfLengthBetween(3, 10)}, IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); - SearchShardIterator searchShardIterator = new SearchShardIterator(clusterAlias, shardId, Collections.emptyList(), originalIndices); + String nodeId = randomAlphaOfLengthBetween(3, 10); - SearchShardTarget searchShardTarget = searchShardIterator.newSearchShardTarget(nodeId); + SearchShardIterator searchShardIterator = new SearchShardIterator(clusterAlias, shardId, + List.of(nodeId),originalIndices, null, null); + final SearchShardTarget searchShardTarget = searchShardIterator.nextOrNull(); + assertNotNull(searchShardTarget); + assertThat(searchShardTarget.getNodeId(), equalTo(nodeId)); assertEquals(clusterAlias, searchShardTarget.getClusterAlias()); assertSame(shardId, searchShardTarget.getShardId()); assertEquals(nodeId, searchShardTarget.getNodeId()); @@ -74,7 +80,7 @@ public void testNewSearchShardTarget() { public void testEqualsAndHashcode() { EqualsHashCodeTestUtils.checkEqualsAndHashCode(randomSearchShardIterator(), s -> new SearchShardIterator(s.getClusterAlias(), - s.shardId(), s.getShardRoutings(), s.getOriginalIndices()), s -> { + s.shardId(), s.getTargetNodeIds(), s.getOriginalIndices(), s.getSearchContextId(), s.getSearchContextKeepAlive()), s -> { if (randomBoolean()) { String clusterAlias; if (s.getClusterAlias() == null) { @@ -82,11 +88,13 @@ public void testEqualsAndHashcode() { } else { clusterAlias = randomBoolean() ? null : s.getClusterAlias() + randomAlphaOfLength(3); } - return new SearchShardIterator(clusterAlias, s.shardId(), s.getShardRoutings(), s.getOriginalIndices()); + return new SearchShardIterator(clusterAlias, s.shardId(), s.getTargetNodeIds(), s.getOriginalIndices(), + s.getSearchContextId(), s.getSearchContextKeepAlive()); } else { ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomIntBetween(0, Integer.MAX_VALUE)); - return new SearchShardIterator(s.getClusterAlias(), shardId, s.getShardRoutings(), s.getOriginalIndices()); + return new SearchShardIterator(s.getClusterAlias(), shardId, s.getTargetNodeIds(), s.getOriginalIndices(), + s.getSearchContextId(), s.getSearchContextKeepAlive()); } }); } @@ -134,7 +142,8 @@ public void testCompareTo() { public void testCompareToEqualItems() { SearchShardIterator shardIterator1 = randomSearchShardIterator(); SearchShardIterator shardIterator2 = new SearchShardIterator(shardIterator1.getClusterAlias(), shardIterator1.shardId(), - shardIterator1.getShardRoutings(), shardIterator1.getOriginalIndices()); + shardIterator1.getTargetNodeIds(), shardIterator1.getOriginalIndices(), shardIterator1.getSearchContextId(), + shardIterator1.getSearchContextKeepAlive()); assertEquals(shardIterator1, shardIterator2); assertEquals(0, shardIterator1.compareTo(shardIterator2)); assertEquals(0, shardIterator2.compareTo(shardIterator1)); diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index cd7eb980892cf..fe4ebab4ba53c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -53,6 +53,7 @@ import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -256,33 +257,33 @@ public void testProcessRemoteShards() { assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1); assertEquals("test_cluster_1", iterator.getClusterAlias()); assertEquals("foo", iterator.shardId().getIndexName()); - ShardRouting shardRouting = iterator.nextOrNull(); - assertNotNull(shardRouting); - assertEquals(shardRouting.getIndexName(), "foo"); - shardRouting = iterator.nextOrNull(); - assertNotNull(shardRouting); - assertEquals(shardRouting.getIndexName(), "foo"); + SearchShardTarget shard = iterator.nextOrNull(); + assertNotNull(shard); + assertEquals(shard.getShardId().getIndexName(), "foo"); + shard = iterator.nextOrNull(); + assertNotNull(shard); + assertEquals(shard.getShardId().getIndexName(), "foo"); assertNull(iterator.nextOrNull()); } else if (iterator.shardId().getIndexName().endsWith("bar")) { assertArrayEquals(new String[]{"bar"}, iterator.getOriginalIndices().indices()); assertEquals(0, iterator.shardId().getId()); assertEquals("test_cluster_1", iterator.getClusterAlias()); assertEquals("bar", iterator.shardId().getIndexName()); - ShardRouting shardRouting = iterator.nextOrNull(); - assertNotNull(shardRouting); - assertEquals(shardRouting.getIndexName(), "bar"); - shardRouting = iterator.nextOrNull(); - assertNotNull(shardRouting); - assertEquals(shardRouting.getIndexName(), "bar"); + SearchShardTarget shard = iterator.nextOrNull(); + assertNotNull(shard); + assertEquals(shard.getShardId().getIndexName(), "bar"); + shard = iterator.nextOrNull(); + assertNotNull(shard); + assertEquals(shard.getShardId().getIndexName(), "bar"); assertNull(iterator.nextOrNull()); } else if (iterator.shardId().getIndexName().endsWith("xyz")) { assertArrayEquals(new String[]{"some_alias_for_xyz"}, iterator.getOriginalIndices().indices()); assertEquals(0, iterator.shardId().getId()); assertEquals("xyz", iterator.shardId().getIndexName()); assertEquals("test_cluster_2", iterator.getClusterAlias()); - ShardRouting shardRouting = iterator.nextOrNull(); - assertNotNull(shardRouting); - assertEquals(shardRouting.getIndexName(), "xyz"); + SearchShardTarget shard = iterator.nextOrNull(); + assertNotNull(shard); + assertEquals(shard.getShardId().getIndexName(), "xyz"); assertNull(iterator.nextOrNull()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/GroupShardsIteratorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/GroupShardsIteratorTests.java index 45c57a0cdce84..75ef22e404f8b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/GroupShardsIteratorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/GroupShardsIteratorTests.java @@ -155,7 +155,7 @@ public void testOrderingWithSearchShardIterators() { List shuffled = new ArrayList<>(sorted); Collections.shuffle(shuffled, random()); { - List actualIterators = new ArrayList<>(); + List actualIterators = new ArrayList<>(); GroupShardsIterator iter = new GroupShardsIterator<>(shuffled); for (SearchShardIterator searchShardIterator : iter) { actualIterators.add(searchShardIterator); @@ -163,7 +163,7 @@ public void testOrderingWithSearchShardIterators() { assertEquals(shuffled, actualIterators); } { - List actualIterators = new ArrayList<>(); + List actualIterators = new ArrayList<>(); GroupShardsIterator iter = GroupShardsIterator.sortAndCreate(shuffled); for (SearchShardIterator searchShardIterator : iter) { actualIterators.add(searchShardIterator); diff --git a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java index 18253f5aaee6f..704715d0573ba 100644 --- a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -326,7 +327,7 @@ public void testToFilterWithRouting() throws IOException { OperationRouting routing = mock(OperationRouting.class); GroupShardsIterator it = new GroupShardsIterator<>( Collections.singletonList( - new SearchShardIterator(null, new ShardId("index", "index", 1), null, null) + new PlainShardIterator(new ShardId("index", "index", 1), Collections.emptyList()) ) ); when(routing.searchShards(any(), any(), any(), any())).thenReturn(it); diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java index b3842065508de..7c3a66d0d2a3e 100644 --- a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java @@ -8,16 +8,12 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.indices.stats.CommonStats; -import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; -import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; -import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; @@ -27,9 +23,13 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchService; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse; @@ -37,14 +37,20 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; public class PointInTimeIT extends ESIntegTestCase { @@ -63,15 +69,6 @@ protected Collection> nodePlugins() { return plugins; } - @Override - public Settings indexSettings() { - return Settings.builder() - .put(super.indexSettings()) - // If shards are relocated to new nodes, then searches with point-in-time will fail - .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") - .build(); - } - public void testBasic() { createIndex("test"); int numDocs = randomIntBetween(10, 50); @@ -124,30 +121,95 @@ public void testMultipleIndices() { client().prepareIndex(index).setId(id).setSource("value", i).get(); } refresh(); - String readerId = openPointInTime(new String[] { "*" }, TimeValue.timeValueMinutes(2)); - SearchResponse resp1 = client().prepareSearch().setPreference(null).setSearchContext(readerId, TimeValue.timeValueMinutes(2)).get(); - assertNoFailures(resp1); - assertHitCount(resp1, numDocs); - int moreDocs = randomIntBetween(10, 50); - for (int i = 0; i < moreDocs; i++) { - String id = "more-" + i; - String index = "index-" + randomIntBetween(1, numIndices); - client().prepareIndex(index).setId(id).setSource("value", i).get(); + String pitId = openPointInTime(new String[]{"*"}, TimeValue.timeValueMinutes(2)); + try { + SearchResponse resp = client().prepareSearch() + .setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2)) + .get(); + assertNoFailures(resp); + assertHitCount(resp, numDocs); + assertNotNull(resp.pointInTimeId()); + pitId = resp.pointInTimeId(); + int moreDocs = randomIntBetween(10, 50); + for (int i = 0; i < moreDocs; i++) { + String id = "more-" + i; + String index = "index-" + randomIntBetween(1, numIndices); + client().prepareIndex(index).setId(id).setSource("value", i).get(); + } + refresh(); + resp = client().prepareSearch().get(); + assertNoFailures(resp); + assertHitCount(resp, numDocs + moreDocs); + + resp = client().prepareSearch().setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(1)).get(); + assertNoFailures(resp); + assertHitCount(resp, numDocs); + assertNotNull(resp.pointInTimeId()); + pitId = resp.pointInTimeId(); + } finally { + closePointInTime(pitId); + } + } + + public void testRelocation() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(4); + createIndex("test", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1)).build()); + ensureGreen("test"); + int numDocs = randomIntBetween(10, 50); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test").setId(Integer.toString(i)).setSource("value", i).get(); } refresh(); + String pitId = openPointInTime(new String[]{"test"}, TimeValue.timeValueMinutes(2)); try { - SearchResponse resp2 = client().prepareSearch().get(); - assertNoFailures(resp2); - assertHitCount(resp2, numDocs + moreDocs); - - SearchResponse resp3 = client().prepareSearch() - .setPreference(null) - .setSearchContext(resp1.pointInTimeId(), TimeValue.timeValueMinutes(1)) + SearchResponse resp = client().prepareSearch() + .setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2)) .get(); - assertNoFailures(resp3); - assertHitCount(resp3, numDocs); + assertNoFailures(resp); + assertHitCount(resp, numDocs); + assertNotNull(resp.pointInTimeId()); + if (randomBoolean()) { + pitId = resp.pointInTimeId(); + } + final Set dataNodes = StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false) + .map(e -> e.value.getId()).collect(Collectors.toSet()); + final List excludedNodes = randomSubsetOf(2, dataNodes); + assertAcked(client().admin().indices().prepareUpdateSettings("test") + .setSettings(Settings.builder().put("index.routing.allocation.exclude._id", String.join(",", excludedNodes)).build())); + if (randomBoolean()) { + int moreDocs = randomIntBetween(10, 50); + for (int i = 0; i < moreDocs; i++) { + client().prepareIndex("test").setId("more-" + i).setSource("value", i).get(); + } + refresh(); + } + resp = client().prepareSearch() + .setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2)) + .get(); + assertNoFailures(resp); + assertHitCount(resp, numDocs); + assertNotNull(resp.pointInTimeId()); + if (randomBoolean()) { + pitId = resp.pointInTimeId(); + } + assertBusy(() -> { + final Set assignedNodes = clusterService().state().routingTable().allShards().stream() + .filter(shr -> shr.index().getName().equals("test") && shr.assignedToNode()) + .map(ShardRouting::currentNodeId) + .collect(Collectors.toSet()); + assertThat(assignedNodes, everyItem(not(in(excludedNodes)))); + }, 30, TimeUnit.SECONDS); + resp = client().prepareSearch() + .setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2)) + .get(); + assertNoFailures(resp); + assertHitCount(resp, numDocs); + assertNotNull(resp.pointInTimeId()); + if (randomBoolean()) { + pitId = resp.pointInTimeId(); + } } finally { - closePointInTime(resp1.pointInTimeId()); + closePointInTime(pitId); } } From 62474f7085f65f167f5ceec628805565986637aa Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 31 Aug 2020 23:49:04 -0400 Subject: [PATCH 3/5] better name --- .../org/elasticsearch/action/search/SearchShardIterator.java | 2 +- .../org/elasticsearch/cluster/routing/PlainShardsIterator.java | 2 +- .../main/java/org/elasticsearch/common/util/PlainIterator.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java index 4b9a27fc975c9..c0441b1b7a188 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java @@ -117,7 +117,7 @@ TimeValue getSearchContextKeepAlive() { } List getTargetNodeIds() { - return targetNodesIterator.list(); + return targetNodesIterator.asList(); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java b/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java index 4e71b2aeda701..55c6c588005b2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java @@ -38,6 +38,6 @@ public int sizeActive() { @Override public List getShardRoutings() { - return list(); + return asList(); } } diff --git a/server/src/main/java/org/elasticsearch/common/util/PlainIterator.java b/server/src/main/java/org/elasticsearch/common/util/PlainIterator.java index 4ccbf35ecd3fd..59e3c240ea71f 100644 --- a/server/src/main/java/org/elasticsearch/common/util/PlainIterator.java +++ b/server/src/main/java/org/elasticsearch/common/util/PlainIterator.java @@ -58,7 +58,7 @@ public int size() { } - public List list() { + public List asList() { return Collections.unmodifiableList(elements); } From f70f20482d9a974ef240d060422e29d6f142415b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 1 Sep 2020 09:47:55 -0400 Subject: [PATCH 4/5] ignore when node is no longer in cluster --- .../elasticsearch/action/search/TransportSearchAction.java | 6 +++++- .../action/search/AbstractSearchAsyncActionTests.java | 1 - .../action/search/CanMatchPreFilterSearchPhaseTests.java | 1 - .../org/elasticsearch/search/slice/SliceBuilderTests.java | 1 - 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 21553c603c034..d8243034dbaa2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -884,7 +884,11 @@ static List getSearchShardsFromSearchContexts(ClusterState final ShardIterator shards = OperationRouting.getShards(clusterState, shardId); final List matchingNodeFirst = new ArrayList<>(shards.size()); final String nodeId = entry.getValue().getNode(); - matchingNodeFirst.add(nodeId); // always try the matching node first even its shard was relocated + if (clusterState.nodes().get(nodeId) != null) { + // always search the matching node first even when its shard was relocated to another node + // because the point in time should keep the corresponding search context open. + matchingNodeFirst.add(nodeId); + } for (ShardRouting shard : shards) { if (shard.currentNodeId().equals(nodeId) == false) { matchingNodeFirst.add(shard.currentNodeId()); diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index c1ba0dd30dc01..916f9111517a5 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.util.set.Sets; diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 8c6ce44c00f9c..8bf5f36d94107 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.shard.ShardId; diff --git a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java index 704715d0573ba..d52d48e6cf3df 100644 --- a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java @@ -32,7 +32,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchShardIterator; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; From 88072ac6bfc5fb98636c78980d36193378af5a0a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 1 Sep 2020 13:04:11 -0400 Subject: [PATCH 5/5] no need to exclude --- .../action/search/TransportSearchAction.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index d8243034dbaa2..f204cf162a6af 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -884,11 +884,9 @@ static List getSearchShardsFromSearchContexts(ClusterState final ShardIterator shards = OperationRouting.getShards(clusterState, shardId); final List matchingNodeFirst = new ArrayList<>(shards.size()); final String nodeId = entry.getValue().getNode(); - if (clusterState.nodes().get(nodeId) != null) { - // always search the matching node first even when its shard was relocated to another node - // because the point in time should keep the corresponding search context open. - matchingNodeFirst.add(nodeId); - } + // always search the matching node first even when its shard was relocated to another node + // because the point in time should keep the corresponding search context open. + matchingNodeFirst.add(nodeId); for (ShardRouting shard : shards) { if (shard.currentNodeId().equals(nodeId) == false) { matchingNodeFirst.add(shard.currentNodeId());