Skip to content

Commit

Permalink
Search & Count: Add option to early terminate doc collection
Browse files Browse the repository at this point in the history
Allow users to control document collection termination, if a specified terminate_after number is
set. Upon setting the newly added parameter, the response will include a boolean terminated_early
flag, indicating if the document collection for any shard terminated early.

closes elastic#6876
  • Loading branch information
areek committed Jul 23, 2014
1 parent 66825ac commit 5487c56
Show file tree
Hide file tree
Showing 32 changed files with 600 additions and 47 deletions.
6 changes: 6 additions & 0 deletions docs/reference/search/count.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ query.

|default_operator |The default operator to be used, can be `AND` or
`OR`. Defaults to `OR`.

|coming[1.4.0] terminate_after |The maximum count for each shard, upon
reaching which the query execution will terminate early.
If set, the response will have a boolean field `terminated_early` to
indicate whether the query execution has actually terminated_early.
Defaults to no terminate_after.
|=======================================================================

[float]
Expand Down
6 changes: 6 additions & 0 deletions docs/reference/search/request-body.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ that point when expired. Defaults to no timeout.
`query_and_fetch`. Defaults to `query_then_fetch`. See
<<search-request-search-type,_Search Type_>> for
more details on the different types of search that can be performed.

|coming[1.4.0] `terminate_after` |The maximum number of documents to collect for
each shard, upon reaching which the query execution will terminate early.
If set, the response will have a boolean field `terminated_early` to
indicate whether the query execution has actually terminated_early.
Defaults to no terminate_after.
|=======================================================================

Out of the above, the `search_type` is the one that can not be passed
Expand Down
6 changes: 6 additions & 0 deletions docs/reference/search/uri-request.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ scores and return them as part of each hit.
within the specified time value and bail with the hits accumulated up to
that point when expired. Defaults to no timeout.

|coming[1.4.0] `terminate_after` |The maximum number of documents to collect for
each shard, upon reaching which the query execution will terminate early.
If set, the response will have a boolean field `terminated_early` to
indicate whether the query execution has actually terminated_early.
Defaults to no terminate_after.

|`from` |The starting from index of the hits to return. Defaults to `0`.

|`size` |The number of hits to return. Defaults to `10`.
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/org/elasticsearch/action/count/CountRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.action.count;

import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
Expand All @@ -34,6 +36,8 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;

import static org.elasticsearch.search.internal.SearchContext.DEFAULT_TERMINATE_AFTER;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -67,6 +71,7 @@ public class CountRequest extends BroadcastOperationRequest<CountRequest> {
private String[] types = Strings.EMPTY_ARRAY;

long nowInMillis;
private int terminateAfter = DEFAULT_TERMINATE_AFTER;

CountRequest() {
}
Expand Down Expand Up @@ -221,6 +226,21 @@ public String preference() {
return this.preference;
}

/**
* Upon reaching <code>terminateAfter</code> counts, the count request will early terminate
*/
public CountRequest terminateAfter(int terminateAfterCount) {
if (terminateAfterCount <= 0) {
throw new ElasticsearchIllegalArgumentException("terminateAfter must be > 0");
}
this.terminateAfter = terminateAfterCount;
return this;
}

public int terminateAfter() {
return this.terminateAfter;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -230,6 +250,10 @@ public void readFrom(StreamInput in) throws IOException {
sourceUnsafe = false;
source = in.readBytesReference();
types = in.readStringArray();

if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
terminateAfter = in.readVInt();
}
}

@Override
Expand All @@ -240,6 +264,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(preference);
out.writeBytesReference(source);
out.writeStringArray(types);

if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeVInt(terminateAfter);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ public CountRequestBuilder setQuery(QueryBuilder queryBuilder) {
return this;
}

/**
* The query binary to execute
*/
public CountRequestBuilder setQuery(BytesReference queryBinary) {
sourceBuilder().setQuery(queryBinary);
return this;
}

/**
* The source to execute.
*/
Expand All @@ -115,6 +123,11 @@ public CountRequestBuilder setSource(byte[] querySource) {
return this;
}

public CountRequestBuilder setTerminateAfter(int terminateAfter) {
request().terminateAfter(terminateAfter);
return this;
}

@Override
protected void doExecute(ActionListener<CountResponse> listener) {
if (sourceBuilder != null) {
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/org/elasticsearch/action/count/CountResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.count;

import org.elasticsearch.Version;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -33,15 +34,17 @@
*/
public class CountResponse extends BroadcastOperationResponse {

private boolean terminatedEarly;
private long count;

CountResponse() {

}

CountResponse(long count, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
CountResponse(long count, boolean hasTerminatedEarly, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.count = count;
this.terminatedEarly = hasTerminatedEarly;
}

/**
Expand All @@ -51,6 +54,13 @@ public long getCount() {
return count;
}

/**
* True if the request has been terminated early due to enough count
*/
public boolean terminatedEarly() {
return this.terminatedEarly;
}

public RestStatus status() {
if (getFailedShards() == 0) {
if (getSuccessfulShards() == 0 && getTotalShards() > 0) {
Expand All @@ -76,11 +86,17 @@ public RestStatus status() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
count = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
terminatedEarly = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(count);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(terminatedEarly);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.count;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand All @@ -28,12 +29,15 @@

import java.io.IOException;

import static org.elasticsearch.search.internal.SearchContext.DEFAULT_TERMINATE_AFTER;

/**
* Internal count request executed directly against a specific index shard.
*/
class ShardCountRequest extends BroadcastShardOperationRequest {

private float minScore;
private int terminateAfter;

private BytesReference querySource;

Expand All @@ -55,6 +59,7 @@ public ShardCountRequest(String index, int shardId, @Nullable String[] filtering
this.types = request.types();
this.filteringAliases = filteringAliases;
this.nowInMillis = request.nowInMillis;
this.terminateAfter = request.terminateAfter();
}

public float minScore() {
Expand All @@ -77,6 +82,10 @@ public long nowInMillis() {
return this.nowInMillis;
}

public int terminateAfter() {
return this.terminateAfter;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -99,6 +108,12 @@ public void readFrom(StreamInput in) throws IOException {
}
}
nowInMillis = in.readVLong();

if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
terminateAfter = in.readVInt();
} else {
terminateAfter = DEFAULT_TERMINATE_AFTER;
}
}

@Override
Expand All @@ -121,5 +136,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(0);
}
out.writeVLong(nowInMillis);

if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeVInt(terminateAfter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.count;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -33,29 +34,41 @@
class ShardCountResponse extends BroadcastShardOperationResponse {

private long count;
private boolean terminatedEarly;

ShardCountResponse() {

}

public ShardCountResponse(String index, int shardId, long count) {
public ShardCountResponse(String index, int shardId, long count, boolean terminatedEarly) {
super(index, shardId);
this.count = count;
this.terminatedEarly = terminatedEarly;
}

public long getCount() {
return this.count;
}

public boolean terminatedEarly() {
return this.terminatedEarly;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
count = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
terminatedEarly = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(count);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(terminatedEarly);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.atomic.AtomicReferenceArray;

import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.search.internal.SearchContext.DEFAULT_TERMINATE_AFTER;

/**
*
Expand Down Expand Up @@ -139,6 +140,7 @@ protected CountResponse newResponse(CountRequest request, AtomicReferenceArray s
int successfulShards = 0;
int failedShards = 0;
long count = 0;
boolean terminatedEarly = false;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
Expand All @@ -152,10 +154,13 @@ protected CountResponse newResponse(CountRequest request, AtomicReferenceArray s
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
count += ((ShardCountResponse) shardResponse).getCount();
if (((ShardCountResponse) shardResponse).terminatedEarly()) {
terminatedEarly = true;
}
successfulShards++;
}
}
return new CountResponse(count, shardsResponses.length(), successfulShards, failedShards, shardFailures);
return new CountResponse(count, terminatedEarly, shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override
Expand Down Expand Up @@ -186,10 +191,20 @@ protected ShardCountResponse shardOperation(ShardCountRequest request) throws El
QueryParseContext.removeTypes();
}
}
final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;
boolean terminatedEarly = false;
context.preProcess();
try {
long count = Lucene.count(context.searcher(), context.query());
return new ShardCountResponse(request.index(), request.shardId(), count);
long count;
if (hasTerminateAfterCount) {
final Lucene.EarlyTerminatingCollector countCollector =
Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());
terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);
count = countCollector.count();
} else {
count = Lucene.count(context.searcher(), context.query());
}
return new ShardCountResponse(request.index(), request.shardId(), count, terminatedEarly);
} catch (Exception e) {
throw new QueryPhaseExecutionException(context, "failed to execute count", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ public SearchRequestBuilder setTimeout(String timeout) {
return this;
}

/**
* An optional document count, upon collecting which the search
* query will early terminate
*/
public SearchRequestBuilder setTerminateAfter(int terminateAfter) {
sourceBuilder().terminateAfter(terminateAfter);
return this;
}

/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
Expand Down
Loading

0 comments on commit 5487c56

Please sign in to comment.