From 0657b1a47912d0d92bb0e478cda5fd74465ef745 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Tue, 30 Jul 2024 14:20:16 +0300 Subject: [PATCH 01/18] iter --- .../src/main/groovy/elasticsearch.run.gradle | 8 +- .../action/search/LenientPointInTimeIT.java | 96 +++++++++++++++++++ .../org/elasticsearch/TransportVersions.java | 1 + .../action/search/OpenPointInTimeRequest.java | 24 ++++- .../search/OpenPointInTimeResponse.java | 42 +++++++- .../TransportOpenPointInTimeAction.java | 10 +- .../RestOpenPointInTimeActionTests.java | 2 +- 7 files changed, 176 insertions(+), 7 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java diff --git a/build-tools-internal/src/main/groovy/elasticsearch.run.gradle b/build-tools-internal/src/main/groovy/elasticsearch.run.gradle index 3a905c001d0cf..c5f9afe37c105 100644 --- a/build-tools-internal/src/main/groovy/elasticsearch.run.gradle +++ b/build-tools-internal/src/main/groovy/elasticsearch.run.gradle @@ -30,7 +30,13 @@ testClusters.register("runTask") { setting 'xpack.security.enabled', 'true' keystore 'bootstrap.password', 'password' user username: 'elastic-admin', password: 'elastic-password', role: '_es_test_root' - numberOfNodes = 1 + numberOfNodes = 3 + def cluster = testClusters.named("runTask").get() + cluster.getNodes().each { node -> + node.setting('cluster.initial_master_nodes', cluster.getLastNode().getName()) + node.setting('node.roles', '[master,data_hot,data_content]') + } + cluster.getFirstNode().setting('node.roles', '[]') } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java new file mode 100644 index 0000000000000..7586fc46bc7ad --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch 
B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.search.builder.PointInTimeBuilder; +import org.elasticsearch.test.ESIntegTestCase; + +import java.io.IOException; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; + +@ESIntegTestCase.ClusterScope(numDataNodes = 2) +public class LenientPointInTimeIT extends ESIntegTestCase { + + public void testBasic() throws IOException { + final String index = "my_test_index"; + createIndex( + index, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 10).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + int numDocs = randomIntBetween(10, 50); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + prepareIndex(index).setId(id).setSource("value", i).get(); + } + refresh(index); + OpenPointInTimeResponse pointInTimeResponse = openPointInTime(new String[] { index }, TimeValue.timeValueMinutes(1)); + try { + assertThat(10, equalTo(pointInTimeResponse.getTotalShards())); + assertThat(10, equalTo(pointInTimeResponse.getSuccessfulShards())); + assertThat(0, equalTo(pointInTimeResponse.getFailedShards())); + assertThat(0, equalTo(pointInTimeResponse.getSkippedShards())); 
+ + assertResponse( + prepareSearch().setQuery(new MatchAllQueryBuilder()) + .setPointInTime(new PointInTimeBuilder(pointInTimeResponse.getPointInTimeId())), + resp -> { + assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponse.getPointInTimeId())); + assertHitCount(resp, numDocs); + } + ); + internalCluster().stopRandomDataNode(); + + OpenPointInTimeResponse pointInTimeResponseOneNodeDown = openPointInTime( + new String[] { index }, + TimeValue.timeValueMinutes(10) + ); + try { + assertThat(10, equalTo(pointInTimeResponseOneNodeDown.getTotalShards())); + assertThat(5, equalTo(pointInTimeResponseOneNodeDown.getSuccessfulShards())); + assertThat(5, equalTo(pointInTimeResponseOneNodeDown.getFailedShards())); + assertThat(0, equalTo(pointInTimeResponseOneNodeDown.getSkippedShards())); + + assertResponse( + prepareSearch().setQuery(new MatchAllQueryBuilder()) + .setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())), + resp -> { + assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId())); + assertNotNull(resp.getHits().getTotalHits()); + assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); + } + ); + + + } finally { + closePointInTime(pointInTimeResponseOneNodeDown.getPointInTimeId()); + } + + } finally { + closePointInTime(pointInTimeResponse.getPointInTimeId()); + } + } + + private OpenPointInTimeResponse openPointInTime(String[] indices, TimeValue keepAlive) { + OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive).allowPartialSearchResults(true); + return client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet(); + } + + private void closePointInTime(BytesReference readerId) { + client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(readerId)).actionGet(); + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java 
b/server/src/main/java/org/elasticsearch/TransportVersions.java index 34324ec2a1c16..4b45206f49666 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -179,6 +179,7 @@ static TransportVersion def(int id) { public static final TransportVersion MASTER_NODE_METRICS = def(8_710_00_0); public static final TransportVersion SEGMENT_LEVEL_FIELDS_STATS = def(8_711_00_0); public static final TransportVersion ML_ADD_DETECTION_RULE_PARAMS = def(8_712_00_0); + public static final TransportVersion ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT = def(8_713_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java b/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java index a1cd4df25a25c..b77d9142fe73c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java @@ -41,6 +41,8 @@ public final class OpenPointInTimeRequest extends ActionRequest implements Indic private QueryBuilder indexFilter; + private boolean allowPartialSearchResults = false; + public static final IndicesOptions DEFAULT_INDICES_OPTIONS = SearchRequest.DEFAULT_INDICES_OPTIONS; public OpenPointInTimeRequest(String... 
indices) { @@ -60,6 +62,9 @@ public OpenPointInTimeRequest(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { this.indexFilter = in.readOptionalNamedWriteable(QueryBuilder.class); } + if (in.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { + this.allowPartialSearchResults = in.readBoolean(); + } } @Override @@ -76,6 +81,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { out.writeOptionalWriteable(indexFilter); } + if (out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { + out.writeBoolean(allowPartialSearchResults); + } } @Override @@ -180,6 +188,15 @@ public boolean includeDataStreams() { return true; } + public boolean allowPartialSearchResults() { + return allowPartialSearchResults; + } + + public OpenPointInTimeRequest allowPartialSearchResults(boolean allowPartialSearchResults) { + this.allowPartialSearchResults = allowPartialSearchResults; + return this; + } + @Override public String getDescription() { return "open search context: indices [" + String.join(",", indices) + "] keep_alive [" + keepAlive + "]"; @@ -200,6 +217,8 @@ public String toString() { + ", preference='" + preference + '\'' + + ", allowPartialSearchResults=" + + allowPartialSearchResults + '}'; } @@ -218,12 +237,13 @@ public boolean equals(Object o) { && indicesOptions.equals(that.indicesOptions) && keepAlive.equals(that.keepAlive) && Objects.equals(routing, that.routing) - && Objects.equals(preference, that.preference); + && Objects.equals(preference, that.preference) + && Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults); } @Override public int hashCode() { - int result = Objects.hash(indicesOptions, keepAlive, maxConcurrentShardRequests, routing, preference); + int result = Objects.hash(indicesOptions, keepAlive, maxConcurrentShardRequests, 
routing, preference, allowPartialSearchResults); result = 31 * result + Arrays.hashCode(indices); return result; } diff --git a/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeResponse.java b/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeResponse.java index dafcee894c9a6..4a4c0252fb109 100644 --- a/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeResponse.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.search; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamOutput; @@ -18,22 +19,46 @@ import java.util.Base64; import java.util.Objects; +import static org.elasticsearch.rest.action.RestActions.buildBroadcastShardsHeader; + public final class OpenPointInTimeResponse extends ActionResponse implements ToXContentObject { private final BytesReference pointInTimeId; - public OpenPointInTimeResponse(BytesReference pointInTimeId) { + private final int totalShards; + private final int successfulShards; + private final int failedShards; + private final int skippedShards; + + public OpenPointInTimeResponse( + BytesReference pointInTimeId, + int totalShards, + int successfulShards, + int failedShards, + int skippedShards + ) { this.pointInTimeId = Objects.requireNonNull(pointInTimeId, "Point in time parameter must be not null"); + this.totalShards = totalShards; + this.successfulShards = successfulShards; + this.failedShards = failedShards; + this.skippedShards = skippedShards; } @Override public void writeTo(StreamOutput out) throws IOException { out.writeBytesReference(pointInTimeId); + if (out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { + out.writeVInt(totalShards); + out.writeVInt(successfulShards); + out.writeVInt(failedShards); + 
out.writeVInt(skippedShards); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field("id", Base64.getUrlEncoder().encodeToString(BytesReference.toBytes(pointInTimeId))); + buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, failedShards, skippedShards, null); builder.endObject(); return builder; } @@ -42,4 +67,19 @@ public BytesReference getPointInTimeId() { return pointInTimeId; } + public int getTotalShards() { + return totalShards; + } + + public int getSuccessfulShards() { + return successfulShards; + } + + public int getFailedShards() { + return failedShards; + } + + public int getSkippedShards() { + return skippedShards; + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 92d90fa8e55ad..8ff327266e365 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -98,13 +98,19 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen .indicesOptions(request.indicesOptions()) .preference(request.preference()) .routing(request.routing()) - .allowPartialSearchResults(false) + .allowPartialSearchResults(request.allowPartialSearchResults()) .source(new SearchSourceBuilder().query(request.indexFilter())); searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests()); searchRequest.setCcsMinimizeRoundtrips(false); transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> { assert r.pointInTimeId() != null : r; - return new OpenPointInTimeResponse(r.pointInTimeId()); + return new OpenPointInTimeResponse( + r.pointInTimeId(), + r.getTotalShards(), + r.getSuccessfulShards(), + 
r.getFailedShards(), + r.getSkippedShards() + ); }), searchListener -> new OpenPointInTimePhase(request, searchListener)); } diff --git a/server/src/test/java/org/elasticsearch/action/search/RestOpenPointInTimeActionTests.java b/server/src/test/java/org/elasticsearch/action/search/RestOpenPointInTimeActionTests.java index dda977565af45..b8950ed7aa10e 100644 --- a/server/src/test/java/org/elasticsearch/action/search/RestOpenPointInTimeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/RestOpenPointInTimeActionTests.java @@ -31,7 +31,7 @@ public void testMaxConcurrentSearchRequests() { verifyingClient.setExecuteVerifier(((actionType, transportRequest) -> { assertThat(transportRequest, instanceOf(OpenPointInTimeRequest.class)); transportRequests.add((OpenPointInTimeRequest) transportRequest); - return new OpenPointInTimeResponse(new BytesArray("n/a")); + return new OpenPointInTimeResponse(new BytesArray("n/a"), 0, 0, 0, 0); })); { RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) From 92dee660e071c55f482243ceb791fa2915c08e40 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Tue, 30 Jul 2024 15:58:43 +0300 Subject: [PATCH 02/18] iter --- .../action/search/LenientPointInTimeIT.java | 47 ++++++++++++++++++- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java index 7586fc46bc7ad..6251930f4c827 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java @@ -9,6 +9,8 @@ package org.elasticsearch.action.search; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import 
org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; @@ -17,16 +19,20 @@ import org.elasticsearch.test.ESIntegTestCase; import java.io.IOException; +import java.util.List; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.lessThan; @ESIntegTestCase.ClusterScope(numDataNodes = 2) public class LenientPointInTimeIT extends ESIntegTestCase { - public void testBasic() throws IOException { + public void testBasic() throws Exception { final String index = "my_test_index"; createIndex( index, @@ -53,7 +59,21 @@ public void testBasic() throws IOException { assertHitCount(resp, numDocs); } ); - internalCluster().stopRandomDataNode(); + + final String randomDataNode = internalCluster().getNodeNameThat( + settings -> DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) + ); + + updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", randomDataNode)); + ensureGreen(index); + + assertResponse( + prepareSearch().setQuery(new MatchAllQueryBuilder()), + resp -> { + assertNotNull(resp.getHits().getTotalHits()); + assertThat(resp.getHits().getTotalHits().value, lessThan((long)numDocs)); + } + ); OpenPointInTimeResponse pointInTimeResponseOneNodeDown = openPointInTime( new String[] { index }, @@ -75,6 +95,29 @@ public void testBasic() throws IOException { } ); + for (int i = numDocs; i < numDocs * 2; i++) { + String id = Integer.toString(i); + prepareIndex(index).setId(id).setSource("value", i).get(); + } + + assertResponse( + 
prepareSearch().setQuery(new MatchAllQueryBuilder()), + resp -> { + assertNotNull(resp.getHits().getTotalHits()); + assertThat(resp.getHits().getTotalHits().value, greaterThan((long)numDocs)); + } + ); + + assertResponse( + prepareSearch().setQuery(new MatchAllQueryBuilder()) + .setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())), + resp -> { + assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId())); + assertNotNull(resp.getHits().getTotalHits()); + assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); + } + ); + } finally { closePointInTime(pointInTimeResponseOneNodeDown.getPointInTimeId()); From 1bb92370dd07da1026f587ca49eb757d5e39275c Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Wed, 31 Jul 2024 18:16:52 +0300 Subject: [PATCH 03/18] iter --- .../action/search/LenientPointInTimeIT.java | 139 ------------- .../action/search/PointInTimeIT.java | 182 ++++++++++++++++-- .../elasticsearch/test/ESIntegTestCase.java | 7 + 3 files changed, 177 insertions(+), 151 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java deleted file mode 100644 index 6251930f4c827..0000000000000 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/LenientPointInTimeIT.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. 
- */ - -package org.elasticsearch.action.search; - -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodeRole; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.search.builder.PointInTimeBuilder; -import org.elasticsearch.test.ESIntegTestCase; - -import java.io.IOException; -import java.util.List; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.in; -import static org.hamcrest.Matchers.lessThan; - -@ESIntegTestCase.ClusterScope(numDataNodes = 2) -public class LenientPointInTimeIT extends ESIntegTestCase { - - public void testBasic() throws Exception { - final String index = "my_test_index"; - createIndex( - index, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 10).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() - ); - int numDocs = randomIntBetween(10, 50); - for (int i = 0; i < numDocs; i++) { - String id = Integer.toString(i); - prepareIndex(index).setId(id).setSource("value", i).get(); - } - refresh(index); - OpenPointInTimeResponse pointInTimeResponse = openPointInTime(new String[] { index }, TimeValue.timeValueMinutes(1)); - try { - assertThat(10, equalTo(pointInTimeResponse.getTotalShards())); - assertThat(10, equalTo(pointInTimeResponse.getSuccessfulShards())); - assertThat(0, equalTo(pointInTimeResponse.getFailedShards())); - assertThat(0, equalTo(pointInTimeResponse.getSkippedShards())); - - 
assertResponse( - prepareSearch().setQuery(new MatchAllQueryBuilder()) - .setPointInTime(new PointInTimeBuilder(pointInTimeResponse.getPointInTimeId())), - resp -> { - assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponse.getPointInTimeId())); - assertHitCount(resp, numDocs); - } - ); - - final String randomDataNode = internalCluster().getNodeNameThat( - settings -> DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) - ); - - updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", randomDataNode)); - ensureGreen(index); - - assertResponse( - prepareSearch().setQuery(new MatchAllQueryBuilder()), - resp -> { - assertNotNull(resp.getHits().getTotalHits()); - assertThat(resp.getHits().getTotalHits().value, lessThan((long)numDocs)); - } - ); - - OpenPointInTimeResponse pointInTimeResponseOneNodeDown = openPointInTime( - new String[] { index }, - TimeValue.timeValueMinutes(10) - ); - try { - assertThat(10, equalTo(pointInTimeResponseOneNodeDown.getTotalShards())); - assertThat(5, equalTo(pointInTimeResponseOneNodeDown.getSuccessfulShards())); - assertThat(5, equalTo(pointInTimeResponseOneNodeDown.getFailedShards())); - assertThat(0, equalTo(pointInTimeResponseOneNodeDown.getSkippedShards())); - - assertResponse( - prepareSearch().setQuery(new MatchAllQueryBuilder()) - .setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())), - resp -> { - assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId())); - assertNotNull(resp.getHits().getTotalHits()); - assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); - } - ); - - for (int i = numDocs; i < numDocs * 2; i++) { - String id = Integer.toString(i); - prepareIndex(index).setId(id).setSource("value", i).get(); - } - - assertResponse( - prepareSearch().setQuery(new MatchAllQueryBuilder()), - resp -> { - assertNotNull(resp.getHits().getTotalHits()); - 
assertThat(resp.getHits().getTotalHits().value, greaterThan((long)numDocs)); - } - ); - - assertResponse( - prepareSearch().setQuery(new MatchAllQueryBuilder()) - .setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())), - resp -> { - assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId())); - assertNotNull(resp.getHits().getTotalHits()); - assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); - } - ); - - - } finally { - closePointInTime(pointInTimeResponseOneNodeDown.getPointInTimeId()); - } - - } finally { - closePointInTime(pointInTimeResponse.getPointInTimeId()); - } - } - - private OpenPointInTimeResponse openPointInTime(String[] indices, TimeValue keepAlive) { - OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive).allowPartialSearchResults(true); - return client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet(); - } - - private void closePointInTime(BytesReference readerId) { - client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(readerId)).actionGet(); - } -} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java index a9a5bb074c9ac..9b4634b64d7fa 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java @@ -10,12 +10,15 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.IndicesOptions; import 
org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -40,6 +43,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -57,8 +61,10 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; public class PointInTimeIT extends ESIntegTestCase { @@ -84,7 +90,7 @@ public void testBasic() { prepareIndex("test").setId(id).setSource("value", i).get(); } refresh("test"); - BytesReference pitId = openPointInTime(new String[] { "test" }, TimeValue.timeValueMinutes(2)); + BytesReference pitId = openPointInTime(new String[] { "test" }, TimeValue.timeValueMinutes(2)).getPointInTimeId(); assertResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp1 -> { assertThat(resp1.pointInTimeId(), equalTo(pitId)); assertHitCount(resp1, numDocs); @@ -130,7 +136,7 @@ public void testMultipleIndices() { prepareIndex(index).setId(id).setSource("value", i).get(); } refresh(); - BytesReference pitId = openPointInTime(new String[] { "*" }, TimeValue.timeValueMinutes(2)); + BytesReference pitId = openPointInTime(new String[] { "*" }, TimeValue.timeValueMinutes(2)).getPointInTimeId(); try { int 
moreDocs = randomIntBetween(10, 50); assertNoFailuresAndResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp -> { @@ -212,7 +218,7 @@ public void testRelocation() throws Exception { prepareIndex("test").setId(Integer.toString(i)).setSource("value", i).get(); } refresh(); - BytesReference pitId = openPointInTime(new String[] { "test" }, TimeValue.timeValueMinutes(2)); + BytesReference pitId = openPointInTime(new String[] { "test" }, TimeValue.timeValueMinutes(2)).getPointInTimeId(); try { assertNoFailuresAndResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp -> { assertHitCount(resp, numDocs); @@ -264,7 +270,7 @@ public void testPointInTimeNotFound() throws Exception { prepareIndex("index").setId(id).setSource("value", i).get(); } refresh(); - BytesReference pit = openPointInTime(new String[] { "index" }, TimeValue.timeValueSeconds(5)); + BytesReference pit = openPointInTime(new String[] { "index" }, TimeValue.timeValueSeconds(5)).getPointInTimeId(); assertNoFailuresAndResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pit)), resp1 -> { assertHitCount(resp1, index1); if (rarely()) { @@ -305,7 +311,7 @@ public void testIndexNotFound() { prepareIndex("index-2").setId(id).setSource("value", i).get(); } refresh(); - BytesReference pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueMinutes(2)); + BytesReference pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueMinutes(2)).getPointInTimeId(); try { assertNoFailuresAndResponse( prepareSearch().setPointInTime(new PointInTimeBuilder(pit)), @@ -348,7 +354,7 @@ public void testCanMatch() throws Exception { assertAcked(prepareCreate("test").setSettings(settings).setMapping(""" {"properties":{"created_date":{"type": "date", "format": "yyyy-MM-dd"}}}""")); ensureGreen("test"); - BytesReference pitId = openPointInTime(new String[] { "test*" }, TimeValue.timeValueMinutes(2)); + BytesReference pitId = openPointInTime(new String[] { 
"test*" }, TimeValue.timeValueMinutes(2)).getPointInTimeId(); try { for (String node : internalCluster().nodesInclude("test")) { for (IndexService indexService : internalCluster().getInstance(IndicesService.class, node)) { @@ -415,7 +421,7 @@ public void testPartialResults() throws Exception { prepareIndex(randomFrom("test-2")).setId(Integer.toString(i)).setSource("value", i).get(); } refresh(); - BytesReference pitId = openPointInTime(new String[] { "test-*" }, TimeValue.timeValueMinutes(2)); + BytesReference pitId = openPointInTime(new String[] { "test-*" }, TimeValue.timeValueMinutes(2)).getPointInTimeId(); try { assertNoFailuresAndResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp -> { assertHitCount(resp, numDocs1 + numDocs2); @@ -447,7 +453,7 @@ public void testPITTiebreak() throws Exception { } } refresh("index-*"); - BytesReference pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueHours(1)); + BytesReference pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueHours(1)).getPointInTimeId(); try { for (int size = 1; size <= numIndex; size++) { SortOrder order = randomBoolean() ? 
SortOrder.ASC : SortOrder.DESC; @@ -532,6 +538,154 @@ public void testOpenPITConcurrentShardRequests() throws Exception { } } + public void testMissingShardsWithPointInTime() throws Exception { + final Settings nodeAttributes = Settings.builder().put("node.attr.foo", "bar").build(); + final String masterNode = internalCluster().startMasterOnlyNode(nodeAttributes); + List dataNodes = internalCluster().startDataOnlyNodes(2, nodeAttributes); + + final String index = "my_test_index"; + final int numShards = 2; // randomIntBetween(2, 4); + // create an index with numShards shards and 0 replicas + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.routing.allocation.require.foo", "bar") + .build() + ); + + // index some documents + int numDocs = randomIntBetween(10, 50); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + prepareIndex(index).setId(id).setSource("value", i).get(); + } + refresh(index); + + // create a PIT when all docs are present + OpenPointInTimeResponse pointInTimeResponse = openPointInTime(new String[]{index}, TimeValue.timeValueMinutes(1)); + try { + // ensure that the PIT created has all the shards there + assertThat(numShards, equalTo(pointInTimeResponse.getTotalShards())); + assertThat(numShards, equalTo(pointInTimeResponse.getSuccessfulShards())); + assertThat(0, equalTo(pointInTimeResponse.getFailedShards())); + assertThat(0, equalTo(pointInTimeResponse.getSkippedShards())); + + // make a request using the above PIT + assertResponse( + prepareSearch().setQuery(new MatchAllQueryBuilder()) + .setPointInTime(new PointInTimeBuilder(pointInTimeResponse.getPointInTimeId())), + resp -> { + // ensure that al docs are returned + assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponse.getPointInTimeId())); + assertHitCount(resp, numDocs); + } + ); + + // pick up a random data node to shut down + final String 
randomDataNode = randomFrom(dataNodes); + + // find which shards to relocate + final String nodeId = admin().cluster().prepareNodesInfo(randomDataNode).get().getNodes().get(0).getNode().getId(); + List shardsToRelocate = new ArrayList<>(); + for (ShardStats stats : admin().indices().prepareStats(index).get().getShards()) { + if (nodeId.equals(stats.getShardRouting().currentNodeId())) { + shardsToRelocate.add(stats.getShardRouting().shardId().id()); + } + } + + final int shardsRemoved = shardsToRelocate.size(); + + // shut down the random data node + internalCluster().stopNode(randomDataNode); + + // ensure that the index is Red + ensureRed(index); + + // verify that not all documents can now be retrieved + assertResponse(prepareSearch().setQuery(new MatchAllQueryBuilder()), resp -> { + assertNotNull(resp.getHits().getTotalHits()); + assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); + }); + + // create a PIT when some shards are missing + OpenPointInTimeResponse pointInTimeResponseOneNodeDown = openPointInTime( + new String[]{index}, + TimeValue.timeValueMinutes(10), + true + ); + try { + // assert that some shards are indeed missing from PIT + assertThat(numShards, equalTo(pointInTimeResponseOneNodeDown.getTotalShards())); + assertThat(numShards - shardsToRelocate.size(), equalTo(pointInTimeResponseOneNodeDown.getSuccessfulShards())); + assertThat(shardsToRelocate.size(), equalTo(pointInTimeResponseOneNodeDown.getFailedShards())); + assertThat(0, equalTo(pointInTimeResponseOneNodeDown.getSkippedShards())); + + // ensure that the response now contains fewer documents than the total number of indexed documents + assertResponse( + prepareSearch().setQuery(new MatchAllQueryBuilder()) + .setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())), + resp -> { + assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId())); + assertNotNull(resp.getHits().getTotalHits()); + 
assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); + } + ); + + // add another node to the cluster and re-allocate the shards + final String newNodeName = internalCluster().startDataOnlyNode(nodeAttributes); + try { + for (int shardId : shardsToRelocate) { + ClusterRerouteUtils.reroute(client(), new AllocateEmptyPrimaryAllocationCommand(index, shardId, newNodeName, true)); + } + ensureGreen(TimeValue.timeValueMinutes(2), index); + + // index some more documents + for (int i = numDocs; i < numDocs * 2; i++) { + String id = Integer.toString(i); + prepareIndex(index).setId(id).setSource("value", i).get(); + } + refresh(index); + + // ensure that we now see at least numDocs results from the updated index + assertResponse(prepareSearch().setQuery(new MatchAllQueryBuilder()), resp -> { + assertThat(resp.getSuccessfulShards(), equalTo(numShards)); + assertNotNull(resp.getHits().getTotalHits()); + assertThat(resp.getHits().getTotalHits().value, greaterThan((long) numDocs)); + }); + + // ensure that when using the previously created PIT, we'd see the same number of documents as before regardless of the + // newly indexed documents + assertResponse( + prepareSearch().setQuery(new MatchAllQueryBuilder()) + .setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())), + resp -> { + assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId())); + assertThat(resp.getTotalShards(), equalTo(numShards - shardsRemoved)); + assertThat(resp.getSuccessfulShards(), equalTo(numShards - shardsRemoved)); + assertThat(resp.getFailedShards(), equalTo(0)); + assertNotNull(resp.getHits().getTotalHits()); + // we expect less documents as the newly indexed ones should not be part of the PIT + assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); + } + ); + } finally { + internalCluster().stopNode(newNodeName); + } + } finally { + 
closePointInTime(pointInTimeResponseOneNodeDown.getPointInTimeId()); + } + + } finally { + closePointInTime(pointInTimeResponse.getPointInTimeId()); + internalCluster().stopNode(masterNode); + for (String dataNode : dataNodes) { + internalCluster().stopNode(dataNode); + } + } + } + @SuppressWarnings({ "rawtypes", "unchecked" }) private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int size, SortBuilder... sorts) throws Exception { Set seen = new HashSet<>(); @@ -590,10 +744,14 @@ private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int s assertThat(seen.size(), equalTo(expectedNumDocs)); } - private BytesReference openPointInTime(String[] indices, TimeValue keepAlive) { - OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive); - final OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet(); - return response.getPointInTimeId(); + private OpenPointInTimeResponse openPointInTime(String[] indices, TimeValue keepAlive) { + return openPointInTime(indices, keepAlive, false); + } + + private OpenPointInTimeResponse openPointInTime(String[] indices, TimeValue keepAlive, boolean allowPartialSearchResults) { + OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive) + .allowPartialSearchResults(allowPartialSearchResults); + return client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet(); } private void closePointInTime(BytesReference readerId) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index f1b012d757926..19d0c04750523 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -959,6 +959,13 @@ public ClusterHealthStatus ensureYellow(String... 
indices) { return ensureColor(ClusterHealthStatus.YELLOW, TimeValue.timeValueSeconds(30), false, indices); } + /** + * Ensures the cluster has a red state via the cluster health API. + */ + public ClusterHealthStatus ensureRed(String... indices) { + return ensureColor(ClusterHealthStatus.RED, TimeValue.timeValueSeconds(30), false, indices); + } + /** * Ensures the cluster has a yellow state via the cluster health API and ensures the that cluster has no initializing shards * for the given indices From ad1c98f1ebb2d9d937e8b8053ed005cc3c770c67 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Thu, 1 Aug 2024 14:52:48 +0300 Subject: [PATCH 04/18] iter --- .../src/main/groovy/elasticsearch.run.gradle | 8 +---- .../search/point-in-time-api.asciidoc | 30 +++++++++++++++++++ .../action/search/PointInTimeIT.java | 10 ++++--- .../search/RestOpenPointInTimeAction.java | 1 + 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/build-tools-internal/src/main/groovy/elasticsearch.run.gradle b/build-tools-internal/src/main/groovy/elasticsearch.run.gradle index c5f9afe37c105..3a905c001d0cf 100644 --- a/build-tools-internal/src/main/groovy/elasticsearch.run.gradle +++ b/build-tools-internal/src/main/groovy/elasticsearch.run.gradle @@ -30,13 +30,7 @@ testClusters.register("runTask") { setting 'xpack.security.enabled', 'true' keystore 'bootstrap.password', 'password' user username: 'elastic-admin', password: 'elastic-password', role: '_es_test_root' - numberOfNodes = 3 - def cluster = testClusters.named("runTask").get() - cluster.getNodes().each { node -> - node.setting('cluster.initial_master_nodes', cluster.getLastNode().getName()) - node.setting('node.roles', '[master,data_hot,data_content]') - } - cluster.getFirstNode().setting('node.roles', '[]') + numberOfNodes = 1 } } diff --git a/docs/reference/search/point-in-time-api.asciidoc b/docs/reference/search/point-in-time-api.asciidoc index 2e32324cb44d9..30fde7d0e7284 100644 --- 
a/docs/reference/search/point-in-time-api.asciidoc +++ b/docs/reference/search/point-in-time-api.asciidoc @@ -78,6 +78,36 @@ IMPORTANT: The open point in time request and each subsequent search request can return different `id`; thus always use the most recently received `id` for the next search request. + +In addition to `keep_alive`, we can also specify the `allow_partial_search_results` parameter +which specifies whether the <> should tolerate unavailable shards or +<> when initially creating the point in time. If `true`, the point in time will be +created with just the available shards, otherwise the operation will fail if there is at +least one shard that is unavailable. Defaults to `false`. + +The PIT response now also contains a report on the total number of shards, +as well as how many of those were successful in creating the PIT. + +[source,console] +-------------------------------------------------- +POST /my-index-000001/_pit?keep_alive=1m&allow_partial_search_results=true +-------------------------------------------------- +// TEST[setup:my_index] + +[source,json] +-------------------------------------------------- +{ + "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=", + "_shards": { + "total": 10, + "successful": 10, + "skipped": 0, + "failed": 0 + } +} +-------------------------------------------------- +// NOTCONSOLE + [[point-in-time-keep-alive]] ==== Keeping point in time alive The `keep_alive` parameter, which is passed to a open point in time request and diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java index 9b4634b64d7fa..e3fef7a23b238 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java +++ 
b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java @@ -544,13 +544,15 @@ public void testMissingShardsWithPointInTime() throws Exception { List dataNodes = internalCluster().startDataOnlyNodes(2, nodeAttributes); final String index = "my_test_index"; - final int numShards = 2; // randomIntBetween(2, 4); + // tried to have randomIntBetween(3, 10) but having more shards than 3 was taking forever and throwing timeouts + final int numShards = 3; + final int numReplicas = 0; // create an index with numShards shards and 0 replicas createIndex( index, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas) .put("index.routing.allocation.require.foo", "bar") .build() ); @@ -564,7 +566,7 @@ public void testMissingShardsWithPointInTime() throws Exception { refresh(index); // create a PIT when all docs are present - OpenPointInTimeResponse pointInTimeResponse = openPointInTime(new String[]{index}, TimeValue.timeValueMinutes(1)); + OpenPointInTimeResponse pointInTimeResponse = openPointInTime(new String[] { index }, TimeValue.timeValueMinutes(1)); try { // ensure that the PIT created has all the shards there assertThat(numShards, equalTo(pointInTimeResponse.getTotalShards())); @@ -611,7 +613,7 @@ public void testMissingShardsWithPointInTime() throws Exception { // create a PIT when some shards are missing OpenPointInTimeResponse pointInTimeResponseOneNodeDown = openPointInTime( - new String[]{index}, + new String[] { index }, TimeValue.timeValueMinutes(10), true ); diff --git a/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java index 0e7f3f9111842..5966a1c924745 100644 --- a/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java +++ 
b/server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java @@ -47,6 +47,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC openRequest.routing(request.param("routing")); openRequest.preference(request.param("preference")); openRequest.keepAlive(TimeValue.parseTimeValue(request.param("keep_alive"), null, "keep_alive")); + openRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", false)); if (request.hasParam("max_concurrent_shard_requests")) { final int maxConcurrentShardRequests = request.paramAsInt( "max_concurrent_shard_requests", From e62c6022ccc59ae48cc902489cdbe2b31c33dcd7 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Thu, 1 Aug 2024 16:15:00 +0300 Subject: [PATCH 05/18] iter --- docs/reference/search/point-in-time-api.asciidoc | 6 +++--- .../java/org/elasticsearch/action/search/PointInTimeIT.java | 2 +- .../action/search/RestOpenPointInTimeActionTests.java | 2 +- .../xpack/eql/execution/sample/CircuitBreakerTests.java | 2 +- .../eql/execution/search/PITAwareQueryClientTests.java | 2 +- .../xpack/eql/execution/sequence/CircuitBreakerTests.java | 2 +- .../elasticsearch/xpack/sql/analysis/CancellationTests.java | 2 +- .../transform/transforms/ClientTransformIndexerTests.java | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/reference/search/point-in-time-api.asciidoc b/docs/reference/search/point-in-time-api.asciidoc index 30fde7d0e7284..db30f21dc3b7c 100644 --- a/docs/reference/search/point-in-time-api.asciidoc +++ b/docs/reference/search/point-in-time-api.asciidoc @@ -79,14 +79,14 @@ return different `id`; thus always use the most recently received `id` for the next search request. 
-In addition to `keep_alive`, we can also specify the `allow_partial_search_results` parameter +In addition to `keep_alive`, we can also define the `allow_partial_search_results` parameter which specifies whether the <> should tolerate unavailable shards or <> when initially creating the point in time. If `true`, the point in time will be created with just the available shards, otherwise the operation will fail if there is at least one shard that is unavailable. Defaults to `false`. The PIT response now also contains a report on the total number of shards, -as well as how many of those were successful in creating the PIT. +as well as how many of those were successful when creating the PIT. [source,console] -------------------------------------------------- @@ -94,7 +94,7 @@ POST /my-index-000001/_pit?keep_alive=1m&allow_partial_search_results=true -------------------------------------------------- // TEST[setup:my_index] -[source,json] +[source,js] -------------------------------------------------- { "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=", diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java index e3fef7a23b238..e0e7b491104c6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java @@ -565,7 +565,7 @@ public void testMissingShardsWithPointInTime() throws Exception { } refresh(index); - // create a PIT when all docs are present + // create a PIT when all shards are present OpenPointInTimeResponse pointInTimeResponse = openPointInTime(new String[] { index }, TimeValue.timeValueMinutes(1)); try { // ensure that the PIT created has all the shards there diff --git 
a/server/src/test/java/org/elasticsearch/action/search/RestOpenPointInTimeActionTests.java b/server/src/test/java/org/elasticsearch/action/search/RestOpenPointInTimeActionTests.java index b8950ed7aa10e..e7b5e898684f6 100644 --- a/server/src/test/java/org/elasticsearch/action/search/RestOpenPointInTimeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/RestOpenPointInTimeActionTests.java @@ -31,7 +31,7 @@ public void testMaxConcurrentSearchRequests() { verifyingClient.setExecuteVerifier(((actionType, transportRequest) -> { assertThat(transportRequest, instanceOf(OpenPointInTimeRequest.class)); transportRequests.add((OpenPointInTimeRequest) transportRequest); - return new OpenPointInTimeResponse(new BytesArray("n/a"), 0, 0, 0, 0); + return new OpenPointInTimeResponse(new BytesArray("n/a"), 1, 1, 0, 0); })); { RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sample/CircuitBreakerTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sample/CircuitBreakerTests.java index 943d1275364fb..687fe884df8b2 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sample/CircuitBreakerTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sample/CircuitBreakerTests.java @@ -209,7 +209,7 @@ protected void ActionListener listener ) { if (request instanceof OpenPointInTimeRequest) { - OpenPointInTimeResponse response = new OpenPointInTimeResponse(pitId); + OpenPointInTimeResponse response = new OpenPointInTimeResponse(pitId, 1, 1, 0, 0); listener.onResponse((Response) response); } else if (request instanceof ClosePointInTimeRequest) { ClosePointInTimeResponse response = new ClosePointInTimeResponse(true, 1); diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClientTests.java 
b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClientTests.java index c0e5d398d6508..f1c5d483d4002 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClientTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/search/PITAwareQueryClientTests.java @@ -204,7 +204,7 @@ protected void assertArrayEquals(INDICES, openPIT.indices()); // indices for opening pit should be the same as for the eql query itself openedPIT = true; - OpenPointInTimeResponse response = new OpenPointInTimeResponse(pitId); + OpenPointInTimeResponse response = new OpenPointInTimeResponse(pitId, 1, 1, 0, 0); listener.onResponse((Response) response); } else if (request instanceof ClosePointInTimeRequest closePIT) { assertTrue(openedPIT); diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java index c001b312d5578..ecf5ef61ac49a 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/execution/sequence/CircuitBreakerTests.java @@ -394,7 +394,7 @@ protected void ) { if (request instanceof OpenPointInTimeRequest) { pitContextCounter.incrementAndGet(); - OpenPointInTimeResponse response = new OpenPointInTimeResponse(pitId); + OpenPointInTimeResponse response = new OpenPointInTimeResponse(pitId, 1, 1, 0, 0); listener.onResponse((Response) response); } else if (request instanceof ClosePointInTimeRequest) { ClosePointInTimeResponse response = new ClosePointInTimeResponse(true, 1); diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/CancellationTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/CancellationTests.java index 
10d6b04d7505c..3e1f910c9f72e 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/CancellationTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/CancellationTests.java @@ -190,7 +190,7 @@ public void testCancellationDuringSearch(String query) throws InterruptedExcepti doAnswer(invocation -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocation.getArguments()[2]; - listener.onResponse(new OpenPointInTimeResponse(pitId)); + listener.onResponse(new OpenPointInTimeResponse(pitId, 1, 1, 0, 0)); return null; }).when(client).execute(eq(TransportOpenPointInTimeAction.TYPE), any(), any()); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index 062c951f67c96..c8677c2816fc9 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -538,7 +538,7 @@ protected void if (request instanceof OpenPointInTimeRequest) { if (pitSupported) { pitContextCounter.incrementAndGet(); - OpenPointInTimeResponse response = new OpenPointInTimeResponse(new BytesArray("the_pit_id")); + OpenPointInTimeResponse response = new OpenPointInTimeResponse(new BytesArray("the_pit_id"), 1, 1, 0, 0); listener.onResponse((Response) response); } else { listener.onFailure(new ActionNotFoundTransportException("_pit")); From 81948e1e137039e8a870bdeb83e2b92613e137b7 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Thu, 1 Aug 2024 17:12:41 +0300 Subject: [PATCH 06/18] iter --- .../action/search/TransportOpenPointInTimeAction.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git 
a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 8ff327266e365..d156798252ec5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -221,7 +221,9 @@ SearchPhase openPointInTimePhase( ) { @Override protected String missingShardsErrorMessage(StringBuilder missingShards) { - return "[open_point_in_time] action requires all shards to be available. Missing shards: [" + missingShards + "]"; + return "[open_point_in_time] action requires all shards to be available. Missing shards: [" + + missingShards + + "]. Consider using `allow_partial_search_results` setting to bypass this error."; } @Override From a75295f6984628e6dd53d5d053a101d584411793 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Thu, 1 Aug 2024 18:58:13 +0300 Subject: [PATCH 07/18] iter --- .../paginate-search-results.asciidoc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/reference/search/search-your-data/paginate-search-results.asciidoc b/docs/reference/search/search-your-data/paginate-search-results.asciidoc index a81598273dfd3..d42e85423b8c7 100644 --- a/docs/reference/search/search-your-data/paginate-search-results.asciidoc +++ b/docs/reference/search/search-your-data/paginate-search-results.asciidoc @@ -106,9 +106,9 @@ The search response includes an array of `sort` values for each hit: "_id" : "654322", "_score" : null, "_source" : ..., - "sort" : [ + "sort" : [ 1463538855, - "654322" + "654322" ] }, { @@ -118,7 +118,7 @@ The search response includes an array of `sort` values for each hit: "_source" : ..., "sort" : [ <1> 1463538857, - "654323" + "654323" ] } ] @@ -150,7 +150,7 @@ GET twitter/_search -------------------------------------------------- //TEST[continued] -Repeat this 
process by updating the `search_after` array every time you retrieve a +Repeat this process by updating the `search_after` array every time you retrieve a new page of results. If a <> occurs between these requests, the order of your results may change, causing inconsistent results across pages. To prevent this, you can create a <> to @@ -167,10 +167,12 @@ The API returns a PIT ID. [source,console-result] ---- { - "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==" + "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", + "_shards": ... } ---- // TESTRESPONSE[s/"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/] +// TESTRESPONSE[s/"_shards": \.\.\./"_shards": "$body._shards"/] To get the first page of results, submit a search request with a `sort` argument. 
If using a PIT, specify the PIT ID in the `pit.id` parameter and omit From 9061b2f80b149f32f97867e55bac2b974b0f7185 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Fri, 2 Aug 2024 13:13:38 +0300 Subject: [PATCH 08/18] Update docs/changelog/111516.yaml --- docs/changelog/111516.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/111516.yaml diff --git a/docs/changelog/111516.yaml b/docs/changelog/111516.yaml new file mode 100644 index 0000000000000..96e8bd843f750 --- /dev/null +++ b/docs/changelog/111516.yaml @@ -0,0 +1,5 @@ +pr: 111516 +summary: Adding support for `allow_partial_search_results` in PIT +area: Search +type: enhancement +issues: [] From c73f36adc66ff4d7dec2c6aa0da0547c6b0e5830 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Mon, 5 Aug 2024 16:46:08 +0300 Subject: [PATCH 09/18] addressing PR comments - encoding failed shards when creating PIT --- .../org/elasticsearch/TransportVersions.java | 2 +- .../search/AbstractSearchAsyncAction.java | 2 +- .../action/search/SearchContextId.java | 47 +++++++++- .../action/search/SearchContextIdTests.java | 94 ++++++++++++++----- .../search/TransportSearchActionTests.java | 6 +- .../authz/AuthorizationServiceTests.java | 3 +- 6 files changed, 119 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index f9ddead13681c..2c76064990b80 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -183,7 +183,7 @@ static TransportVersion def(int id) { public static final TransportVersion FIX_VECTOR_SIMILARITY_INNER_HITS = def(8_713_00_0); public static final TransportVersion INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN = def(8_714_00_0); public static final TransportVersion ESQL_ATTRIBUTE_CACHED_SERIALIZATION = def(8_715_00_0); - + public static final TransportVersion 
ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT = def(8_999_00_0); /* diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 4fd551994e2a0..1e5b5ebbefe48 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -707,7 +707,7 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults) : null; final BytesReference searchContextId; if (buildPointInTimeFromSearchResults()) { - searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minTransportVersion); + searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minTransportVersion, failures); } else { if (request.source() != null && request.source().pointInTimeBuilder() != null diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java index 95d22e8a9034e..3f70d42be0ac5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -26,6 +27,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,17 +38,27 @@ public final class SearchContextId { private final Map shards; private final Map 
aliasFilter; private final transient Set contextIds; + private final Map failedShardsWithCauses; - SearchContextId(Map shards, Map aliasFilter) { + SearchContextId( + Map shards, + Map aliasFilter, + Map failedShards + ) { this.shards = shards; this.aliasFilter = aliasFilter; this.contextIds = shards.values().stream().map(SearchContextIdForNode::getSearchContextId).collect(Collectors.toSet()); + this.failedShardsWithCauses = failedShards; } public Map shards() { return shards; } + public Map failedShards() { + return failedShardsWithCauses; + } + public Map aliasFilter() { return aliasFilter; } @@ -58,12 +70,22 @@ public boolean contains(ShardSearchContextId contextId) { public static BytesReference encode( List searchPhaseResults, Map aliasFilter, - TransportVersion version + TransportVersion version, + ShardSearchFailure[] shardFailures ) { try (var out = new BytesStreamOutput()) { out.setTransportVersion(version); TransportVersion.writeVersion(version, out); out.writeCollection(searchPhaseResults, SearchContextId::writeSearchPhaseResult); + if (out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { + Map failedShardsWithCauses = new HashMap<>(); + for (ShardSearchFailure shardFailure : shardFailures) { + if (shardFailure.shard() != null) { + failedShardsWithCauses.put(shardFailure.shard(), shardFailure.reason()); + } + } + out.writeMap(failedShardsWithCauses, StreamOutput::writeWriteable, StreamOutput::writeString); + } out.writeMap(aliasFilter, StreamOutput::writeWriteable); return out.bytes(); } catch (IOException e) { @@ -85,11 +107,15 @@ public static SearchContextId decode(NamedWriteableRegistry namedWriteableRegist final Map shards = Collections.unmodifiableMap( in.readCollection(Maps::newHashMapWithExpectedSize, SearchContextId::readShardsMapEntry) ); + Map failedShards = new HashMap<>(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { + failedShards = 
in.readMap(SearchShardTarget::new, StreamInput::readString); + } final Map aliasFilters = in.readImmutableMap(AliasFilter::readFrom); if (in.available() > 0) { throw new IllegalArgumentException("Not all bytes were read"); } - return new SearchContextId(shards, aliasFilters); + return new SearchContextId(shards, aliasFilters, failedShards); } catch (IOException e) { assert false : e; throw new IllegalArgumentException(e); @@ -103,7 +129,11 @@ public static String[] decodeIndices(BytesReference id) { final Map shards = Collections.unmodifiableMap( in.readCollection(Maps::newHashMapWithExpectedSize, SearchContextId::readShardsMapEntry) ); - return new SearchContextId(shards, Collections.emptyMap()).getActualIndices(); + Map failedShards = new HashMap<>(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { + failedShards = in.readMap(SearchShardTarget::new, StreamInput::readString); + } + return new SearchContextId(shards, Collections.emptyMap(), failedShards).getActualIndices(); } catch (IOException e) { assert false : e; throw new IllegalArgumentException(e); @@ -126,6 +156,15 @@ public String[] getActualIndices() { indices.add(clusterAlias + RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR + indexName); } } + for (Map.Entry entry : failedShards().entrySet()) { + final String indexName = entry.getKey().getIndex(); + final String clusterAlias = entry.getKey().getClusterAlias(); + if (Strings.isEmpty(clusterAlias)) { + indices.add(indexName); + } else { + indices.add(clusterAlias + RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR + indexName); + } + } return indices.toArray(String[]::new); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java index 32157e09e628f..9959aec2c4833 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java +++ 
b/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; @@ -25,7 +26,9 @@ import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.nullValue; @@ -52,40 +55,81 @@ public void testEncode() { final AtomicArray queryResults = TransportSearchHelperTests.generateQueryResults(); final TransportVersion version = TransportVersion.current(); final Map aliasFilters = new HashMap<>(); + Map shardSearchFailures = new HashMap<>(); + int idx = 0; for (SearchPhaseResult result : queryResults.asList()) { - final AliasFilter aliasFilter; if (randomBoolean()) { - aliasFilter = AliasFilter.of(randomQueryBuilder()); - } else if (randomBoolean()) { - aliasFilter = AliasFilter.of(randomQueryBuilder(), "alias-" + between(1, 10)); + shardSearchFailures.put( + result.getSearchShardTarget(), + new ShardSearchFailure(new UnsupportedOperationException("simulated failure"), result.getSearchShardTarget()) + ); + queryResults.set(idx, null); } else { - aliasFilter = AliasFilter.EMPTY; - } - if (randomBoolean()) { - aliasFilters.put(result.getSearchShardTarget().getShardId().getIndex().getUUID(), aliasFilter); + final AliasFilter aliasFilter; + if (randomBoolean()) { + aliasFilter = AliasFilter.of(randomQueryBuilder()); + } else if (randomBoolean()) { + aliasFilter = AliasFilter.of(randomQueryBuilder(), "alias-" + between(1, 10)); + } else { + aliasFilter = AliasFilter.EMPTY; + } + if (randomBoolean()) { + 
aliasFilters.put(result.getSearchShardTarget().getShardId().getIndex().getUUID(), aliasFilter); + } } + idx += 1; } - final BytesReference id = SearchContextId.encode(queryResults.asList(), aliasFilters, version); + final int shardsFailed = shardSearchFailures.size(); + final BytesReference id = SearchContextId.encode( + queryResults.asList(), + aliasFilters, + version, + shardSearchFailures.values().toArray(ShardSearchFailure[]::new) + ); final SearchContextId context = SearchContextId.decode(namedWriteableRegistry, id); - assertThat(context.shards().keySet(), hasSize(3)); + assertThat(context.shards().keySet(), hasSize(3 - shardsFailed)); + assertThat(context.failedShards().keySet(), hasSize(shardsFailed)); assertThat(context.aliasFilter(), equalTo(aliasFilters)); - SearchContextIdForNode node1 = context.shards().get(new ShardId("idx", "uuid1", 2)); - assertThat(node1.getClusterAlias(), equalTo("cluster_x")); - assertThat(node1.getNode(), equalTo("node_1")); - assertThat(node1.getSearchContextId().getId(), equalTo(1L)); - assertThat(node1.getSearchContextId().getSessionId(), equalTo("a")); - SearchContextIdForNode node2 = context.shards().get(new ShardId("idy", "uuid2", 42)); - assertThat(node2.getClusterAlias(), equalTo("cluster_y")); - assertThat(node2.getNode(), equalTo("node_2")); - assertThat(node2.getSearchContextId().getId(), equalTo(12L)); - assertThat(node2.getSearchContextId().getSessionId(), equalTo("b")); + ShardId shardIdForNode1 = new ShardId("idx", "uuid1", 2); + SearchShardTarget shardTargetForNode1 = new SearchShardTarget("node_1", shardIdForNode1, "cluster_x"); + if (shardSearchFailures.containsKey(shardTargetForNode1)) { + assertThat(context.failedShards(), hasKey(shardTargetForNode1)); + assertThat(context.failedShards().get(shardTargetForNode1), containsString("simulated failure")); + } else { + SearchContextIdForNode node1 = context.shards().get(shardIdForNode1); + assertThat(node1.getClusterAlias(), equalTo("cluster_x")); + 
assertThat(node1.getNode(), equalTo("node_1")); + assertThat(node1.getSearchContextId().getId(), equalTo(1L)); + assertThat(node1.getSearchContextId().getSessionId(), equalTo("a")); + } + + ShardId shardIdForNode2 = new ShardId("idy", "uuid2", 42); + SearchShardTarget shardTargetForNode2 = new SearchShardTarget("node_2", shardIdForNode2, "cluster_y"); + if (shardSearchFailures.containsKey(shardTargetForNode2)) { + assertThat(context.failedShards(), hasKey(shardTargetForNode2)); + assertThat(context.failedShards().get(shardTargetForNode2), containsString("simulated failure")); - SearchContextIdForNode node3 = context.shards().get(new ShardId("idy", "uuid2", 43)); - assertThat(node3.getClusterAlias(), nullValue()); - assertThat(node3.getNode(), equalTo("node_3")); - assertThat(node3.getSearchContextId().getId(), equalTo(42L)); - assertThat(node3.getSearchContextId().getSessionId(), equalTo("c")); + } else { + SearchContextIdForNode node2 = context.shards().get(shardIdForNode2); + assertThat(node2.getClusterAlias(), equalTo("cluster_y")); + assertThat(node2.getNode(), equalTo("node_2")); + assertThat(node2.getSearchContextId().getId(), equalTo(12L)); + assertThat(node2.getSearchContextId().getSessionId(), equalTo("b")); + } + + ShardId shardIdForNode3 = new ShardId("idy", "uuid2", 43); + SearchShardTarget shardTargetForNode3 = new SearchShardTarget("node_3", shardIdForNode3, null); + if (shardSearchFailures.containsKey(shardTargetForNode3)) { + assertThat(context.failedShards(), hasKey(shardTargetForNode3)); + assertThat(context.failedShards().get(shardTargetForNode3), containsString("simulated failure")); + } else { + SearchContextIdForNode node3 = context.shards().get(shardIdForNode3); + assertThat(node3.getClusterAlias(), nullValue()); + assertThat(node3.getNode(), equalTo("node_3")); + assertThat(node3.getSearchContextId().getId(), equalTo(42L)); + assertThat(node3.getSearchContextId().getSessionId(), equalTo("c")); + } final String[] indices = 
SearchContextId.decodeIndices(id); assertThat(indices.length, equalTo(3)); diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index edd253e945a9b..30b2937c24385 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -1650,7 +1650,7 @@ public void testLocalShardIteratorFromPointInTime() { clusterState, null, null, - new SearchContextId(contexts, aliasFilterMap), + new SearchContextId(contexts, aliasFilterMap, null), keepAlive, randomBoolean() ); @@ -1695,7 +1695,7 @@ public void testLocalShardIteratorFromPointInTime() { clusterState, null, null, - new SearchContextId(contexts, aliasFilterMap), + new SearchContextId(contexts, aliasFilterMap, null), keepAlive, false ); @@ -1706,7 +1706,7 @@ public void testLocalShardIteratorFromPointInTime() { clusterState, null, null, - new SearchContextId(contexts, aliasFilterMap), + new SearchContextId(contexts, aliasFilterMap, null), keepAlive, true ).stream().filter(si -> si.shardId().equals(anotherShardId)).findFirst(); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index 5f878480a7d0d..3be0a17d19253 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -66,6 +66,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchTransportService; +import org.elasticsearch.action.search.ShardSearchFailure; 
import org.elasticsearch.action.search.TransportClearScrollAction; import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportMultiSearchAction; @@ -3650,7 +3651,7 @@ private static BytesReference createEncodedPIT(Index index) { ); List results = new ArrayList<>(); results.add(testSearchPhaseResult1); - return SearchContextId.encode(results, Collections.emptyMap(), TransportVersion.current()); + return SearchContextId.encode(results, Collections.emptyMap(), TransportVersion.current(), ShardSearchFailure.EMPTY_ARRAY); } private static class RBACAuthorizationInfoRoleMatcher implements ArgumentMatcher { From 1edf6df0f09704da96a1509b71142afa716f97ec Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Fri, 9 Aug 2024 00:54:59 +0300 Subject: [PATCH 10/18] updating transportversion --- server/src/main/java/org/elasticsearch/TransportVersions.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 2c76064990b80..a7c9ddd4b09ff 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -183,8 +183,7 @@ static TransportVersion def(int id) { public static final TransportVersion FIX_VECTOR_SIMILARITY_INNER_HITS = def(8_713_00_0); public static final TransportVersion INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN = def(8_714_00_0); public static final TransportVersion ESQL_ATTRIBUTE_CACHED_SERIALIZATION = def(8_715_00_0); - - public static final TransportVersion ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT = def(8_999_00_0); + public static final TransportVersion ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT = def(8_716_00_0); /* * STOP! READ THIS FIRST! 
No, really, From 2f6addc7d323b51f1504cc3904ccde0825ed3479 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Fri, 9 Aug 2024 14:53:04 +0900 Subject: [PATCH 11/18] Add PIT's missing shards as failures in subsequent search request. --- .../search/point-in-time-api.asciidoc | 26 ++++--- .../action/search/PointInTimeIT.java | 24 +++++-- .../action/search/ClearScrollController.java | 4 ++ .../action/search/SearchContextId.java | 67 ++++++------------ .../action/search/SearchContextIdForNode.java | 46 ++++++++++-- .../action/search/TransportSearchAction.java | 58 ++++++++------- .../action/search/SearchContextIdTests.java | 70 ++++++++++++++----- .../search/TransportSearchActionTests.java | 12 ++-- 8 files changed, 195 insertions(+), 112 deletions(-) diff --git a/docs/reference/search/point-in-time-api.asciidoc b/docs/reference/search/point-in-time-api.asciidoc index db30f21dc3b7c..9cd91626c7600 100644 --- a/docs/reference/search/point-in-time-api.asciidoc +++ b/docs/reference/search/point-in-time-api.asciidoc @@ -78,15 +78,18 @@ IMPORTANT: The open point in time request and each subsequent search request can return different `id`; thus always use the most recently received `id` for the next search request. - -In addition to `keep_alive`, we can also define the `allow_partial_search_results` parameter -which specifies whether the <> should tolerate unavailable shards or -<> when initially creating the point in time. If `true`, the point in time will be -created with just the available shards, otherwise the operation will fail if there is at -least one shard that is unavailable. Defaults to `false`. - -The PIT response now also contains a report on the total number of shards, -as well as how many of those were successful when creating the PIT. +In addition to the `keep_alive` parameter, the `allow_partial_search_results` parameter +can also be defined. +This parameter determines whether the <> +should tolerate unavailable shards or <> when +initially creating the PIT. 
+If set to true, the PIT will be created with the available shards, along with a +reference to any missing ones. +If set to false, the operation will fail if any shard is unavailable. +The default value is false. + +The PIT response includes a summary of the total number of shards, as well as the number +of successful shards when creating the PIT. [source,console] -------------------------------------------------- @@ -108,6 +111,11 @@ POST /my-index-000001/_pit?keep_alive=1m&allow_partial_search_results=true -------------------------------------------------- // NOTCONSOLE +When a PIT that contains shard failures is used in a search request, the missing shards are +always reported in the search response as a NoShardAvailableActionException exception. +To get rid of these exceptions, a new PIT needs to be created so that shards missing +from the previous PIT can be handled, assuming they become available in the meantime. + [[point-in-time-keep-alive]] ==== Keeping point in time alive The `keep_alive` parameter, which is passed to a open point in time request and diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java index e0e7b491104c6..13f8b61128d7b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; @@ -43,7 +44,6 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; 
-import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -58,6 +58,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -590,7 +591,7 @@ public void testMissingShardsWithPointInTime() throws Exception { // find which shards to relocate final String nodeId = admin().cluster().prepareNodesInfo(randomDataNode).get().getNodes().get(0).getNode().getId(); - List shardsToRelocate = new ArrayList<>(); + Set shardsToRelocate = new HashSet<>(); for (ShardStats stats : admin().indices().prepareStats(index).get().getShards()) { if (nodeId.equals(stats.getShardRouting().currentNodeId())) { shardsToRelocate.add(stats.getShardRouting().shardId().id()); @@ -664,14 +665,29 @@ public void testMissingShardsWithPointInTime() throws Exception { .setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())), resp -> { assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId())); - assertThat(resp.getTotalShards(), equalTo(numShards - shardsRemoved)); + assertThat(resp.getTotalShards(), equalTo(numShards)); assertThat(resp.getSuccessfulShards(), equalTo(numShards - shardsRemoved)); - assertThat(resp.getFailedShards(), equalTo(0)); + assertThat(resp.getFailedShards(), equalTo(shardsRemoved)); + assertThat(resp.getShardFailures().length, equalTo(shardsRemoved)); + for (var failure : resp.getShardFailures()) { + assertTrue(shardsToRelocate.contains(failure.shardId())); + assertThat(failure.getCause(), instanceOf(NoShardAvailableActionException.class)); + } assertNotNull(resp.getHits().getTotalHits()); // we 
expect less documents as the newly indexed ones should not be part of the PIT assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); } ); + + Exception exc = expectThrows( + Exception.class, + () -> prepareSearch().setQuery(new MatchAllQueryBuilder()) + .setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())) + .setAllowPartialSearchResults(false) + .get() + ); + assertThat(exc.getCause().getMessage(), containsString("missing shards")); + } finally { internalCluster().stopNode(newNodeName); } diff --git a/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java index 04573f72068f3..965b19a69b858 100644 --- a/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java +++ b/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java @@ -166,6 +166,10 @@ public static void closeContexts( final var successes = new AtomicInteger(); try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) { for (SearchContextIdForNode contextId : contextIds) { + if (contextId.getNode() == null) { + // the shard was missing when creating the PIT, ignore. 
+ continue; + } final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode()); if (node != null) { try { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java index 3f70d42be0ac5..059711eff22e5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,27 +37,17 @@ public final class SearchContextId { private final Map shards; private final Map aliasFilter; private final transient Set contextIds; - private final Map failedShardsWithCauses; - SearchContextId( - Map shards, - Map aliasFilter, - Map failedShards - ) { + SearchContextId(Map shards, Map aliasFilter) { this.shards = shards; this.aliasFilter = aliasFilter; this.contextIds = shards.values().stream().map(SearchContextIdForNode::getSearchContextId).collect(Collectors.toSet()); - this.failedShardsWithCauses = failedShards; } public Map shards() { return shards; } - public Map failedShards() { - return failedShardsWithCauses; - } - public Map aliasFilter() { return aliasFilter; } @@ -76,15 +65,26 @@ public static BytesReference encode( try (var out = new BytesStreamOutput()) { out.setTransportVersion(version); TransportVersion.writeVersion(version, out); - out.writeCollection(searchPhaseResults, SearchContextId::writeSearchPhaseResult); - if (out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { - Map failedShardsWithCauses = new HashMap<>(); - for (ShardSearchFailure shardFailure : shardFailures) { - if (shardFailure.shard() != null) { - failedShardsWithCauses.put(shardFailure.shard(), shardFailure.reason()); - } + boolean allowNullContextId = 
out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT); + int shardSize = searchPhaseResults.size() + (allowNullContextId ? shardFailures.length : 0); + out.writeVInt(shardSize); + for (var searchResult : searchPhaseResults) { + final SearchShardTarget target = searchResult.getSearchShardTarget(); + target.getShardId().writeTo(out); + new SearchContextIdForNode(target.getClusterAlias(), target.getNodeId(), searchResult.getContextId()).writeTo(out); + } + if (allowNullContextId) { + /** + * Shard failures are not encoded if there are nodes in the cluster that have not yet been upgraded to + * {@link TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT}. + * These shards will be silently ignored during searches using this PIT; however, this situation should never occur, + * as failures are not permitted when creating a point in time with versions prior to + * {@link TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT}. + */ + for (var failure : shardFailures) { + failure.shard().getShardId().writeTo(out); + new SearchContextIdForNode(failure.shard().getClusterAlias(), null, null).writeTo(out); } - out.writeMap(failedShardsWithCauses, StreamOutput::writeWriteable, StreamOutput::writeString); } out.writeMap(aliasFilter, StreamOutput::writeWriteable); return out.bytes(); @@ -94,12 +94,6 @@ public static BytesReference encode( } } - private static void writeSearchPhaseResult(StreamOutput out, SearchPhaseResult searchPhaseResult) throws IOException { - final SearchShardTarget target = searchPhaseResult.getSearchShardTarget(); - target.getShardId().writeTo(out); - new SearchContextIdForNode(target.getClusterAlias(), target.getNodeId(), searchPhaseResult.getContextId()).writeTo(out); - } - public static SearchContextId decode(NamedWriteableRegistry namedWriteableRegistry, BytesReference id) { try (var in = new NamedWriteableAwareStreamInput(id.streamInput(), namedWriteableRegistry)) { final TransportVersion version = 
TransportVersion.readVersion(in); @@ -107,15 +101,11 @@ public static SearchContextId decode(NamedWriteableRegistry namedWriteableRegist final Map shards = Collections.unmodifiableMap( in.readCollection(Maps::newHashMapWithExpectedSize, SearchContextId::readShardsMapEntry) ); - Map failedShards = new HashMap<>(); - if (in.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { - failedShards = in.readMap(SearchShardTarget::new, StreamInput::readString); - } final Map aliasFilters = in.readImmutableMap(AliasFilter::readFrom); if (in.available() > 0) { throw new IllegalArgumentException("Not all bytes were read"); } - return new SearchContextId(shards, aliasFilters, failedShards); + return new SearchContextId(shards, aliasFilters); } catch (IOException e) { assert false : e; throw new IllegalArgumentException(e); @@ -129,11 +119,7 @@ public static String[] decodeIndices(BytesReference id) { final Map shards = Collections.unmodifiableMap( in.readCollection(Maps::newHashMapWithExpectedSize, SearchContextId::readShardsMapEntry) ); - Map failedShards = new HashMap<>(); - if (in.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { - failedShards = in.readMap(SearchShardTarget::new, StreamInput::readString); - } - return new SearchContextId(shards, Collections.emptyMap(), failedShards).getActualIndices(); + return new SearchContextId(shards, Collections.emptyMap()).getActualIndices(); } catch (IOException e) { assert false : e; throw new IllegalArgumentException(e); @@ -156,15 +142,6 @@ public String[] getActualIndices() { indices.add(clusterAlias + RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR + indexName); } } - for (Map.Entry entry : failedShards().entrySet()) { - final String indexName = entry.getKey().getIndex(); - final String clusterAlias = entry.getKey().getClusterAlias(); - if (Strings.isEmpty(clusterAlias)) { - indices.add(indexName); - } else { - indices.add(clusterAlias + 
RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR + indexName); - } - } return indices.toArray(String[]::new); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java index 3071362f552ea..a70ddf6ee14b9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.search; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -21,25 +22,59 @@ public final class SearchContextIdForNode implements Writeable { private final ShardSearchContextId searchContextId; private final String clusterAlias; - SearchContextIdForNode(@Nullable String clusterAlias, String node, ShardSearchContextId searchContextId) { + /** + * Contains the details required to retrieve a {@link ShardSearchContextId} for a shard on a specific node. + * + * @param clusterAlias The alias of the cluster, or {@code null} if the shard is local. + * @param node The target node where the search context ID is defined, or {@code null} if the shard is missing or unavailable. + * @param searchContextId The {@link ShardSearchContextId}, or {@code null} if the shard is missing or unavailable. + */ + SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) { this.node = node; this.clusterAlias = clusterAlias; this.searchContextId = searchContextId; } SearchContextIdForNode(StreamInput in) throws IOException { - this.node = in.readString(); + boolean allowNull = in.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT); + this.node = allowNull ? 
in.readOptionalString() : in.readString(); this.clusterAlias = in.readOptionalString(); - this.searchContextId = new ShardSearchContextId(in); + this.searchContextId = allowNull ? in.readOptionalWriteable(ShardSearchContextId::new) : new ShardSearchContextId(in); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(node); + boolean allowNull = out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT); + if (allowNull) { + out.writeOptionalString(node); + } else { + if (node == null) { + // We should never set a null node if the cluster is not fully upgraded to a version that can handle it. + throw new IOException( + "Cannot write null node value to a node in version " + + out.getTransportVersion() + + ". The target node must be specified to retrieve the ShardSearchContextId." + ); + } + out.writeString(node); + } out.writeOptionalString(clusterAlias); - searchContextId.writeTo(out); + if (allowNull) { + out.writeOptionalWriteable(searchContextId); + } else { + if (searchContextId == null) { + // We should never set a null search context id if the cluster is not fully upgraded to a version that can handle it. + throw new IOException( + "Cannot write null search context ID to a node in version " + + out.getTransportVersion() + + ". A valid search context ID is required to identify the shard's search context in this version." 
+ ); + } + searchContextId.writeTo(out); + } } + @Nullable public String getNode() { return node; } @@ -49,6 +84,7 @@ public String getClusterAlias() { return clusterAlias; } + @Nullable public ShardSearchContextId getSearchContextId() { return searchContextId; } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index ae07b412e96f3..3119a1acd3865 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1116,11 +1116,16 @@ static List getRemoteShardsIteratorFromPointInTime( final String clusterAlias = entry.getKey(); assert clusterAlias.equals(perNode.getClusterAlias()) : clusterAlias + " != " + perNode.getClusterAlias(); final List targetNodes = new ArrayList<>(group.allocatedNodes().size()); - targetNodes.add(perNode.getNode()); - if (perNode.getSearchContextId().getSearcherId() != null) { - for (String node : group.allocatedNodes()) { - if (node.equals(perNode.getNode()) == false) { - targetNodes.add(node); + if (perNode.getNode() != null) { + // If the shard was available when the PIT was created, it's included. + // Otherwise, we add the shard iterator without a target node, allowing a partial search failure to + // be thrown when a search phase attempts to access it. + targetNodes.add(perNode.getNode()); + if (perNode.getSearchContextId().getSearcherId() != null) { + for (String node : group.allocatedNodes()) { + if (node.equals(perNode.getNode()) == false) { + targetNodes.add(node); + } } } } @@ -1216,7 +1221,7 @@ private void executeSearch( assert searchRequest.pointInTimeBuilder() != null; aliasFilter = resolvedIndices.getSearchContextId().aliasFilter(); concreteLocalIndices = resolvedIndices.getLocalIndices() == null ? 
new String[0] : resolvedIndices.getLocalIndices().indices(); - localShardIterators = getLocalLocalShardsIteratorFromPointInTime( + localShardIterators = getLocalShardsIteratorFromPointInTime( clusterState, searchRequest.indicesOptions(), searchRequest.getLocalClusterAlias(), @@ -1723,7 +1728,7 @@ private static RemoteTransportException wrapRemoteClusterFailure(String clusterA return new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); } - static List getLocalLocalShardsIteratorFromPointInTime( + static List getLocalShardsIteratorFromPointInTime( ClusterState clusterState, IndicesOptions indicesOptions, String localClusterAlias, @@ -1737,25 +1742,30 @@ static List getLocalLocalShardsIteratorFromPointInTime( if (Strings.isEmpty(perNode.getClusterAlias())) { final ShardId shardId = entry.getKey(); final List targetNodes = new ArrayList<>(2); - try { - final ShardIterator shards = OperationRouting.getShards(clusterState, shardId); - // Prefer executing shard requests on nodes that are part of PIT first. - if (clusterState.nodes().nodeExists(perNode.getNode())) { - targetNodes.add(perNode.getNode()); - } - if (perNode.getSearchContextId().getSearcherId() != null) { - for (ShardRouting shard : shards) { - if (shard.currentNodeId().equals(perNode.getNode()) == false) { - targetNodes.add(shard.currentNodeId()); + if (perNode.getNode() != null) { + // If the shard was available when the PIT was created, it's included. + // Otherwise, we add the shard iterator without a target node, allowing a partial search failure to + // be thrown when a search phase attempts to access it. + try { + final ShardIterator shards = OperationRouting.getShards(clusterState, shardId); + // Prefer executing shard requests on nodes that are part of PIT first. 
+ if (clusterState.nodes().nodeExists(perNode.getNode())) { + targetNodes.add(perNode.getNode()); + } + if (perNode.getSearchContextId().getSearcherId() != null) { + for (ShardRouting shard : shards) { + if (shard.currentNodeId().equals(perNode.getNode()) == false) { + targetNodes.add(shard.currentNodeId()); + } } } - } - } catch (IndexNotFoundException | ShardNotFoundException e) { - // We can hit these exceptions if the index was deleted after creating PIT or the cluster state on - // this coordinating node is outdated. It's fine to ignore these extra "retry-able" target shards - // when allowPartialSearchResults is false - if (allowPartialSearchResults == false) { - throw e; + } catch (IndexNotFoundException | ShardNotFoundException e) { + // We can hit these exceptions if the index was deleted after creating PIT or the cluster state on + // this coordinating node is outdated. It's fine to ignore these extra "retry-able" target shards + // when allowPartialSearchResults is false + if (allowPartialSearchResults == false) { + throw e; + } } } OriginalIndices finalIndices = new OriginalIndices(new String[] { shardId.getIndexName() }, indicesOptions); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java index 9959aec2c4833..0d9cfcd3dafc6 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java @@ -9,6 +9,8 @@ package org.elasticsearch.action.search; import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -21,14 +23,13 @@ import 
org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TransportVersionUtils; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.nullValue; @@ -61,7 +62,10 @@ public void testEncode() { if (randomBoolean()) { shardSearchFailures.put( result.getSearchShardTarget(), - new ShardSearchFailure(new UnsupportedOperationException("simulated failure"), result.getSearchShardTarget()) + new ShardSearchFailure( + new NoShardAvailableActionException(result.getSearchShardTarget().getShardId()), + result.getSearchShardTarget() + ) ); queryResults.set(idx, null); } else { @@ -79,7 +83,6 @@ public void testEncode() { } idx += 1; } - final int shardsFailed = shardSearchFailures.size(); final BytesReference id = SearchContextId.encode( queryResults.asList(), aliasFilters, @@ -87,18 +90,18 @@ public void testEncode() { shardSearchFailures.values().toArray(ShardSearchFailure[]::new) ); final SearchContextId context = SearchContextId.decode(namedWriteableRegistry, id); - assertThat(context.shards().keySet(), hasSize(3 - shardsFailed)); - assertThat(context.failedShards().keySet(), hasSize(shardsFailed)); + assertThat(context.shards().keySet(), hasSize(3)); + // TODO assertThat(context.failedShards().keySet(), hasSize(shardsFailed)); assertThat(context.aliasFilter(), equalTo(aliasFilters)); ShardId shardIdForNode1 = new ShardId("idx", "uuid1", 2); SearchShardTarget shardTargetForNode1 = new SearchShardTarget("node_1", shardIdForNode1, "cluster_x"); + SearchContextIdForNode node1 = context.shards().get(shardIdForNode1); + assertThat(node1.getClusterAlias(), equalTo("cluster_x")); if 
(shardSearchFailures.containsKey(shardTargetForNode1)) { - assertThat(context.failedShards(), hasKey(shardTargetForNode1)); - assertThat(context.failedShards().get(shardTargetForNode1), containsString("simulated failure")); + assertNull(node1.getNode()); + assertNull(node1.getSearchContextId()); } else { - SearchContextIdForNode node1 = context.shards().get(shardIdForNode1); - assertThat(node1.getClusterAlias(), equalTo("cluster_x")); assertThat(node1.getNode(), equalTo("node_1")); assertThat(node1.getSearchContextId().getId(), equalTo(1L)); assertThat(node1.getSearchContextId().getSessionId(), equalTo("a")); @@ -106,13 +109,12 @@ public void testEncode() { ShardId shardIdForNode2 = new ShardId("idy", "uuid2", 42); SearchShardTarget shardTargetForNode2 = new SearchShardTarget("node_2", shardIdForNode2, "cluster_y"); + SearchContextIdForNode node2 = context.shards().get(shardIdForNode2); + assertThat(node2.getClusterAlias(), equalTo("cluster_y")); if (shardSearchFailures.containsKey(shardTargetForNode2)) { - assertThat(context.failedShards(), hasKey(shardTargetForNode2)); - assertThat(context.failedShards().get(shardTargetForNode2), containsString("simulated failure")); - + assertNull(node2.getNode()); + assertNull(node2.getSearchContextId()); } else { - SearchContextIdForNode node2 = context.shards().get(shardIdForNode2); - assertThat(node2.getClusterAlias(), equalTo("cluster_y")); assertThat(node2.getNode(), equalTo("node_2")); assertThat(node2.getSearchContextId().getId(), equalTo(12L)); assertThat(node2.getSearchContextId().getSessionId(), equalTo("b")); @@ -120,12 +122,12 @@ public void testEncode() { ShardId shardIdForNode3 = new ShardId("idy", "uuid2", 43); SearchShardTarget shardTargetForNode3 = new SearchShardTarget("node_3", shardIdForNode3, null); + SearchContextIdForNode node3 = context.shards().get(shardIdForNode3); + assertThat(node3.getClusterAlias(), nullValue()); if (shardSearchFailures.containsKey(shardTargetForNode3)) { - 
assertThat(context.failedShards(), hasKey(shardTargetForNode3)); - assertThat(context.failedShards().get(shardTargetForNode3), containsString("simulated failure")); + assertNull(node3.getNode()); + assertNull(node3.getSearchContextId()); } else { - SearchContextIdForNode node3 = context.shards().get(shardIdForNode3); - assertThat(node3.getClusterAlias(), nullValue()); assertThat(node3.getNode(), equalTo("node_3")); assertThat(node3.getSearchContextId().getId(), equalTo(42L)); assertThat(node3.getSearchContextId().getSessionId(), equalTo("c")); @@ -137,4 +139,34 @@ public void testEncode() { assertThat(indices[1], equalTo("cluster_y:idy")); assertThat(indices[2], equalTo("idy")); } + + public void testFailuresIgnoredInPreviousVersion() { + final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry( + List.of( + new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, IdsQueryBuilder.NAME, IdsQueryBuilder::new) + ) + ); + final AtomicArray queryResults = TransportSearchHelperTests.generateQueryResults(); + final TransportVersion version = TransportVersionUtils.randomVersionBetween( + random(), + TransportVersionUtils.getFirstVersion(), + TransportVersionUtils.getPreviousVersion(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT) + ); + Map shardSearchFailures = new HashMap<>(); + + final BytesReference id = SearchContextId.encode( + queryResults.asList(), + Map.of(), + version, + shardSearchFailures.values().toArray(ShardSearchFailure[]::new) + ); + + final SearchContextId context = SearchContextId.decode(namedWriteableRegistry, id); + assertThat(context.shards().size(), equalTo(queryResults.asList().size())); + for (var result : queryResults.asList()) { + 
assertTrue(context.shards().containsKey(result.getSearchShardTarget().getShardId())); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 30b2937c24385..6621f2055968f 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -1646,11 +1646,11 @@ public void testLocalShardIteratorFromPointInTime() { } TimeValue keepAlive = randomBoolean() ? null : TimeValue.timeValueSeconds(between(30, 3600)); - final List shardIterators = TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime( + final List shardIterators = TransportSearchAction.getLocalShardsIteratorFromPointInTime( clusterState, null, null, - new SearchContextId(contexts, aliasFilterMap, null), + new SearchContextId(contexts, aliasFilterMap), keepAlive, randomBoolean() ); @@ -1691,22 +1691,22 @@ public void testLocalShardIteratorFromPointInTime() { ) ); IndexNotFoundException error = expectThrows(IndexNotFoundException.class, () -> { - TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime( + TransportSearchAction.getLocalShardsIteratorFromPointInTime( clusterState, null, null, - new SearchContextId(contexts, aliasFilterMap, null), + new SearchContextId(contexts, aliasFilterMap), keepAlive, false ); }); assertThat(error.getIndex().getName(), equalTo("another-index")); // Ok when some indices don't exist and `allowPartialSearchResults` is true. 
- Optional anotherShardIterator = TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime( + Optional anotherShardIterator = TransportSearchAction.getLocalShardsIteratorFromPointInTime( clusterState, null, null, - new SearchContextId(contexts, aliasFilterMap, null), + new SearchContextId(contexts, aliasFilterMap), keepAlive, true ).stream().filter(si -> si.shardId().equals(anotherShardId)).findFirst(); From f1e47b5cf5715542f4162da96610d75585e91ef1 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Tue, 13 Aug 2024 09:29:05 +0900 Subject: [PATCH 12/18] apply review comment --- .../org/elasticsearch/action/search/OpenPointInTimeRequest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java b/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java index b77d9142fe73c..146418839f063 100644 --- a/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java @@ -83,6 +83,8 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { out.writeBoolean(allowPartialSearchResults); + } else if (allowPartialSearchResults) { + throw new IOException("[allow_partial_search_results] is not supported on nodes with version " + out.getTransportVersion()); } } From b151ab17dcbbaafca68bdfb2b7dc71ed431712d0 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Thu, 15 Aug 2024 10:11:49 +0900 Subject: [PATCH 13/18] Apply review comment --- .../action/search/PointInTimeIT.java | 13 +++++--- .../action/search/SearchContextId.java | 10 ++---- .../TransportOpenPointInTimeAction.java | 25 ++++++++++++++- .../action/search/SearchContextIdTests.java | 32 ------------------- 4 files changed, 36 insertions(+), 44 deletions(-) diff --git 
a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java index 13f8b61128d7b..b2dee6c9845a8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java @@ -608,6 +608,8 @@ public void testMissingShardsWithPointInTime() throws Exception { // verify that not all documents can now be retrieved assertResponse(prepareSearch().setQuery(new MatchAllQueryBuilder()), resp -> { + assertThat(resp.getSuccessfulShards(), equalTo(numShards)); + assertThat(resp.getShardFailures(), equalTo(0)); assertNotNull(resp.getHits().getTotalHits()); assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); }); @@ -620,16 +622,18 @@ public void testMissingShardsWithPointInTime() throws Exception { ); try { // assert that some shards are indeed missing from PIT - assertThat(numShards, equalTo(pointInTimeResponseOneNodeDown.getTotalShards())); - assertThat(numShards - shardsToRelocate.size(), equalTo(pointInTimeResponseOneNodeDown.getSuccessfulShards())); - assertThat(shardsToRelocate.size(), equalTo(pointInTimeResponseOneNodeDown.getFailedShards())); - assertThat(0, equalTo(pointInTimeResponseOneNodeDown.getSkippedShards())); + assertThat(pointInTimeResponseOneNodeDown.getTotalShards(), equalTo(numShards)); + assertThat(pointInTimeResponseOneNodeDown.getSuccessfulShards(), equalTo(numShards - shardsToRelocate.size())); + assertThat(pointInTimeResponseOneNodeDown.getFailedShards(), equalTo(shardsToRelocate.size())); + assertThat(pointInTimeResponseOneNodeDown.getSkippedShards(), equalTo(0)); // ensure that the response now contains fewer documents than the total number of indexed documents assertResponse( prepareSearch().setQuery(new MatchAllQueryBuilder()) .setPointInTime(new 
PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())), resp -> { + assertThat(resp.getSuccessfulShards(), equalTo(numShards - shardsToRelocate.size())); + assertThat(resp.getShardFailures(), equalTo(shardsToRelocate.size())); assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId())); assertNotNull(resp.getHits().getTotalHits()); assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); @@ -654,6 +658,7 @@ public void testMissingShardsWithPointInTime() throws Exception { // ensure that we now see at least numDocs results from the updated index assertResponse(prepareSearch().setQuery(new MatchAllQueryBuilder()), resp -> { assertThat(resp.getSuccessfulShards(), equalTo(numShards)); + assertThat(resp.getFailedShards(), equalTo(0)); assertNotNull(resp.getHits().getTotalHits()); assertThat(resp.getHits().getTotalHits().value, greaterThan((long) numDocs)); }); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java index 059711eff22e5..3e4bce7afd7bd 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java @@ -62,6 +62,9 @@ public static BytesReference encode( TransportVersion version, ShardSearchFailure[] shardFailures ) { + assert shardFailures.length == 0 || version.onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT) + : "[allow_partial_search_results] cannot be enabled on a cluster that has not been fully upgraded to version [" + + TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT + "] or higher."; try (var out = new BytesStreamOutput()) { out.setTransportVersion(version); TransportVersion.writeVersion(version, out); @@ -74,13 +77,6 @@ public static BytesReference encode( new SearchContextIdForNode(target.getClusterAlias(), target.getNodeId(), 
searchResult.getContextId()).writeTo(out); } if (allowNullContextId) { - /** - * Shard failures are not encoded if there are nodes in the cluster that have not yet been upgraded to - * {@link TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT}. - * These shards will be silently ignored during searches using this PIT; however, this situation should never occur, - * as failures are not permitted when creating a point in time with versions prior to - * {@link TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT}. - */ for (var failure : shardFailures) { failure.shard().getShardId().writeTo(out); new SearchContextIdForNode(failure.shard().getClusterAlias(), null, null).writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index d156798252ec5..14367f0eeb030 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -10,6 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionType; @@ -21,6 +23,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -29,6 +32,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import 
org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; @@ -50,6 +54,8 @@ import java.util.concurrent.Executor; import java.util.function.BiFunction; +import static org.elasticsearch.core.Strings.format; + public class TransportOpenPointInTimeAction extends HandledTransportAction { private static final Logger logger = LogManager.getLogger(TransportOpenPointInTimeAction.class); @@ -62,6 +68,7 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction listener) { + final ClusterState clusterState = clusterService.state(); + // Check if all the nodes in this cluster know about the service + if (request.allowPartialSearchResults() + && clusterState.getMinTransportVersion().before(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) { + listener.onFailure( + new ElasticsearchStatusException( + format( + "The [allow_partial_search_results] parameter cannot be used while the cluster is still upgrading. Please wait until the upgrade is fully completed and try again." 
+ ), + RestStatus.BAD_REQUEST + ) + ); + return; + } final SearchRequest searchRequest = new SearchRequest().indices(request.indices()) .indicesOptions(request.indicesOptions()) .preference(request.preference()) diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java index 0d9cfcd3dafc6..af7068152648f 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchContextIdTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.action.search; import org.elasticsearch.TransportVersion; -import org.elasticsearch.TransportVersions; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -23,7 +22,6 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.TransportVersionUtils; import java.util.HashMap; import java.util.List; @@ -139,34 +137,4 @@ public void testEncode() { assertThat(indices[1], equalTo("cluster_y:idy")); assertThat(indices[2], equalTo("idy")); } - - public void testFailuresIgnoredInPreviousVersion() { - final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry( - List.of( - new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), - new NamedWriteableRegistry.Entry(QueryBuilder.class, MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new), - new NamedWriteableRegistry.Entry(QueryBuilder.class, IdsQueryBuilder.NAME, IdsQueryBuilder::new) - ) - ); - final AtomicArray queryResults = TransportSearchHelperTests.generateQueryResults(); - final TransportVersion version = TransportVersionUtils.randomVersionBetween( - random(), - 
TransportVersionUtils.getFirstVersion(), - TransportVersionUtils.getPreviousVersion(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT) - ); - Map shardSearchFailures = new HashMap<>(); - - final BytesReference id = SearchContextId.encode( - queryResults.asList(), - Map.of(), - version, - shardSearchFailures.values().toArray(ShardSearchFailure[]::new) - ); - - final SearchContextId context = SearchContextId.decode(namedWriteableRegistry, id); - assertThat(context.shards().size(), equalTo(queryResults.asList().size())); - for (var result : queryResults.asList()) { - assertTrue(context.shards().containsKey(result.getSearchShardTarget().getShardId())); - } - } } From c55aabc6eebc59e84401cfbe6ee449fb9f7402dc Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Thu, 15 Aug 2024 10:17:04 +0900 Subject: [PATCH 14/18] fix conflict --- .../action/search/TransportOpenPointInTimeAction.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 08ddc207cc941..919a8550c84d1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -23,11 +23,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; -<<<<<<< HEAD import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -======= ->>>>>>> upstream/main import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -35,11 +31,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; 
import org.elasticsearch.index.shard.ShardId; -<<<<<<< HEAD -import org.elasticsearch.rest.RestStatus; -======= import org.elasticsearch.injection.guice.Inject; ->>>>>>> upstream/main import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; From 2bee58c77d2541289b07dd060086a00f5b6dec48 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Thu, 15 Aug 2024 10:21:31 +0900 Subject: [PATCH 15/18] fix another conflict --- server/src/main/java/org/elasticsearch/TransportVersions.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 765eefc059e5f..28057fba3cfaf 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -190,7 +190,6 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_EIS_INTEGRATION_ADDED = def(8_720_00_0); public static final TransportVersion INGEST_PIPELINE_EXCEPTION_ADDED = def(8_721_00_0); public static final TransportVersion ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT = def(8_722_00_0); - /* * STOP! READ THIS FIRST! 
No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ From 88c22dc54f530f27677c86aadbf5b9e27db9c98d Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Thu, 15 Aug 2024 10:30:03 +0900 Subject: [PATCH 16/18] missing import --- .../action/search/TransportOpenPointInTimeAction.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 919a8550c84d1..89077618665d3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; @@ -109,7 +110,8 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen listener.onFailure( new ElasticsearchStatusException( format( - "The [allow_partial_search_results] parameter cannot be used while the cluster is still upgrading. Please wait until the upgrade is fully completed and try again." + "The [allow_partial_search_results] parameter cannot be used while the cluster is still upgrading. " + + "Please wait until the upgrade is fully completed and try again." 
), RestStatus.BAD_REQUEST ) From ad75a744468e788dcb6f1101abe61162e0ed12c3 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Thu, 15 Aug 2024 12:08:41 +0900 Subject: [PATCH 17/18] fix format --- .../org/elasticsearch/action/search/SearchContextId.java | 5 +++-- .../action/search/TransportOpenPointInTimeAction.java | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java index 3e4bce7afd7bd..2e4dc724413ea 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java @@ -63,8 +63,9 @@ public static BytesReference encode( ShardSearchFailure[] shardFailures ) { assert shardFailures.length == 0 || version.onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT) - : "[allow_partial_search_results] cannot be enabled on a cluster that has not been fully upgraded to version [" - + TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT + "] or higher."; + : "[allow_partial_search_results] cannot be enabled on a cluster that has not been fully upgraded to version [" + + TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT + + "] or higher."; try (var out = new BytesStreamOutput()) { out.setTransportVersion(version); TransportVersion.writeVersion(version, out); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 89077618665d3..717b1805547be 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -110,8 +110,8 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen listener.onFailure( new 
ElasticsearchStatusException( format( - "The [allow_partial_search_results] parameter cannot be used while the cluster is still upgrading. " + - "Please wait until the upgrade is fully completed and try again." + "The [allow_partial_search_results] parameter cannot be used while the cluster is still upgrading. " + + "Please wait until the upgrade is fully completed and try again." ), RestStatus.BAD_REQUEST ) From 91b4d8c6535e9b3b3fe54cb6591f2dfa17d21d81 Mon Sep 17 00:00:00 2001 From: Panagiotis Bailis Date: Mon, 26 Aug 2024 10:27:26 +0300 Subject: [PATCH 18/18] fixing test after updates --- .../elasticsearch/action/search/PointInTimeIT.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java index b2dee6c9845a8..da2dfc50d7fe9 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java @@ -608,8 +608,8 @@ public void testMissingShardsWithPointInTime() throws Exception { // verify that not all documents can now be retrieved assertResponse(prepareSearch().setQuery(new MatchAllQueryBuilder()), resp -> { - assertThat(resp.getSuccessfulShards(), equalTo(numShards)); - assertThat(resp.getShardFailures(), equalTo(0)); + assertThat(resp.getSuccessfulShards(), equalTo(numShards - shardsRemoved)); + assertThat(resp.getFailedShards(), equalTo(shardsRemoved)); assertNotNull(resp.getHits().getTotalHits()); assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs)); }); @@ -623,8 +623,8 @@ public void testMissingShardsWithPointInTime() throws Exception { try { // assert that some shards are indeed missing from PIT assertThat(pointInTimeResponseOneNodeDown.getTotalShards(), equalTo(numShards)); - 
assertThat(pointInTimeResponseOneNodeDown.getSuccessfulShards(), equalTo(numShards - shardsToRelocate.size())); - assertThat(pointInTimeResponseOneNodeDown.getFailedShards(), equalTo(shardsToRelocate.size())); + assertThat(pointInTimeResponseOneNodeDown.getSuccessfulShards(), equalTo(numShards - shardsRemoved)); + assertThat(pointInTimeResponseOneNodeDown.getFailedShards(), equalTo(shardsRemoved)); assertThat(pointInTimeResponseOneNodeDown.getSkippedShards(), equalTo(0)); // ensure that the response now contains fewer documents than the total number of indexed documents @@ -632,8 +632,8 @@ public void testMissingShardsWithPointInTime() throws Exception { prepareSearch().setQuery(new MatchAllQueryBuilder()) .setPointInTime(new PointInTimeBuilder(pointInTimeResponseOneNodeDown.getPointInTimeId())), resp -> { - assertThat(resp.getSuccessfulShards(), equalTo(numShards - shardsToRelocate.size())); - assertThat(resp.getShardFailures(), equalTo(shardsToRelocate.size())); + assertThat(resp.getSuccessfulShards(), equalTo(numShards - shardsRemoved)); + assertThat(resp.getFailedShards(), equalTo(shardsRemoved)); assertThat(resp.pointInTimeId(), equalTo(pointInTimeResponseOneNodeDown.getPointInTimeId())); assertNotNull(resp.getHits().getTotalHits()); assertThat(resp.getHits().getTotalHits().value, lessThan((long) numDocs));