Skip to content

Commit

Permalink
Add automatic tiebreaker for search requests that use a PIT (#68833)
Browse files Browse the repository at this point in the history
This PR adds the special `_shard_doc` sort tiebreaker automatically to any
search requests that use a PIT. Adding the tiebreaker ensures that any
sorted query can be paginated consistently within a PIT.

Closes #56828
  • Loading branch information
jimczi authored Feb 17, 2021
1 parent dde3df2 commit ab7dd46
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,15 @@ To get the first page of results, submit a search request with a `sort`
argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit
the target data stream or index from the request path.

IMPORTANT: We recommend you include a tiebreaker field in your `sort`. This
tiebreaker field should contain a unique value for each document. If you don't
include a tiebreaker field, your paged results could miss or duplicate hits.
IMPORTANT: All PIT search requests add an implicit sort tiebreaker field called `_shard_doc`,
which can also be provided explicitly.
If you cannot use a PIT, we recommend that you include a tiebreaker field
in your `sort`. This tiebreaker field should contain a unique value for each document.
If you don't include a tiebreaker field, your paged results could miss or duplicate hits.

NOTE: Search after requests have optimizations that make them faster when the sort
order is `_shard_doc` and total hits are not tracked. If you want to iterate over all documents regardless of the
order, this is the most efficient option.

[source,console]
----
Expand All @@ -90,18 +96,47 @@ GET /_search
"keep_alive": "1m"
},
"sort": [ <2>
{"@timestamp": "asc"},
{"tie_breaker_id": "asc"}
{"@timestamp": "asc"}
]
}
----
// TEST[catch:missing]

<1> PIT ID for the search.
<2> Sorts hits for the search.
<2> Sorts hits for the search with an implicit tiebreak on `_shard_doc` ascending.

The search response includes an array of `sort` values for each hit. If you used
a PIT, the response's `pit_id` parameter contains an updated PIT ID.
a PIT, a tiebreaker is included as the last `sort` values for each hit.
This tiebreaker called `_shard_doc` is added automically on every search requests that use a PIT.
The `_shard_doc` value is the combination of the shard index within the PIT and the Lucene's internal doc ID,
it is unique per document and constant within a PIT.
You can also add the tiebreaker explicitly in the search request to customize the order:

[source,console]
----
GET /_search
{
"size": 10000,
"query": {
"match" : {
"user.id" : "elkbee"
}
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [ <2>
{"@timestamp": "asc"},
{"_shard_doc": "desc"}
]
}
----
// TEST[catch:missing]

<1> PIT ID for the search.
<2> Sorts hits for the search with an explicit tiebreak on `_shard_doc` descending.


[source,console-result]
----
Expand All @@ -122,7 +157,7 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID.
"_source" : ...,
"sort" : [ <2>
4098435132000,
"FaslK3QBySSL_rrj9zM5"
4294967298 <3>
]
}
]
Expand All @@ -133,9 +168,10 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID.

<1> Updated `id` for the point in time.
<2> Sort values for the last returned hit.
<3> The tiebreaker value, unique per document within the `pit_id`.

To get the next page of results, rerun the previous search using the last hit's
sort values as the `search_after` argument. If using a PIT, use the latest PIT
sort values (including the tiebreaker) as the `search_after` argument. If using a PIT, use the latest PIT
ID in the `pit.id` parameter. The search's `query` and `sort` arguments must
remain unchanged. If provided, the `from` argument must be `0` (default) or `-1`.

Expand All @@ -154,19 +190,20 @@ GET /_search
"keep_alive": "1m"
},
"sort": [
{"@timestamp": "asc"},
{"tie_breaker_id": "asc"}
{"@timestamp": "asc"}
],
"search_after": [ <2>
4098435132000,
"FaslK3QBySSL_rrj9zM5"
]
4294967298
],
"track_total_hits": false <3>
}
----
// TEST[catch:missing]

<1> PIT ID returned by the previous search.
<2> Sort values from the previous search's last hit.
<3> Disable the tracking of total hits to speed up pagination.

You can repeat this process to get additional pages of results. If using a PIT,
you can extend the PIT's retention period using the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.ShardDocSortField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand All @@ -48,7 +53,7 @@
* @see org.elasticsearch.client.Client#search(SearchRequest)
* @see SearchResponse
*/
public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {
public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable, Rewriteable<SearchRequest> {

public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

Expand Down Expand Up @@ -641,6 +646,35 @@ public int resolveTrackTotalHitsUpTo() {
return resolveTrackTotalHitsUpTo(scroll, source);
}

@Override
public SearchRequest rewrite(QueryRewriteContext ctx) throws IOException {
if (source == null) {
return this;
}

SearchSourceBuilder source = this.source.rewrite(ctx);
boolean hasChanged = source != this.source;

// add a sort tiebreaker for PIT search requests if not explicitly set
Object[] searchAfter = source.searchAfter();
if (source.pointInTimeBuilder() != null
&& source.sorts() != null
&& source.sorts().isEmpty() == false
// skip the tiebreaker if it is not provided in the search after values
&& (searchAfter == null || searchAfter.length == source.sorts().size()+1)) {
SortBuilder<?> lastSort = source.sorts().get(source.sorts().size() - 1);
if (lastSort instanceof FieldSortBuilder == false
|| FieldSortBuilder.SHARD_DOC_FIELD_NAME.equals(((FieldSortBuilder) lastSort).getFieldName()) == false) {
List<SortBuilder<?>> newSorts = new ArrayList<>(source.sorts());
newSorts.add(SortBuilders.pitTiebreaker().unmappedType("long"));
source = source.shallowCopy().sort(newSorts);
hasChanged = true;
}
}

return hasChanged ? new SearchRequest(this).source(source) : this;
}

public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder source) {
if (scroll != null) {
// no matter what the value of track_total_hits is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,42 +259,39 @@ boolean buildPointInTimeFromSearchResults() {
}, listener);
}

private void executeRequest(Task task, SearchRequest searchRequest,
SearchAsyncActionProvider searchAsyncActionProvider, ActionListener<SearchResponse> listener) {
private void executeRequest(Task task,
SearchRequest original,
SearchAsyncActionProvider searchAsyncActionProvider,
ActionListener<SearchResponse> listener) {
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
new SearchTimeProvider(original.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
ActionListener<SearchRequest> rewriteListener = ActionListener.wrap(rewritten -> {
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = searchRequest.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
if (rewritten.pointInTimeBuilder() != null) {
searchContext = rewritten.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, rewritten.indicesOptions());
} else {
searchContext = null;
remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(), searchRequest.indices());
remoteClusterIndices = remoteClusterService.groupIndices(rewritten.indicesOptions(), rewritten.indices());
}
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
final ClusterState clusterState = clusterService.state();
if (remoteClusterIndices.isEmpty()) {
executeLocalSearch(
task, timeProvider, searchRequest, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
task, timeProvider, rewritten, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
} else {
if (shouldMinimizeRoundtrips(searchRequest)) {
if (shouldMinimizeRoundtrips(rewritten)) {
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
ccsRemoteReduce(parentTaskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
searchService.aggReduceContextBuilder(searchRequest),
ccsRemoteReduce(parentTaskId, rewritten, localIndices, remoteClusterIndices, timeProvider,
searchService.aggReduceContextBuilder(rewritten),
remoteClusterService, threadPool, listener,
(r, l) -> executeLocalSearch(
task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
collectSearchShards(rewritten.indicesOptions(), rewritten.preference(), rewritten.routing(),
skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
ActionListener.wrap(
searchShardsResponses -> {
Expand All @@ -305,7 +302,7 @@ private void executeRequest(Task task, SearchRequest searchRequest,
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
searchContext, rewritten.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
} else {
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
Expand All @@ -314,7 +311,7 @@ private void executeRequest(Task task, SearchRequest searchRequest,
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators,
executeSearch((SearchTask) task, timeProvider, rewritten, localIndices, remoteShardIterators,
clusterNodeLookup, clusterState, remoteAliasFilters, listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext, searchAsyncActionProvider);
Expand All @@ -323,12 +320,8 @@ private void executeRequest(Task task, SearchRequest searchRequest,
}
}
}, listener::onFailure);
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
}
Rewriteable.rewriteAndFetch(original, searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
}

static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,15 @@ public SearchSourceBuilder sort(SortBuilder<?> sort) {
}

/**
* Gets the bytes representing the sort builders for this request.
* Sets the sort builders for this request.
*/
public SearchSourceBuilder sort(List<SortBuilder<?>> sorts) {
this.sorts = sorts;
return this;
}

/**
* Gets the sort builders for this request.
*/
public List<SortBuilder<?>> sorts() {
return sorts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,17 @@ public void testPITTiebreak() throws Exception {
try {
for (int size = 1; size <= numIndex; size++) {
SortOrder order = randomBoolean() ? SortOrder.ASC : SortOrder.DESC;

assertPagination(new PointInTimeBuilder(pit), expectedNumDocs, size,
SortBuilders.pitTiebreaker().order(order));

assertPagination(new PointInTimeBuilder(pit), expectedNumDocs, size,
SortBuilders.scoreSort());
assertPagination(new PointInTimeBuilder(pit), expectedNumDocs, size,
SortBuilders.scoreSort(), SortBuilders.pitTiebreaker().order(order));

assertPagination(new PointInTimeBuilder(pit), expectedNumDocs, size,
SortBuilders.fieldSort("value"));
assertPagination(new PointInTimeBuilder(pit), expectedNumDocs, size,
SortBuilders.fieldSort("value"), SortBuilders.pitTiebreaker().order(order));
}
Expand All @@ -406,18 +415,21 @@ public void testPITTiebreak() throws Exception {
}
}

private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int size, SortBuilder<?>... sort) throws Exception {
private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int size, SortBuilder<?>... sorts) throws Exception {
Set<String> seen = new HashSet<>();
SearchRequestBuilder builder = client().prepareSearch()
.setSize(size)
.setPointInTime(pit);
for (SortBuilder<?> sort : sorts) {
builder.addSort(sort);
}
final SearchRequest searchRequest = builder.request().rewrite(null);

final int[] reverseMuls = new int[sort.length];
for (int i = 0; i < sort.length; i++) {
builder.addSort(sort[i]);
reverseMuls[i] = sort[i].order() == SortOrder.ASC ? 1 : -1;
final List<SortBuilder<?>> expectedSorts = searchRequest.source().sorts();
final int[] reverseMuls = new int[expectedSorts.size()];
for (int i = 0; i < expectedSorts.size(); i++) {
reverseMuls[i] = expectedSorts.get(i).order() == SortOrder.ASC ? 1 : -1;
}
final SearchRequest searchRequest = builder.request();
SearchResponse response = client().search(searchRequest).get();
Object[] lastSortValues = null;
while (response.getHits().getHits().length > 0) {
Expand All @@ -426,7 +438,7 @@ private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int s
assertTrue(seen.add(hit.getIndex() + hit.getId()));

if (lastHitSortValues != null) {
for (int i = 0; i < sort.length; i++) {
for (int i = 0; i < expectedSorts.size(); i++) {
Comparable value = (Comparable) hit.getRawSortValues()[i];
int cmp = value.compareTo(lastHitSortValues[i]) * reverseMuls[i];
if (cmp != 0) {
Expand All @@ -440,7 +452,7 @@ private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int s
int len = response.getHits().getHits().length;
SearchHit last = response.getHits().getHits()[len - 1];
if (lastSortValues != null) {
for (int i = 0; i < sort.length; i++) {
for (int i = 0; i < expectedSorts.size(); i++) {
Comparable value = (Comparable) last.getSortValues()[i];
int cmp = value.compareTo(lastSortValues[i]) * reverseMuls[i];
if (cmp != 0) {
Expand All @@ -449,7 +461,7 @@ private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int s
}
}
}
assertThat(last.getSortValues().length, equalTo(sort.length));
assertThat(last.getSortValues().length, equalTo(expectedSorts.size()));
lastSortValues = last.getSortValues();
searchRequest.source().searchAfter(last.getSortValues());
response = client().search(searchRequest).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,8 @@
- length: {hits.hits: 1 }
- match: {hits.hits.0._index: "/\\.ds-simple-data-stream1-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
- match: {hits.hits.0._id: "123" }
- match: {hits.hits.0.sort: [22, 123] }
- match: {hits.hits.0.sort.0: 22}
- match: {hits.hits.0.sort.1: 123}

- do:
search:
Expand All @@ -693,7 +694,8 @@
- length: {hits.hits: 1 }
- match: {hits.hits.0._index: "/\\.ds-simple-data-stream1-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
- match: {hits.hits.0._id: "5" }
- match: {hits.hits.0.sort: [18, 5] }
- match: {hits.hits.0.sort.0: 18}
- match: {hits.hits.0.sort.1: 5}

- do:
search:
Expand All @@ -712,7 +714,8 @@
- length: {hits.hits: 1 }
- match: {hits.hits.0._index: "/\\.ds-simple-data-stream1-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
- match: {hits.hits.0._id: "1" }
- match: {hits.hits.0.sort: [18, 1] }
- match: {hits.hits.0.sort.0: 18}
- match: {hits.hits.0.sort.1: 1}

- do:
search:
Expand Down
Loading

0 comments on commit ab7dd46

Please sign in to comment.