Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time series based workload desc order optimization through reverse segment read #7244

Merged
merged 19 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce full support for searchable snapshots ([#5087](https://github.com/opensearch-project/OpenSearch/issues/5087))
- Introduce full support for Segment Replication ([#5147](https://github.com/opensearch-project/OpenSearch/issues/5147))
- [Remote Store] Add support for enabling Remote Store at Cluster level by default. ([#6932](https://github.com/opensearch-project/OpenSearch/pull/6932))
- Time series based workload desc order optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down Expand Up @@ -146,4 +147,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public void setUp() throws Exception {
IndexSearcher.getDefaultQueryCache(),
ALWAYS_CACHE_POLICY,
true,
executor
executor,
false
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,8 @@ private static ContextIndexSearcher newContextSearcher(IndexReader reader, Execu
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
executor
executor,
false
);
}

Expand All @@ -1164,7 +1165,8 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
executor
executor,
false
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,8 @@ private static ContextIndexSearcher newContextSearcher(IndexReader reader, Execu
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
executor
executor,
false
);
}

Expand All @@ -1176,7 +1177,8 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
executor
executor,
false
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.MAX_OPEN_SCROLL_CONTEXT,
SearchService.MAX_OPEN_PIT_CONTEXT,
SearchService.MAX_PIT_KEEPALIVE_SETTING,
SearchService.SEARCH_SEGMENTS_REVERSE_ORDER_OPTIMIZATION,
CreatePitController.PIT_INIT_KEEP_ALIVE,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ final class DefaultSearchContext extends SearchContext {
boolean lowLevelCancellation,
Version minNodeVersion,
boolean validate,
Executor executor
Executor executor,
boolean searchSegmentOrderReversed
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand All @@ -210,7 +211,8 @@ final class DefaultSearchContext extends SearchContext {
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation,
executor
executor,
searchSegmentOrderReversed
);
this.relativeTimeSupplier = relativeTimeSupplier;
this.timeout = timeout;
Expand Down
45 changes: 44 additions & 1 deletion server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.opensearch.search.sort.MinAndMax;
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.search.suggest.completion.CompletionSuggestion;
import org.opensearch.threadpool.Scheduler.Cancellable;
Expand Down Expand Up @@ -234,6 +235,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

public static final Setting<Boolean> SEARCH_SEGMENTS_REVERSE_ORDER_OPTIMIZATION = Setting.boolSetting(
gashutos marked this conversation as resolved.
Show resolved Hide resolved
"search.search_segments_reverse_order_optimization",
false,
Property.Dynamic,
Property.NodeScope
);

/**
* This setting defines the maximum number of active PIT reader contexts in the node , since each PIT context
* has a resource cost attached to it. This setting is less than scroll since users are
Expand Down Expand Up @@ -282,6 +290,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private volatile int maxOpenScrollContext;

private volatile boolean searchSegmentOrderReversed;

private volatile int maxOpenPitContext;

private final Cancellable keepAliveReaper;
Expand Down Expand Up @@ -356,6 +366,10 @@ public SearchService(

lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);

searchSegmentOrderReversed = SEARCH_SEGMENTS_REVERSE_ORDER_OPTIMIZATION.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_SEGMENTS_REVERSE_ORDER_OPTIMIZATION, this::setSearchSegmentOrderReversed);
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
Expand Down Expand Up @@ -432,6 +446,14 @@ private void setLowLevelCancellation(Boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}

// Dynamic-setting consumer for SEARCH_SEGMENTS_REVERSE_ORDER_OPTIMIZATION:
// invoked by the cluster-settings framework whenever the operator updates
// search.search_segments_reverse_order_optimization at runtime. The field is
// volatile, so the new value is visible to subsequent search requests.
private void setSearchSegmentOrderReversed(boolean reversed) {
this.searchSegmentOrderReversed = reversed;
}

// Accessor for the current value of the reverse-segment-order flag.
// NOTE(review): no caller is visible in this view — presumably kept for
// tests or future use; confirm before removing.
private boolean getSearchSegmentOrderReversed() {
return this.searchSegmentOrderReversed;
}

@Override
public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRemovalReason reason) {
// once an index is removed due to deletion or closing, we can just clean up all the pending search context information
Expand Down Expand Up @@ -1038,7 +1060,8 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
lowLevelCancellation,
clusterService.state().nodes().getMinNodeVersion(),
validate,
indexSearcherExecutor
indexSearcherExecutor,
shouldReverseLeafReaderContexts(request)
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand All @@ -1058,6 +1081,26 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
return searchContext;
}

/**
 * Decides whether the leaf readers (segments) should be searched in reverse
 * order for this request.
 *
 * Reversing the segment order benefits descending-sort queries on time-series
 * style workloads, where the most recent documents live in the newest
 * segments, which Lucene lists last. Searching those segments first lets
 * early-termination kick in sooner.
 *
 * @param request the incoming shard search request; may be {@code null}
 * @return {@code true} only when the optimization is enabled via the dynamic
 *         cluster setting AND the request's primary sort is descending
 */
private boolean shouldReverseLeafReaderContexts(ShardSearchRequest request) {
    // Feature gate: search.search_segments_reverse_order_optimization.
    // The setting is declared with a default of false (opt-in); the previous
    // comments claiming it was "on by default" contradicted the code.
    if (searchSegmentOrderReversed == false) {
        return false;
    }
    // Reverse only for requests whose primary (first) sort is descending.
    // The null checks guard against requests with no source or no sorts.
    return request != null
        && request.source() != null
        && request.source().sorts() != null
        && request.source().sorts().isEmpty() == false
        && request.source().sorts().get(0).order() == SortOrder.DESC;
}
gashutos marked this conversation as resolved.
Show resolved Hide resolved

private void freeAllContextForIndex(Index index) {
assert index != null;
for (ReaderContext ctx : activeReaders.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand All @@ -98,15 +99,31 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private QueryProfiler profiler;
private MutableQueryTimeout cancellable;

/**
* Certain queries can benefit if we reverse the segment read order,
* for example time series based queries if searched for desc sort order
*/
private boolean reverseLeafReaderContexts;
gashutos marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Creates a searcher over the given reader.
 *
 * @param reader the index reader to search
 * @param similarity scoring similarity to apply
 * @param queryCache query cache implementation
 * @param queryCachingPolicy policy deciding which queries get cached
 * @param wrapWithExitableDirectoryReader whether to wrap the reader so that
 *        long-running operations can be cancelled
 * @param executor optional executor for concurrent segment search; may be null
 * @param reverseLeafReaderContexts when true, segments are searched in
 *        reverse order (newest segments first)
 * @throws IOException if wrapping the reader fails
 */
public ContextIndexSearcher(
    IndexReader reader,
    Similarity similarity,
    QueryCache queryCache,
    QueryCachingPolicy queryCachingPolicy,
    boolean wrapWithExitableDirectoryReader,
    Executor executor,
    boolean reverseLeafReaderContexts
) throws IOException {
    // Delegate to the private constructor, supplying a fresh cancellation
    // registry for this searcher instance.
    this(
        reader,
        similarity,
        queryCache,
        queryCachingPolicy,
        new MutableQueryTimeout(),
        wrapWithExitableDirectoryReader,
        executor,
        reverseLeafReaderContexts
    );
}

private ContextIndexSearcher(
Expand All @@ -116,13 +133,15 @@ private ContextIndexSearcher(
QueryCachingPolicy queryCachingPolicy,
MutableQueryTimeout cancellable,
boolean wrapWithExitableDirectoryReader,
Executor executor
Executor executor,
boolean reverseLeafReaderContexts
) throws IOException {
super(wrapWithExitableDirectoryReader ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader, executor);
setSimilarity(similarity);
setQueryCache(queryCache);
setQueryCachingPolicy(queryCachingPolicy);
this.cancellable = cancellable;
this.reverseLeafReaderContexts = reverseLeafReaderContexts;
}

public void setProfiler(QueryProfiler profiler) {
Expand Down Expand Up @@ -246,6 +265,12 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
if (reverseLeafReaderContexts) {
// reverse the segment search order if this flag is true.
// We need to make a copy, so we don't corrupt DirectoryReader leaves order
leaves = new ArrayList<>(leaves);
Collections.reverse(leaves);
}
for (LeafReaderContext ctx : leaves) { // search each subreader
gashutos marked this conversation as resolved.
Show resolved Hide resolved
searchLeaf(ctx, weight, collector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
false,
Version.CURRENT,
false,
executor
executor,
false
);
contextWithoutScroll.from(300);
contextWithoutScroll.close();
Expand Down Expand Up @@ -255,7 +256,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
false,
Version.CURRENT,
false,
executor
executor,
false
);
context1.from(300);
exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false));
Expand Down Expand Up @@ -325,7 +327,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
false,
Version.CURRENT,
false,
executor
executor,
false
);

SliceBuilder sliceBuilder = mock(SliceBuilder.class);
Expand Down Expand Up @@ -364,7 +367,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
false,
Version.CURRENT,
false,
executor
executor,
false
);
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
Expand Down Expand Up @@ -399,7 +403,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
false,
Version.CURRENT,
false,
executor
executor,
false
);
context4.sliceBuilder(new SliceBuilder(1, 2)).parsedQuery(parsedQuery).preProcess(false);
Query query1 = context4.query();
Expand Down Expand Up @@ -429,7 +434,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
false,
Version.CURRENT,
false,
executor
executor,
false
);
int numSlicesForPit = maxSlicesPerPit + randomIntBetween(1, 100);
when(sliceBuilder.getMax()).thenReturn(numSlicesForPit);
Expand Down Expand Up @@ -526,7 +532,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
false,
Version.CURRENT,
false,
executor
executor,
false
);
assertThat(context.searcher().hasCancellations(), is(false));
context.searcher().addQueryCancellation(() -> {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public void testAddingCancellationActions() throws IOException {
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
false
);
NullPointerException npe = expectThrows(NullPointerException.class, () -> searcher.addQueryCancellation(null));
assertEquals("cancellation runnable should not be null", npe.getMessage());
Expand All @@ -129,7 +130,8 @@ public void testCancellableCollector() throws IOException {
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
false
);

searcher.search(new MatchAllDocsQuery(), collector1);
Expand Down Expand Up @@ -157,7 +159,8 @@ public void testExitableDirectoryReader() throws IOException {
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
false
);
searcher.addQueryCancellation(cancellation);
CompiledAutomaton automaton = new CompiledAutomaton(new RegExp("a.*").toAutomaton());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ public void onRemoval(ShardId shardId, Accountable accountable) {
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
false
);

for (LeafReaderContext context : searcher.getIndexReader().leaves()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public void setUp() throws Exception {
IndexSearcher.getDefaultQueryCache(),
ALWAYS_CACHE_POLICY,
true,
null
null,
false
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,8 @@ private static ContextIndexSearcher newContextSearcher(IndexReader reader) throw
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
false
);
}

Expand All @@ -1181,7 +1182,8 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
false
) {

@Override
Expand Down
Loading