From 2eabf81453066c5eba3f29157ba5c544fab464eb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 10 Nov 2021 15:24:46 -0500 Subject: [PATCH 1/3] Minimize search source of shard level search requests --- .../search/builder/SearchSourceBuilder.java | 14 +++++++ .../search/internal/ShardSearchRequest.java | 3 ++ .../search/SearchServiceTests.java | 39 +++++++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index d759fa3f777dd..d05ec11fd5a90 100644 --- a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -1809,4 +1809,18 @@ public String toString(Params params) { } } + /** + * Returns a minimal source that discards information that aren't required by shard-level search requests. + * The purpose is to minimize the memory usage and serialization cost of shard-level search requests. 
+ */ + public SearchSourceBuilder minimalSourceForShardRequests() { + if (pointInTimeBuilder != null || collapse != null) { + final SearchSourceBuilder source = shallowCopy(); + source.pointInTimeBuilder(null); + source.collapse(null); + return source; + } else { + return this; + } + } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index cb3f1e80a393c..1f6b12a416779 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -392,6 +392,9 @@ public void setAliasFilter(AliasFilter aliasFilter) { } public void source(SearchSourceBuilder source) { + if (source != null) { + source = source.minimalSourceForShardRequests(); + } this.source = source; } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index ca7ca2f47d4eb..67f28dc8225f7 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -22,6 +22,10 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClosePointInTimeAction; +import org.elasticsearch.action.search.ClosePointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeAction; +import org.elasticsearch.action.search.OpenPointInTimeRequest; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -78,6 +82,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import 
org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchRequest; @@ -105,6 +110,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; @@ -113,6 +119,7 @@ import java.util.function.Consumer; import java.util.function.Function; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; @@ -126,6 +133,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.Matchers.not; import static org.mockito.Mockito.mock; public class SearchServiceTests extends ESSingleNodeTestCase { @@ -1824,6 +1832,37 @@ public void testWaitOnRefreshTimeout() { assertThat(ex.getMessage(), containsString("Wait for seq_no [0] refreshed timed out [")); } + public void testMinimalSearchSourceInShardRequests() { + createIndex("test"); + int numDocs = between(0, 10); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test").setSource("id", Integer.toString(i)).get(); + } + client().admin().indices().prepareRefresh("test").get(); + + String pitId = client().execute( + OpenPointInTimeAction.INSTANCE, + new OpenPointInTimeRequest("test").keepAlive(TimeValue.timeValueMinutes(10)) + ).actionGet().getPointInTimeId(); + final MockSearchService searchService = (MockSearchService) 
getInstanceFromNode(SearchService.class); + final List<ShardSearchRequest> shardRequests = new CopyOnWriteArrayList<>(); + searchService.setOnCreateSearchContext(ctx -> shardRequests.add(ctx.request())); + try { + SearchRequest searchRequest = new SearchRequest().source( + new SearchSourceBuilder().size(between(numDocs, numDocs * 2)).pointInTimeBuilder(new PointInTimeBuilder(pitId)) + ); + final SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertHitCount(searchResponse, numDocs); + } finally { + client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet(); + } + assertThat(shardRequests, not(emptyList())); + for (ShardSearchRequest shardRequest : shardRequests) { + assertNotNull(shardRequest.source()); + assertNotNull(shardRequest.source().pointInTimeBuilder()); + } + } + private ReaderContext createReaderContext(IndexService indexService, IndexShard indexShard) { return new ReaderContext( new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong()), From d99725efd5f247c3ffe1e6b5c8f9e679ef43129e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 15 Nov 2021 09:21:32 -0500 Subject: [PATCH 2/3] Jim's comments --- .../org/elasticsearch/search/builder/SearchSourceBuilder.java | 3 +-- .../org/elasticsearch/search/internal/ShardSearchRequest.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index d05ec11fd5a90..cd24ec3ed0979 100644 --- a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -1814,10 +1814,9 @@ public String toString(Params params) { * The purpose is to minimize the memory usage and serialization cost of shard-level search requests. 
*/ public SearchSourceBuilder minimalSourceForShardRequests() { - if (pointInTimeBuilder != null || collapse != null) { + if (pointInTimeBuilder != null) { final SearchSourceBuilder source = shallowCopy(); source.pointInTimeBuilder(null); - source.collapse(null); return source; } else { return this; diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 1f6b12a416779..6160debe96ae8 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -236,7 +236,7 @@ public ShardSearchRequest(StreamInput in) throws IOException { shardRequestIndex = in.getVersion().onOrAfter(Version.V_7_11_0) ? in.readVInt() : -1; numberOfShards = in.readVInt(); scroll = in.readOptionalWriteable(Scroll::new); - source = in.readOptionalWriteable(SearchSourceBuilder::new); + source(in.readOptionalWriteable(SearchSourceBuilder::new)); if (in.getVersion().before(Version.V_8_0_0)) { // types no longer relevant so ignore String[] types = in.readStringArray(); From 8a415eecfc006153ced627f32b0df4752704b15b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 15 Nov 2021 16:17:43 -0500 Subject: [PATCH 3/3] simplify --- .../search/builder/SearchSourceBuilder.java | 13 ------------- .../search/internal/ShardSearchRequest.java | 15 ++++++++++----- .../elasticsearch/search/SearchServiceTests.java | 1 + 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index cd24ec3ed0979..d759fa3f777dd 100644 --- a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -1809,17 +1809,4 @@ public 
String toString(Params params) { } } - /** - * Returns a minimal source that discards information that aren't required by shard-level search requests. - * The purpose is to minimize the memory usage and serialization cost of shard-level search requests. - */ - public SearchSourceBuilder minimalSourceForShardRequests() { - if (pointInTimeBuilder != null) { - final SearchSourceBuilder source = shallowCopy(); - source.pointInTimeBuilder(null); - return source; - } else { - return this; - } - } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 6160debe96ae8..a6d7eb837941d 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -43,6 +43,7 @@ import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchSortValuesAndFormats; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.sort.FieldSortBuilder; @@ -212,7 +213,7 @@ public ShardSearchRequest( this.shardRequestIndex = shardRequestIndex; this.numberOfShards = numberOfShards; this.searchType = searchType; - this.source = source; + this.source(source); this.requestCache = requestCache; this.aliasFilter = aliasFilter; this.indexBoost = indexBoost; @@ -236,7 +237,7 @@ public ShardSearchRequest(StreamInput in) throws IOException { shardRequestIndex = in.getVersion().onOrAfter(Version.V_7_11_0) ? 
in.readVInt() : -1; numberOfShards = in.readVInt(); scroll = in.readOptionalWriteable(Scroll::new); - source(in.readOptionalWriteable(SearchSourceBuilder::new)); + source = in.readOptionalWriteable(SearchSourceBuilder::new); if (in.getVersion().before(Version.V_8_0_0)) { // types no longer relevant so ignore String[] types = in.readStringArray(); @@ -285,7 +286,7 @@ public ShardSearchRequest(ShardSearchRequest clone) { this.searchType = clone.searchType; this.numberOfShards = clone.numberOfShards; this.scroll = clone.scroll; - this.source = clone.source; + this.source(clone.source); this.aliasFilter = clone.aliasFilter; this.indexBoost = clone.indexBoost; this.nowInMillis = clone.nowInMillis; @@ -392,8 +393,12 @@ public void setAliasFilter(AliasFilter aliasFilter) { } public void source(SearchSourceBuilder source) { - if (source != null) { - source = source.minimalSourceForShardRequests(); + if (source != null && source.pointInTimeBuilder() != null) { + // Discard the actual point in time, as data nodes don't use it, to reduce the memory usage and the serialization cost + // of shard-level search requests. However, we need to assign a dummy PIT instead of null as we verify PIT for + // slice requests on data nodes. 
+ source = source.shallowCopy(); + source.pointInTimeBuilder(new PointInTimeBuilder("")); } this.source = source; } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 67f28dc8225f7..9a95ab04bab4c 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -1860,6 +1860,7 @@ public void testMinimalSearchSourceInShardRequests() { for (ShardSearchRequest shardRequest : shardRequests) { assertNotNull(shardRequest.source()); assertNotNull(shardRequest.source().pointInTimeBuilder()); + assertThat(shardRequest.source().pointInTimeBuilder().getEncodedId(), equalTo("")); } }