Add additional shards routing info in ShardSearchRequest #29533

Merged: 11 commits, Apr 26, 2018
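
In short, this change carries the routing and preference of the original SearchRequest down to every ShardSearchRequest, so shard-level code (such as the sliced-scroll filter touched below) can see how the search was routed. The snippet below is only a sketch of the new API surface from the perspective of a hypothetical consumer; the class RoutingInfoLogger does not exist in the PR.

```java
import org.elasticsearch.search.internal.ShardSearchRequest;

// Hypothetical consumer of the two accessors this PR adds to ShardSearchRequest.
final class RoutingInfoLogger {

    static String describe(ShardSearchRequest request) {
        // Both values are nullable: they are only present when the original
        // SearchRequest was sent with ?routing= and/or ?preference=.
        String routing = request.routing();
        String preference = request.preference();
        return "shard " + request.shardId() + " searched with routing=" + routing
            + ", preference=" + preference;
    }
}
```

The two accessors mirror SearchRequest#routing() and SearchRequest#preference() and are simply copied through in ShardSearchLocalRequest, as the diffs below show.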

@@ -128,17 +128,17 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
onPhaseFailure(currentPhase, "all shards failed", cause);
} else {
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && shardFailures.get() != null ){
if (logger.isDebugEnabled()) {
final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
Throwable cause = shardSearchFailures.length == 0 ? null :
ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
shardSearchFailures.length, getName()), cause);
}
onPhaseFailure(currentPhase, "Partial shards failure", null);
} else {
if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
@@ -271,14 +271,14 @@ public final SearchRequest getRequest() {

@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {

ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
}

return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), failures, clusters);
}

@@ -131,7 +131,7 @@ public final void run() throws IOException {
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
@@ -140,7 +140,7 @@ public final void run() throws IOException {
final SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.size() == 0) {
if(missingShards.length() >0 ){
missingShards.append(", ");
}
missingShards.append(shardRoutings.shardId());
}

@@ -25,8 +25,10 @@
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Counter;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.search.Queries;
@@ -91,6 +93,7 @@ final class DefaultSearchContext extends SearchContext {
private final Engine.Searcher engineSearcher;
private final BigArrays bigArrays;
private final IndexShard indexShard;
private final ClusterService clusterService;
private final IndexService indexService;
private final ContextIndexSearcher searcher;
private final DfsSearchResult dfsResult;
@@ -120,6 +123,7 @@ final class DefaultSearchContext extends SearchContext {
// filter for sliced scroll
private SliceBuilder sliceBuilder;
private SearchTask task;
private final Version minNodeVersion;


/**
@@ -152,9 +156,10 @@ final class DefaultSearchContext extends SearchContext {
private final QueryShardContext queryShardContext;
private FetchPhase fetchPhase;

DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher,
IndexService indexService, IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter,
TimeValue timeout, FetchPhase fetchPhase, String clusterAlias) {
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout,
FetchPhase fetchPhase, String clusterAlias, Version minNodeVersion) {
this.id = id;
this.request = request;
this.fetchPhase = fetchPhase;
@@ -168,9 +173,11 @@ final class DefaultSearchContext extends SearchContext {
this.fetchResult = new FetchSearchResult(id, shardTarget);
this.indexShard = indexShard;
this.indexService = indexService;
this.clusterService = clusterService;
this.searcher = new ContextIndexSearcher(engineSearcher, indexService.cache().query(), indexShard.getQueryCachingPolicy());
this.timeEstimateCounter = timeEstimateCounter;
this.timeout = timeout;
this.minNodeVersion = minNodeVersion;
queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis,
clusterAlias);
queryShardContext.setTypes(request.types());
@@ -278,8 +285,7 @@ && new NestedHelper(mapperService()).mightMatchNestedDocs(query)
}

if (sliceBuilder != null) {
filters.add(sliceBuilder.toFilter(queryShardContext, shardTarget().getShardId().getId(),
queryShardContext.getIndexSettings().getNumberOfShards()));
filters.add(sliceBuilder.toFilter(clusterService, request, queryShardContext, minNodeVersion));
}

if (filters.isEmpty()) {
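
The change above also threads the ClusterService, the whole ShardSearchRequest, and the minimum node version into SliceBuilder#toFilter instead of just the shard id and the index's number of shards. One plausible reason to have routing and preference available on the data node is to recompute how many shard groups the original search actually targeted; the helper below is an illustrative sketch of that idea (TargetShardGroups is hypothetical, not the SliceBuilder code), assuming OperationRouting#searchShards resolves routing the same way the coordinating node does.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.search.internal.ShardSearchRequest;

// Hypothetical helper: estimate how many shard groups the original search targeted,
// honoring the routing and preference values now carried by ShardSearchRequest.
final class TargetShardGroups {

    static int count(ClusterService clusterService, ShardSearchRequest request) {
        ClusterState state = clusterService.state();
        String index = request.shardId().getIndexName();
        // Map the index to the comma-separated routing values of the original request, if any.
        Map<String, Set<String>> routingMap = request.routing() == null
            ? null
            : Collections.singletonMap(index, Strings.commaDelimitedListToSet(request.routing()));
        GroupShardsIterator<ShardIterator> groups = clusterService.operationRouting()
            .searchShards(state, new String[] { index }, routingMap, request.preference());
        return groups.size();
    }
}
```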

@@ -616,8 +616,8 @@ private DefaultSearchContext createSearchContext(ShardSearchRequest request, Tim
Engine.Searcher engineSearcher = indexShard.acquireSearcher("search");

final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase,
request.getClusterAlias());
engineSearcher, clusterService, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout,
fetchPhase, request.getClusterAlias(), clusterService.state().nodes().getMinNodeVersion());
boolean success = false;
try {
// we clone the query shard context here just for rewriting otherwise we

@@ -28,13 +28,10 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
@@ -74,7 +71,8 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
private Boolean requestCache;
private long nowInMillis;
private boolean allowPartialSearchResults;

private String routing;
private String preference;
private boolean profile;

ShardSearchLocalRequest() {
@@ -83,8 +81,8 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards,
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
this(shardId, numberOfShards, searchRequest.searchType(),
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
searchRequest.allowPartialSearchResults());
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
searchRequest.allowPartialSearchResults(), searchRequest.routing(), searchRequest.preference());
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
// at this stage. Any NPEs in the above are therefore an error in request preparation logic.
assert searchRequest.allowPartialSearchResults() != null;
@@ -102,7 +100,8 @@ public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis
}

public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types,
Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults) {
Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults,
String routing, String preference) {
this.shardId = shardId;
this.numberOfShards = numberOfShards;
this.searchType = searchType;
@@ -112,6 +111,8 @@ public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType s
this.aliasFilter = aliasFilter;
this.indexBoost = indexBoost;
this.allowPartialSearchResults = allowPartialSearchResults;
this.routing = routing;
this.preference = preference;
}


@@ -169,18 +170,28 @@ public long nowInMillis() {
public Boolean requestCache() {
return requestCache;
}

@Override
public Boolean allowPartialSearchResults() {
return allowPartialSearchResults;
}


@Override
public Scroll scroll() {
return scroll;
}

@Override
public String routing() {
return routing;
}

@Override
public String preference() {
return preference;
}

@Override
public void setProfile(boolean profile) {
this.profile = profile;
@@ -225,6 +236,13 @@ protected void innerReadFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
allowPartialSearchResults = in.readOptionalBoolean();
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
routing = in.readOptionalString();
preference = in.readOptionalString();
} else {
routing = null;
preference = null;
}
}

protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException {
@@ -250,7 +268,12 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeOptionalBoolean(allowPartialSearchResults);
}

if (!asKey) {

Contributor review comment on the line above: can we use == false?

if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalString(routing);
out.writeOptionalString(preference);
}
}
}

@Override
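
The innerReadFrom/innerWriteTo hunks above gate the new fields on the stream version, so mixed-version clusters stay wire compatible: nodes before V_7_0_0_alpha1 neither write nor expect the two optional strings, and readers fall back to null. A minimal, self-contained sketch of the same pattern follows; the RoutingInfo class is illustrative and not part of the PR.

```java
import java.io.IOException;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

// Illustrative example of version-gated serialization of two optional strings.
final class RoutingInfo implements Writeable {

    private final String routing;     // null when the search had no ?routing=
    private final String preference;  // null when the search had no ?preference=

    RoutingInfo(String routing, String preference) {
        this.routing = routing;
        this.preference = preference;
    }

    RoutingInfo(StreamInput in) throws IOException {
        if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
            routing = in.readOptionalString();
            preference = in.readOptionalString();
        } else {
            // Older senders never wrote these fields; fall back to "not specified".
            routing = null;
            preference = null;
        }
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
            out.writeOptionalString(routing);
            out.writeOptionalString(preference);
        }
        // Nothing is written for older receivers, mirroring innerWriteTo above.
    }
}
```

Note that in the PR the write is additionally skipped when the request is serialized as a cache key (the asKey branch), which this sketch omits.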

@@ -19,6 +19,7 @@

package org.elasticsearch.search.internal;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -28,8 +29,6 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.AliasFilterParsingException;
@@ -68,11 +67,21 @@ public interface ShardSearchRequest {
long nowInMillis();

Boolean requestCache();

Boolean allowPartialSearchResults();

Scroll scroll();

/**
* Returns the routing of the original {@link SearchRequest}.
*/
String routing();

/**
* Returns the preference of the original {@link SearchRequest}.
*/
String preference();

/**
* Sets if this shard search needs to be profiled or not
* @param profile True if the shard should be profiled

@@ -28,9 +28,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
@@ -151,17 +148,27 @@ public long nowInMillis() {
public Boolean requestCache() {
return shardSearchLocalRequest.requestCache();
}

@Override
public Boolean allowPartialSearchResults() {
return shardSearchLocalRequest.allowPartialSearchResults();
}

@Override
public Scroll scroll() {
return shardSearchLocalRequest.scroll();
}

@Override
public String routing() {
return shardSearchLocalRequest.routing();
}

@Override
public String preference() {
return shardSearchLocalRequest.preference();
}

@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");