Skip to content

Commit

Permalink
Add additional shards routing info in ShardSearchRequest (#29533)
Browse files Browse the repository at this point in the history
This commit propagates the preference and routing of the original SearchRequest in the ShardSearchRequest.
This information is then use to fix a bug in sliced scrolls when executed with a preference (or a routing).
Instead of computing the slice query from the total number of shards in the index, this commit computes this number from the number of shards per index that participates in the request.

Fixes #27550
  • Loading branch information
jimczi authored Apr 26, 2018
1 parent fadcce8 commit 8b8c0c0
Show file tree
Hide file tree
Showing 27 changed files with 605 additions and 284 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.transport.Transport;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -62,6 +64,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final long clusterStateVersion;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
private final Map<String, Set<String>> indexRoutings;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicInteger successfulOps = new AtomicInteger();
Expand All @@ -72,6 +75,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Map<String, Set<String>> indexRoutings,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
Expand All @@ -89,6 +93,7 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS
this.clusterStateVersion = clusterStateVersion;
this.concreteIndexBoosts = concreteIndexBoosts;
this.aliasFilter = aliasFilter;
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
}
Expand Down Expand Up @@ -128,17 +133,17 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
onPhaseFailure(currentPhase, "all shards failed", cause);
} else {
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && shardFailures.get() != null ){
if (logger.isDebugEnabled()) {
final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
Throwable cause = shardSearchFailures.length == 0 ? null :
ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
shardSearchFailures.length, getName()), cause);
}
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
Expand Down Expand Up @@ -271,14 +276,14 @@ public final SearchRequest getRequest() {

@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {

ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}

return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), failures, clusters);
}
Expand Down Expand Up @@ -318,8 +323,11 @@ public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIter
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
String indexName = shardIt.shardId().getIndex().getName();
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
.toArray(new String[0]);
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias);
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias, routings);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.transport.Transport;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand All @@ -47,6 +48,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Map<String, Set<String>> indexRoutings,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
Expand All @@ -56,9 +58,9 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
* We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node
* is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards.
*/
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(),
clusters);
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public final void run() throws IOException {
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
assert success;
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
Expand All @@ -140,7 +140,7 @@ public final void run() throws IOException {
final SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.size() == 0) {
if(missingShards.length() >0 ){
missingShards.append(", ");
missingShards.append(", ");
}
missingShards.append(shardRoutings.shardId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.transport.Transport;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

Expand All @@ -37,11 +38,13 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction

SearchDfsQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection, final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
final Map<String, Float> concreteIndexBoosts, final Map<String, Set<String>> indexRoutings,
final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
final long clusterStateVersion, final SearchTask task, SearchResponse.Clusters clusters) {
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.transport.Transport;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

Expand All @@ -37,13 +38,14 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se

SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection, final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
final Map<String, Float> concreteIndexBoosts, final Map<String, Set<String>> indexRoutings,
final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion, SearchTask task, SearchResponse.Clusters clusters) {
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
searchPhaseController.newSearchPhaseResults(request, shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
String[] concreteIndices = new String[indices.length];
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
Expand Down Expand Up @@ -350,7 +351,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
}
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards, clusters).start();
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
}

private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators) {
Expand Down Expand Up @@ -380,17 +381,20 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider,
BiFunction<String, String, Transport.Connection> connectionLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
long clusterStateVersion,
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
ActionListener<SearchResponse> listener, boolean preFilter,
Map<String, Set<String>> indexRoutings,
ActionListener<SearchResponse> listener,
boolean preFilter,
SearchResponse.Clusters clusters) {
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
if (preFilter) {
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, executor, searchRequest, listener, shardIterators,
aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, (iter) -> {
AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup,
clusterStateVersion, aliasFilter, concreteIndexBoosts, listener, false, clusters);
clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
return new SearchPhase(action.getName()) {
@Override
public void run() throws IOException {
Expand All @@ -403,14 +407,14 @@ public void run() throws IOException {
switch (searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, clusters);
aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
shardIterators, timeProvider, clusterStateVersion, task, clusters);
break;
case QUERY_AND_FETCH:
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, clusters);
aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
shardIterators, timeProvider, clusterStateVersion, task, clusters);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/**
* A simple {@link ShardsIterator} that iterates a list or sub-list of
* {@link ShardRouting shard routings}.
* {@link ShardRouting shard indexRoutings}.
*/
public class PlainShardsIterator implements ShardsIterator {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

/**
* {@link ShardRouting} immutably encapsulates information about shard
* routings like id, state, version, etc.
* indexRoutings like id, state, version, etc.
*/
public final class ShardRouting implements Writeable, ToXContentObject {

Expand Down Expand Up @@ -477,7 +477,7 @@ public boolean isRelocationTargetOf(ShardRouting other) {
"ShardRouting is a relocation target but current node id isn't equal to source relocating node. This [" + this + "], other [" + other + "]";

assert b == false || this.shardId.equals(other.shardId) :
"ShardRouting is a relocation target but both routings are not of the same shard id. This [" + this + "], other [" + other + "]";
"ShardRouting is a relocation target but both indexRoutings are not of the same shard id. This [" + this + "], other [" + other + "]";

assert b == false || this.primary == other.primary :
"ShardRouting is a relocation target but primary flag is different. This [" + this + "], target [" + other + "]";
Expand All @@ -504,7 +504,7 @@ public boolean isRelocationSourceOf(ShardRouting other) {
"ShardRouting is a relocation source but relocating node isn't equal to other's current node. This [" + this + "], other [" + other + "]";

assert b == false || this.shardId.equals(other.shardId) :
"ShardRouting is a relocation source but both routings are not of the same shard. This [" + this + "], target [" + other + "]";
"ShardRouting is a relocation source but both indexRoutings are not of the same shard. This [" + this + "], target [" + other + "]";

assert b == false || this.primary == other.primary :
"ShardRouting is a relocation source but primary flag is different. This [" + this + "], target [" + other + "]";
Expand Down
Loading

0 comments on commit 8b8c0c0

Please sign in to comment.