diff --git a/CHANGELOG.md b/CHANGELOG.md index 16c794135cb23..5f12a9953813e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) - Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604)) +- Add partial results support for concurrent segment search ([#8306](https://github.com/opensearch-project/OpenSearch/pull/8306)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/internalClusterTest/java/org/opensearch/search/ConcurrentSegmentSearchCancellationIT.java b/server/src/internalClusterTest/java/org/opensearch/search/ConcurrentSegmentSearchCancellationIT.java new file mode 100644 index 0000000000000..3c50627e342dd --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/ConcurrentSegmentSearchCancellationIT.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.common.settings.FeatureFlagSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; + +public class ConcurrentSegmentSearchCancellationIT extends SearchCancellationIT { + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) { + featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY)); + } + featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true"); + return featureSettings.build(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/ConcurrentSegmentSearchTimeoutIT.java b/server/src/internalClusterTest/java/org/opensearch/search/ConcurrentSegmentSearchTimeoutIT.java new file mode 100644 index 0000000000000..c19f762679fb0 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/ConcurrentSegmentSearchTimeoutIT.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.common.settings.FeatureFlagSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; + +public class ConcurrentSegmentSearchTimeoutIT extends SearchTimeoutIT { + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) { + featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY)); + } + featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true"); + return featureSettings.build(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java index 9db0ac4590efa..92a925be84556 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java @@ -207,10 +207,7 @@ private void verifyCancellationException(ShardSearchFailure[] failures) { // failure may happen while executing the search or while sending shard request for next phase. // Below assertion is handling both the cases final Throwable topFailureCause = searchFailure.getCause(); - assertTrue( - searchFailure.toString(), - topFailureCause instanceof TransportException || topFailureCause instanceof TaskCancelledException - ); + assertTrue(searchFailure.toString(), topFailureCause instanceof RuntimeException); if (topFailureCause instanceof TransportException) { assertTrue(topFailureCause.getCause() instanceof TaskCancelledException); } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchTimeoutIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchTimeoutIT.java index 049dcb50024ba..fea867e7484d8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchTimeoutIT.java @@ -51,7 +51,6 @@ import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.opensearch.index.query.QueryBuilders.scriptQuery; import static org.opensearch.search.SearchTimeoutIT.ScriptedTimeoutPlugin.SCRIPT_NAME; -import static org.hamcrest.Matchers.equalTo; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE) public class SearchTimeoutIT extends OpenSearchIntegTestCase { @@ -67,17 +66,37 @@ protected Settings nodeSettings(int nodeOrdinal) { } public void testSimpleTimeout() throws Exception { - for (int i = 0; i < 32; i++) { + final int numDocs = 1000; + for (int i = 0; i < numDocs; i++) { client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get(); } refresh("test"); SearchResponse searchResponse = client().prepareSearch("test") - .setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS)) + .setTimeout(new TimeValue(1, TimeUnit.MILLISECONDS)) .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) .setAllowPartialSearchResults(true) .get(); - assertThat(searchResponse.isTimedOut(), equalTo(true)); + assertTrue(searchResponse.isTimedOut()); + assertEquals(0, searchResponse.getFailedShards()); + assertTrue(numDocs > searchResponse.getHits().getTotalHits().value); + } + + public void testSimpleDoesNotTimeout() throws Exception { + final int numDocs = 10; + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get(); + } + refresh("test"); + + SearchResponse searchResponse = client().prepareSearch("test") + .setTimeout(new TimeValue(10000, TimeUnit.SECONDS)) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) + .setAllowPartialSearchResults(true) + .get(); + assertFalse(searchResponse.isTimedOut()); + assertEquals(0, searchResponse.getFailedShards()); + assertEquals(numDocs, searchResponse.getHits().getTotalHits().value); } public void testPartialResultsIntolerantTimeout() throws Exception { diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index f1650b686f070..d0dcd0fdcd3be 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -73,6 +73,8 @@ import org.opensearch.search.profile.query.ProfileWeight; import org.opensearch.search.profile.query.QueryProfiler; import org.opensearch.search.profile.query.QueryTimingType; +import org.opensearch.search.query.QueryPhase; +import org.opensearch.search.query.QueryPhaseExecutionException; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.MinAndMax; @@ -310,17 +312,25 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto return; } - cancellable.checkCancelled(); - weight = wrapWeight(weight); - // See please https://github.com/apache/lucene/pull/964 - collector.setWeight(weight); final LeafCollector leafCollector; try { + cancellable.checkCancelled(); + weight = wrapWeight(weight); + // See please https://github.com/apache/lucene/pull/964 + collector.setWeight(weight); leafCollector = collector.getLeafCollector(ctx); } catch (CollectionTerminatedException e) { // there is no doc of interest in this reader context // continue with the following leaf return; + } catch (QueryPhase.TimeExceededException e) { + // Swallow timeout exception but still return + if (searchContext.request().allowPartialSearchResults() == false) { + // Can't rethrow TimeExceededException because not serializable + throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded"); + } + searchContext.queryResult().searchTimedOut(true); + return; } Bits liveDocs = ctx.reader().getLiveDocs(); BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs); @@ -332,6 +342,14 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto } catch (CollectionTerminatedException e) { // collection was terminated prematurely // continue with the following leaf + } catch (QueryPhase.TimeExceededException e) { + // Swallow timeout exception but still return + if (searchContext.request().allowPartialSearchResults() == false) { + // Can't rethrow TimeExceededException because not serializable + throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded"); + } + searchContext.queryResult().searchTimedOut(true); + return; } } } else { @@ -348,6 +366,14 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto } catch (CollectionTerminatedException e) { // collection was terminated prematurely // continue with the following leaf + } catch (QueryPhase.TimeExceededException e) { + // Swallow timeout exception but still return + if (searchContext.request().allowPartialSearchResults() == false) { + // Can't rethrow TimeExceededException because not serializable + throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded"); + } + searchContext.queryResult().searchTimedOut(true); + return; } } }