Adding support for allow_partial_search_results in PIT #111516

Merged
Changes from 10 commits
Commits (25)
0657b1a
iter
pmpailis Jul 30, 2024
46f84b5
Merge remote-tracking branch 'origin/main' into adding_support_for_pa…
pmpailis Jul 30, 2024
92dee66
iter
pmpailis Jul 30, 2024
1bb9237
iter
pmpailis Jul 31, 2024
ad1c98f
iter
pmpailis Aug 1, 2024
e62c602
iter
pmpailis Aug 1, 2024
81948e1
iter
pmpailis Aug 1, 2024
a75295f
iter
pmpailis Aug 1, 2024
fed09d0
Merge branch 'main' into adding_support_for_partial_search_results_in…
elasticmachine Aug 1, 2024
b88224f
Merge branch 'main' into adding_support_for_partial_search_results_in…
pmpailis Aug 2, 2024
9061b2f
Update docs/changelog/111516.yaml
pmpailis Aug 2, 2024
c73f36a
addressing PR comments - encoding failed shards when creating PIT
pmpailis Aug 5, 2024
1edf6df
updating transportversion
pmpailis Aug 8, 2024
2f6addc
Add PIT's missing shards as failures in subsequent search request.
jimczi Aug 9, 2024
cf9ad1e
Merge remote-tracking branch 'upstream/main' into pit_allow_partial_r…
jimczi Aug 9, 2024
f1e47b5
apply review comment
jimczi Aug 13, 2024
b151ab1
Apply review comment
jimczi Aug 15, 2024
5c831c5
Merge remote-tracking branch 'upstream/main' into adding_support_for_…
jimczi Aug 15, 2024
c55aabc
fix conflict
jimczi Aug 15, 2024
2bee58c
fix another conflict
jimczi Aug 15, 2024
e45f21c
Merge remote-tracking branch 'upstream/main' into adding_support_for_…
jimczi Aug 15, 2024
88c22dc
missing import
jimczi Aug 15, 2024
ad75a74
fix format
jimczi Aug 15, 2024
07d9d36
Merge branch 'main' into adding_support_for_partial_search_results_in…
pmpailis Aug 26, 2024
91b4d8c
fixing test after updates
pmpailis Aug 26, 2024
30 changes: 30 additions & 0 deletions docs/reference/search/point-in-time-api.asciidoc
@@ -78,6 +78,36 @@ IMPORTANT: The open point in time request and each subsequent search request can
return different `id`; thus always use the most recently received `id` for the
next search request.


In addition to `keep_alive`, you can also set the `allow_partial_search_results` parameter,
which specifies whether the <<point-in-time-api,point in time (PIT)>> should tolerate unavailable shards or
<<shard-failures,shard failures>> when it is first created. If `true`, the point in time is
created with just the available shards; otherwise the operation fails if at least
one shard is unavailable. Defaults to `false`.

The PIT response now also includes a `_shards` section that reports the total number of
shards, as well as how many of them were successful when the PIT was created.

[source,console]
--------------------------------------------------
POST /my-index-000001/_pit?keep_alive=1m&allow_partial_search_results=true
--------------------------------------------------
// TEST[setup:my_index]

[source,js]
--------------------------------------------------
{
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=",
"_shards": {
"total": 10,
"successful": 10,
"skipped": 0,
"failed": 0
}
}
--------------------------------------------------
// NOTCONSOLE
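
For orientation, here is a minimal sketch of the same call through the internal Java client API that this PR's integration test exercises (the request and response types are those visible in the `PointInTimeIT` diff below; the surrounding `client` wiring is an assumption):

[source,java]
----
// Sketch only, assuming an internal `Client client` handle as in the test below.
OpenPointInTimeRequest request = new OpenPointInTimeRequest("my-index-000001")
    .keepAlive(TimeValue.timeValueMinutes(1))
    .allowPartialSearchResults(true); // tolerate unavailable shards at creation
OpenPointInTimeResponse response =
    client.execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
BytesReference pitId = response.getPointInTimeId();
// New in this change: shard accounting at PIT creation time.
int total = response.getTotalShards();
int successful = response.getSuccessfulShards();
int skipped = response.getSkippedShards();
int failed = response.getFailedShards();
----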

[[point-in-time-keep-alive]]
==== Keeping point in time alive
The `keep_alive` parameter, which is passed to an open point in time request and
@@ -106,9 +106,9 @@ The search response includes an array of `sort` values for each hit:
"_id" : "654322",
"_score" : null,
"_source" : ...,
"sort" : [
"sort" : [
1463538855,
"654322"
"654322"
]
},
{
@@ -118,7 +118,7 @@
"_source" : ...,
"sort" : [ <1>
1463538857,
"654323"
"654323"
]
}
]
@@ -150,7 +150,7 @@ GET twitter/_search
--------------------------------------------------
//TEST[continued]

Repeat this process by updating the `search_after` array every time you retrieve a
new page of results. If a <<near-real-time,refresh>> occurs between these requests,
the order of your results may change, causing inconsistent results across pages. To
prevent this, you can create a <<point-in-time-api,point in time (PIT)>> to
@@ -167,10 +167,12 @@ The API returns a PIT ID.
[source,console-result]
----
{
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==",
"_shards": ...
}
----
// TESTRESPONSE[s/"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": "$body._shards"/]

To get the first page of results, submit a search request with a `sort`
argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit
@@ -10,12 +10,15 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@@ -40,6 +43,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -57,8 +61,10 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;

public class PointInTimeIT extends ESIntegTestCase {
@@ -84,7 +90,7 @@ public void testBasic() {
prepareIndex("test").setId(id).setSource("value", i).get();
}
refresh("test");
BytesReference pitId = openPointInTime(new String[] { "test" }, TimeValue.timeValueMinutes(2));
BytesReference pitId = openPointInTime(new String[] { "test" }, TimeValue.timeValueMinutes(2)).getPointInTimeId();
assertResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp1 -> {
assertThat(resp1.pointInTimeId(), equalTo(pitId));
assertHitCount(resp1, numDocs);
@@ -130,7 +136,7 @@ public void testMultipleIndices() {
prepareIndex(index).setId(id).setSource("value", i).get();
}
refresh();
BytesReference pitId = openPointInTime(new String[] { "*" }, TimeValue.timeValueMinutes(2));
BytesReference pitId = openPointInTime(new String[] { "*" }, TimeValue.timeValueMinutes(2)).getPointInTimeId();
try {
int moreDocs = randomIntBetween(10, 50);
assertNoFailuresAndResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp -> {
@@ -212,7 +218,7 @@ public void testRelocation() throws Exception {
prepareIndex("test").setId(Integer.toString(i)).setSource("value", i).get();
}
refresh();
BytesReference pitId = openPointInTime(new String[] { "test" }, TimeValue.timeValueMinutes(2));
BytesReference pitId = openPointInTime(new String[] { "test" }, TimeValue.timeValueMinutes(2)).getPointInTimeId();
try {
assertNoFailuresAndResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp -> {
assertHitCount(resp, numDocs);
@@ -264,7 +270,7 @@ public void testPointInTimeNotFound() throws Exception {
prepareIndex("index").setId(id).setSource("value", i).get();
}
refresh();
BytesReference pit = openPointInTime(new String[] { "index" }, TimeValue.timeValueSeconds(5));
BytesReference pit = openPointInTime(new String[] { "index" }, TimeValue.timeValueSeconds(5)).getPointInTimeId();
assertNoFailuresAndResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pit)), resp1 -> {
assertHitCount(resp1, index1);
if (rarely()) {
@@ -305,7 +311,7 @@ public void testIndexNotFound() {
prepareIndex("index-2").setId(id).setSource("value", i).get();
}
refresh();
BytesReference pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueMinutes(2));
BytesReference pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueMinutes(2)).getPointInTimeId();
try {
assertNoFailuresAndResponse(
prepareSearch().setPointInTime(new PointInTimeBuilder(pit)),
@@ -348,7 +354,7 @@ public void testCanMatch() throws Exception {
assertAcked(prepareCreate("test").setSettings(settings).setMapping("""
{"properties":{"created_date":{"type": "date", "format": "yyyy-MM-dd"}}}"""));
ensureGreen("test");
BytesReference pitId = openPointInTime(new String[] { "test*" }, TimeValue.timeValueMinutes(2));
BytesReference pitId = openPointInTime(new String[] { "test*" }, TimeValue.timeValueMinutes(2)).getPointInTimeId();
try {
for (String node : internalCluster().nodesInclude("test")) {
for (IndexService indexService : internalCluster().getInstance(IndicesService.class, node)) {
@@ -415,7 +421,7 @@ public void testPartialResults() throws Exception {
prepareIndex(randomFrom("test-2")).setId(Integer.toString(i)).setSource("value", i).get();
}
refresh();
BytesReference pitId = openPointInTime(new String[] { "test-*" }, TimeValue.timeValueMinutes(2));
BytesReference pitId = openPointInTime(new String[] { "test-*" }, TimeValue.timeValueMinutes(2)).getPointInTimeId();
try {
assertNoFailuresAndResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp -> {
assertHitCount(resp, numDocs1 + numDocs2);
@@ -447,7 +453,7 @@ public void testPITTiebreak() throws Exception {
}
}
refresh("index-*");
BytesReference pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueHours(1));
BytesReference pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueHours(1)).getPointInTimeId();
try {
for (int size = 1; size <= numIndex; size++) {
SortOrder order = randomBoolean() ? SortOrder.ASC : SortOrder.DESC;
@@ -532,6 +538,156 @@ public void testOpenPITConcurrentShardRequests() throws Exception {
}
}

public void testMissingShardsWithPointInTime() throws Exception {
final Settings nodeAttributes = Settings.builder().put("node.attr.foo", "bar").build();
final String masterNode = internalCluster().startMasterOnlyNode(nodeAttributes);
List<String> dataNodes = internalCluster().startDataOnlyNodes(2, nodeAttributes);

final String index = "my_test_index";
// tried randomIntBetween(3, 10), but more than 3 shards took forever and hit timeouts
final int numShards = 3;
final int numReplicas = 0;
// create an index with numShards shards and 0 replicas
createIndex(
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas)
.put("index.routing.allocation.require.foo", "bar")
.build()
);

// index some documents
int numDocs = randomIntBetween(10, 50);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
prepareIndex(index).setId(id).setSource("value", i).get();
}
refresh(index);

// create a PIT when all shards are present
OpenPointInTimeResponse pointInTimeResponse = openPointInTime(new String[] { index }, TimeValue.timeValueMinutes(1));
try {
// ensure that the PIT created has all the shards there
assertThat(numShards, equalTo(pointInTimeResponse.getTotalShards()));
assertThat(numShards, equalTo(pointInTimeResponse.getSuccessfulShards()));
assertThat(0, equalTo(pointInTimeResponse.getFailedShards()));
assertThat(0, equalTo(pointInTimeResponse.getSkippedShards()));

// make a request using the above PIT
assertResponse(
prepareSearch().setQuery(new MatchAllQueryBuilder())
.setPointInTime(new PointInTimeBuilder(pointInTimeResponse.getPointInTimeId())),
resp -> {
// ensure that all docs are returned
assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponse.getPointInTimeId()));
assertHitCount(resp, numDocs);
}
);

// pick up a random data node to shut down
final String randomDataNode = randomFrom(dataNodes);

// find which shards to relocate
final String nodeId = admin().cluster().prepareNodesInfo(randomDataNode).get().getNodes().get(0).getNode().getId();
List<Integer> shardsToRelocate = new ArrayList<>();
for (ShardStats stats : admin().indices().prepareStats(index).get().getShards()) {
if (nodeId.equals(stats.getShardRouting().currentNodeId())) {
shardsToRelocate.add(stats.getShardRouting().shardId().id());
}
}

final int shardsRemoved = shardsToRelocate.size();

// shut down the random data node
internalCluster().stopNode(randomDataNode);

// ensure that the index is Red
ensureRed(index);

// verify that not all documents can now be retrieved
assertResponse(prepareSearch().setQuery(new MatchAllQueryBuilder()), resp -> {
assertNotNull(resp.getHits().getTotalHits());
assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs));
});

// create a PIT when some shards are missing
OpenPointInTimeResponse pointInTimeResponseOneNodeDown = openPointInTime(
new String[] { index },
TimeValue.timeValueMinutes(10),
true
);
try {
// assert that some shards are indeed missing from PIT
assertThat(numShards, equalTo(pointInTimeResponseOneNodeDown.getTotalShards()));
assertThat(numShards - shardsToRelocate.size(), equalTo(pointInTimeResponseOneNodeDown.getSuccessfulShards()));
assertThat(shardsToRelocate.size(), equalTo(pointInTimeResponseOneNodeDown.getFailedShards()));
assertThat(0, equalTo(pointInTimeResponseOneNodeDown.getSkippedShards()));

// ensure that the response now contains fewer documents than the total number of indexed documents
assertResponse(
prepareSearch().setQuery(new MatchAllQueryBuilder())
.setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())),
resp -> {
assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId()));
assertNotNull(resp.getHits().getTotalHits());
assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs));
}
);

// add another node to the cluster and re-allocate the shards
final String newNodeName = internalCluster().startDataOnlyNode(nodeAttributes);
try {
for (int shardId : shardsToRelocate) {
ClusterRerouteUtils.reroute(client(), new AllocateEmptyPrimaryAllocationCommand(index, shardId, newNodeName, true));
}
ensureGreen(TimeValue.timeValueMinutes(2), index);

// index some more documents
for (int i = numDocs; i < numDocs * 2; i++) {
String id = Integer.toString(i);
prepareIndex(index).setId(id).setSource("value", i).get();
}
refresh(index);

// ensure that we now see at least numDocs results from the updated index
assertResponse(prepareSearch().setQuery(new MatchAllQueryBuilder()), resp -> {
assertThat(resp.getSuccessfulShards(), equalTo(numShards));
assertNotNull(resp.getHits().getTotalHits());
assertThat(resp.getHits().getTotalHits().value, greaterThan((long) numDocs));
});

// ensure that when using the previously created PIT, we'd see the same number of documents as before regardless of the
// newly indexed documents
assertResponse(
prepareSearch().setQuery(new MatchAllQueryBuilder())
.setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())),
resp -> {
assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId()));
assertThat(resp.getTotalShards(), equalTo(numShards - shardsRemoved));
assertThat(resp.getSuccessfulShards(), equalTo(numShards - shardsRemoved));
assertThat(resp.getFailedShards(), equalTo(0));
assertNotNull(resp.getHits().getTotalHits());
// we expect fewer documents, as the newly indexed ones should not be part of the PIT
assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs));
}
);
} finally {
internalCluster().stopNode(newNodeName);
}
} finally {
closePointInTime(pointInTimeResponseOneNodeDown.getPointInTimeId());
}

} finally {
closePointInTime(pointInTimeResponse.getPointInTimeId());
internalCluster().stopNode(masterNode);
for (String dataNode : dataNodes) {
internalCluster().stopNode(dataNode);
}
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int size, SortBuilder<?>... sorts) throws Exception {
Set<String> seen = new HashSet<>();
@@ -590,10 +746,14 @@ private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int s
assertThat(seen.size(), equalTo(expectedNumDocs));
}

private BytesReference openPointInTime(String[] indices, TimeValue keepAlive) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive);
final OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
return response.getPointInTimeId();
private OpenPointInTimeResponse openPointInTime(String[] indices, TimeValue keepAlive) {
return openPointInTime(indices, keepAlive, false);
}

private OpenPointInTimeResponse openPointInTime(String[] indices, TimeValue keepAlive, boolean allowPartialSearchResults) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive)
.allowPartialSearchResults(allowPartialSearchResults);
return client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
}

private void closePointInTime(BytesReference readerId) {
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -183,6 +183,8 @@ static TransportVersion def(int id) {
public static final TransportVersion FIX_VECTOR_SIMILARITY_INNER_HITS = def(8_713_00_0);
public static final TransportVersion INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN = def(8_714_00_0);
public static final TransportVersion ESQL_ATTRIBUTE_CACHED_SERIALIZATION = def(8_715_00_0);

public static final TransportVersion ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT = def(8_999_00_0);

/*
* STOP! READ THIS FIRST! No, really,
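
For context on why this constant exists: transport version constants like `ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT` conventionally gate wire (de)serialization of newly added request/response fields, so that mixed-version clusters only exchange fields both sides understand. A hedged sketch of the usual pattern (illustrative, not code from this diff):

[source,java]
----
// Conventional pattern (sketch): only serialize the new field when the
// receiving node's transport version is new enough to understand it.
if (out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) {
    out.writeBoolean(allowPartialSearchResults);
}
----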