Skip to content

Commit

Permalink
Always include the matching node when resolving point in time (#61658)
Browse files Browse the repository at this point in the history
If shards are relocated to new nodes, then searches with a point in time
will fail, although a pit keeps search contexts open. This commit solves
this problem by reducing info used by SearchShardIterator and always
including the matching nodes when resolving a point in time.

Closes #61627
  • Loading branch information
dnhatn committed Sep 10, 2020
1 parent 035f063 commit 808c868
Show file tree
Hide file tree
Showing 19 changed files with 342 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
Expand Down Expand Up @@ -212,7 +211,7 @@ void skipShard(SearchShardIterator iterator) {
successfulShardExecution(iterator);
}

private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
/*
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
* same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
Expand All @@ -221,16 +220,16 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
* we can continue (cf. InitialSearchPhase#maybeFork).
*/
if (shard == null) {
fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
fork(() -> onShardFailure(shardIndex, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
} else {
final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
: null;
Runnable r = () -> {
final Thread thread = Thread.currentThread();
try {
executePhaseOnShard(shardIt, shard,
new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {
new SearchActionListener<Result>(shard, shardIndex) {
@Override
public void innerOnResponse(Result result) {
try {
Expand All @@ -243,7 +242,7 @@ public void innerOnResponse(Result result) {
@Override
public void onFailure(Exception t) {
try {
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
onShardFailure(shardIndex, shard, shardIt, t);
} finally {
executeNext(pendingExecutions, thread);
}
Expand All @@ -255,7 +254,7 @@ public void onFailure(Exception t) {
* It is possible to run into connection exceptions here because we are getting the connection early and might
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
fork(() -> onShardFailure(shardIndex, shard, shardIt, e));
} finally {
executeNext(pendingExecutions, thread);
}
Expand All @@ -275,7 +274,9 @@ public void onFailure(Exception t) {
* @param shard the shard routing to send the request for
* @param listener the listener to notify on response
*/
protected abstract void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, SearchActionListener<Result> listener);
protected abstract void executePhaseOnShard(SearchShardIterator shardIt,
SearchShardTarget shard,
SearchActionListener<Result> listener);

private void fork(final Runnable runnable) {
executor.execute(new AbstractRunnable() {
Expand Down Expand Up @@ -370,18 +371,16 @@ ShardSearchFailure[] buildShardFailures() {
return failures;
}

private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
final SearchShardIterator shardIt, Exception e) {
private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId);
onShardFailure(shardIndex, shardTarget, e);
final ShardRouting nextShard = shardIt.nextOrNull();
onShardFailure(shardIndex, shard, e);
final SearchShardTarget nextShard = shardIt.nextOrNull();
final boolean lastShard = nextShard == null;
logger.debug(() -> new ParameterizedMessage("{}: Failed to execute [{}] lastShard [{}]",
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
shard != null ? shard : shardIt.shardId(), request, lastShard), e);
if (lastShard) {
onShardGroupFailure(shardIndex, shardTarget, e);
onShardGroupFailure(shardIndex, shard, e);
}
final int totalOps = this.totalOps.incrementAndGet();
if (totalOps == expectedTotalOps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchService.CanMatchResponse;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -77,9 +77,9 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
}

@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
SearchActionListener<CanMatchResponse> listener) {
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
getSearchTransport().sendCanMatch(getConnection(shard.getClusterAlias(), shard.getNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
Expand Down Expand Up @@ -65,9 +65,9 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
}

@Override
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
protected void executePhaseOnShard(final SearchShardIterator shardIt, final SearchShardTarget shard,
final SearchActionListener<DfsSearchResult> listener) {
getSearchTransport().sendExecuteDfs(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
getSearchTransport().sendExecuteDfs(getConnection(shard.getClusterAlias(), shard.getNodeId()),
buildShardSearchRequest(shardIt) , getTask(), listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -75,11 +74,11 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
SearchProgressListener.buildSearchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
protected void executePhaseOnShard(final SearchShardIterator shardIt,
final SearchShardTarget shard,
final SearchActionListener<SearchPhaseResult> listener) {
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));
getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
request, getTask(), listener);
getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,56 @@

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Countable;
import org.elasticsearch.common.util.PlainIterator;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.ShardSearchContextId;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
* of the search request (useful especially with cross-cluster search, as each cluster has its own set of original indices) as well as
* the cluster alias.
* @see OriginalIndices
*/
public final class SearchShardIterator extends PlainShardIterator {
public final class SearchShardIterator implements Comparable<SearchShardIterator>, Countable {

private final OriginalIndices originalIndices;
private final String clusterAlias;
private final ShardId shardId;
private boolean skip = false;

private final ShardSearchContextId searchContextId;
private final TimeValue searchContextKeepAlive;
private final PlainIterator<String> targetNodesIterator;

/**
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
* this the a given <code>shardId</code>.
*
* @param clusterAlias the alias of the cluster where the shard is located
* @param shardId shard id of the group
* @param shards shards to iterate
* @param clusterAlias the alias of the cluster where the shard is located
* @param shardId shard id of the group
* @param shards shards to iterate
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
*/
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
this(clusterAlias, shardId, shards, originalIndices, null, null);
this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).collect(Collectors.toList()),
originalIndices, null, null);
}

public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId,
List<ShardRouting> shards, OriginalIndices originalIndices,
List<String> targetNodeIds, OriginalIndices originalIndices,
ShardSearchContextId searchContextId, TimeValue searchContextKeepAlive) {
super(shardId, shards);
this.shardId = shardId;
this.targetNodesIterator = new PlainIterator<>(targetNodeIds);
this.originalIndices = originalIndices;
this.clusterAlias = clusterAlias;
this.searchContextId = searchContextId;
Expand All @@ -86,12 +93,16 @@ public String getClusterAlias() {
return clusterAlias;
}

/**
* Creates a new shard target from this iterator, pointing at the node identified by the provided identifier.
* @see SearchShardTarget
*/
SearchShardTarget newSearchShardTarget(String nodeId) {
return new SearchShardTarget(nodeId, shardId(), clusterAlias, originalIndices);
SearchShardTarget nextOrNull() {
final String nodeId = targetNodesIterator.nextOrNull();
if (nodeId != null) {
return new SearchShardTarget(nodeId, shardId, clusterAlias, originalIndices);
}
return null;
}

int remaining() {
return targetNodesIterator.remaining();
}

/**
Expand All @@ -105,6 +116,10 @@ TimeValue getSearchContextKeepAlive() {
return searchContextKeepAlive;
}

List<String> getTargetNodeIds() {
return targetNodesIterator.asList();
}

/**
* Reset the iterator and mark it as skippable
* @see #skip()
Expand All @@ -114,49 +129,44 @@ void resetAndSkip() {
skip = true;
}

void reset() {
targetNodesIterator.reset();
}

/**
* Returns <code>true</code> if the search execution should skip this shard since it can not match any documents given the query.
*/
boolean skip() {
return skip;
}


@Override
public int size() {
return targetNodesIterator.size();
}

ShardId shardId() {
return shardId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (super.equals(o) == false) {
return false;
}
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchShardIterator that = (SearchShardIterator) o;
return Objects.equals(clusterAlias, that.clusterAlias);
return shardId.equals(that.shardId) && Objects.equals(clusterAlias, that.clusterAlias);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), clusterAlias);
return Objects.hash(clusterAlias, shardId);
}

@Override
public int compareTo(ShardIterator o) {
int superCompareTo = super.compareTo(o);
if (superCompareTo != 0 || (o instanceof SearchShardIterator == false)) {
return superCompareTo;
}
SearchShardIterator searchShardIterator = (SearchShardIterator)o;
if (clusterAlias == null && searchShardIterator.getClusterAlias() == null) {
return 0;
}
if (clusterAlias == null) {
return -1;
}
if (searchShardIterator.getClusterAlias() == null) {
return 1;
}
return clusterAlias.compareTo(searchShardIterator.getClusterAlias());
public int compareTo(SearchShardIterator o) {
return Comparator.comparing(SearchShardIterator::shardId)
.thenComparing(SearchShardIterator::getClusterAlias, Comparator.nullsFirst(String::compareTo))
.compare(this, o);
}
}
Loading

0 comments on commit 808c868

Please sign in to comment.