Add early termination support for concurrent segment search
Signed-off-by: Jay Deng <[email protected]>
jed326 authored and Jay Deng committed Jun 28, 2023
1 parent b9edb5a commit 90a37b8
Showing 6 changed files with 108 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604))
- Add partial results support for concurrent segment search ([#8306](https://github.com/opensearch-project/OpenSearch/pull/8306))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
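For orientation (commentary, not part of the commit): the changelog entry describes letting a timed-out search return the hits collected so far instead of failing, when the caller opts in. Below is a minimal sketch of that behavior, using the same transport-client calls that SearchTimeoutIT exercises further down; the "test" index, the query, and the 1 ms timeout are placeholders, and the snippet assumes an OpenSearchIntegTestCase context (client(), assertEquals) plus org.opensearch.index.query.QueryBuilders.

// Sketch only, not part of this commit.
SearchResponse response = client().prepareSearch("test")
    .setTimeout(new TimeValue(1, TimeUnit.MILLISECONDS))
    .setQuery(QueryBuilders.matchAllQuery())
    .setAllowPartialSearchResults(true) // opt in to partial results on timeout
    .get();

if (response.isTimedOut()) {
    // The request still succeeds: no shard is reported as failed, and the hit
    // count reflects only the documents collected before the timeout fired.
    assertEquals(0, response.getFailedShards());
    long collectedHits = response.getHits().getTotalHits().value;
}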
ConcurrentSegmentSearchCancellationIT.java (new file)
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

public class ConcurrentSegmentSearchCancellationIT extends SearchCancellationIT {
@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
return featureSettings.build();
}
}
ConcurrentSegmentSearchTimeoutIT.java (new file)
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

public class ConcurrentSegmentSearchTimeoutIT extends SearchTimeoutIT {

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
return featureSettings.build();
}
}
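
Both new classes re-run their entire parent suites (SearchCancellationIT and SearchTimeoutIT) with concurrent segment search switched on, so every cancellation and timeout scenario is now exercised in both execution modes. The featureFlagSettings() override is identical in the two files; a hypothetical shared helper, not part of this commit, could look like this:

// Hypothetical helper (name invented for illustration), equivalent to the
// duplicated featureFlagSettings() bodies above.
static Settings concurrentSegmentSearchFlags() {
    final Settings.Builder featureSettings = Settings.builder();
    // Start from the default value of every built-in feature flag...
    for (Setting<?> builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
        featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
    }
    // ...then force concurrent segment search on for the test cluster.
    featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
    return featureSettings.build();
}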
SearchCancellationIT.java
@@ -207,10 +207,7 @@ private void verifyCancellationException(ShardSearchFailure[] failures) {
// failure may happen while executing the search or while sending shard request for next phase.
// Below assertion is handling both the cases
final Throwable topFailureCause = searchFailure.getCause();
assertTrue(
searchFailure.toString(),
topFailureCause instanceof TransportException || topFailureCause instanceof TaskCancelledException
);
assertTrue(searchFailure.toString(), topFailureCause instanceof RuntimeException);
if (topFailureCause instanceof TransportException) {
assertTrue(topFailureCause.getCause() instanceof TaskCancelledException);
}
SearchTimeoutIT.java
@@ -51,7 +51,6 @@
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.index.query.QueryBuilders.scriptQuery;
import static org.opensearch.search.SearchTimeoutIT.ScriptedTimeoutPlugin.SCRIPT_NAME;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
public class SearchTimeoutIT extends OpenSearchIntegTestCase {
@@ -67,17 +66,37 @@ protected Settings nodeSettings(int nodeOrdinal) {
}

public void testSimpleTimeout() throws Exception {
for (int i = 0; i < 32; i++) {
final int numDocs = 1000;
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get();
}
refresh("test");

SearchResponse searchResponse = client().prepareSearch("test")
.setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS))
.setTimeout(new TimeValue(1, TimeUnit.MILLISECONDS))
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.setAllowPartialSearchResults(true)
.get();
assertThat(searchResponse.isTimedOut(), equalTo(true));
assertTrue(searchResponse.isTimedOut());
assertEquals(0, searchResponse.getFailedShards());
assertTrue(numDocs > searchResponse.getHits().getTotalHits().value);
}

public void testSimpleDoesNotTimeout() throws Exception {
final int numDocs = 10;
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get();
}
refresh("test");

SearchResponse searchResponse = client().prepareSearch("test")
.setTimeout(new TimeValue(10000, TimeUnit.SECONDS))
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.setAllowPartialSearchResults(true)
.get();
assertFalse(searchResponse.isTimedOut());
assertEquals(0, searchResponse.getFailedShards());
assertEquals(numDocs, searchResponse.getHits().getTotalHits().value);
}

public void testPartialResultsIntolerantTimeout() throws Exception {
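The existing testPartialResultsIntolerantTimeout (collapsed above) covers the opposite setting. A rough sketch of that scenario follows; the caller-side exception type is an assumption, since the body of that test is not shown in this diff, but the ContextIndexSearcher change below converts the timeout into a QueryPhaseExecutionException on the shard when partial results are disallowed, which reaches the caller as a search-phase failure.

// Sketch only; SearchPhaseExecutionException as the caller-side type is an assumption.
SearchPhaseExecutionException ex = expectThrows(
    SearchPhaseExecutionException.class,
    () -> client().prepareSearch("test")
        .setTimeout(new TimeValue(1, TimeUnit.MILLISECONDS))
        .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
        .setAllowPartialSearchResults(false) // timeouts become hard failures
        .get()
);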
ContextIndexSearcher.java
@@ -73,6 +73,8 @@
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QueryPhaseExecutionException;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.MinAndMax;
@@ -310,17 +312,25 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
return;
}

cancellable.checkCancelled();
weight = wrapWeight(weight);
// See please https://github.com/apache/lucene/pull/964
collector.setWeight(weight);
final LeafCollector leafCollector;
try {
cancellable.checkCancelled();
weight = wrapWeight(weight);
// See please https://github.com/apache/lucene/pull/964
collector.setWeight(weight);
leafCollector = collector.getLeafCollector(ctx);
} catch (CollectionTerminatedException e) {
// there is no doc of interest in this reader context
// continue with the following leaf
return;
} catch (QueryPhase.TimeExceededException e) {
// Swallow timeout exception but still return
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
searchContext.queryResult().searchTimedOut(true);
return;
}
Bits liveDocs = ctx.reader().getLiveDocs();
BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
@@ -332,6 +342,14 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (QueryPhase.TimeExceededException e) {
// Swallow timeout exception but still return
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
searchContext.queryResult().searchTimedOut(true);
return;
}
}
} else {
@@ -348,6 +366,14 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (QueryPhase.TimeExceededException e) {
// Swallow timeout exception but still return
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
searchContext.queryResult().searchTimedOut(true);
return;
}
}
}
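The catch block for QueryPhase.TimeExceededException added above appears three times in searchLeaf, once per collection path. A hypothetical refactor, not part of this commit, could centralize it in a private helper; the method name is invented for illustration, and searchContext is the field already used by the surrounding code.

// Hypothetical helper mirroring the repeated catch blocks above: either fail the
// shard request when partial results are not allowed, or record the timeout on
// the query result so collection of this leaf can stop.
private void handleTimeExceeded() {
    if (searchContext.request().allowPartialSearchResults() == false) {
        // TimeExceededException is not serializable, so surface a
        // QueryPhaseExecutionException to the coordinating node instead.
        throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
    }
    searchContext.queryResult().searchTimedOut(true);
}

Each call site would then catch QueryPhase.TimeExceededException, call the helper, and return, preserving the early exit this commit adds.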
