From ae792cd2a8ca864ebd0dd3f76fa38a9d9e951f03 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 14 Aug 2023 20:06:58 +0200 Subject: [PATCH] TimeSeriesIndexSearcher to offload to the provided executor (#98414) We have introduced a search worker thread pool with #98204 that is responsible for the heavy workloads as part of the query and dfs phase, no matter if it is parallelized across segments/slices or not. TSDB aggregations are still executed in the search thread pool and this commit moves their computation to the search worker thread pool, despite the corresponding search thread blocks and waits for such computation to be completed before returning. --- docs/changelog/98414.yaml | 5 ++ .../support/TimeSeriesIndexSearcher.java | 36 ++++++++++++-- .../search/internal/ContextIndexSearcher.java | 4 +- .../support/TimeSeriesIndexSearcherTests.java | 47 ++++++++++--------- 4 files changed, 65 insertions(+), 27 deletions(-) create mode 100644 docs/changelog/98414.yaml diff --git a/docs/changelog/98414.yaml b/docs/changelog/98414.yaml new file mode 100644 index 0000000000000..0a245893676e0 --- /dev/null +++ b/docs/changelog/98414.yaml @@ -0,0 +1,5 @@ +pr: 98414 +summary: '`TimeSeriesIndexSearcher` to offload to the provided executor' +area: TSDB +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java index 9aafa31e85c17..16efc62f2704f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java @@ -21,6 +21,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.PriorityQueue; +import org.apache.lucene.util.ThreadInterruptedException; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.lucene.search.function.MinScoreScorer; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; @@ -36,6 +37,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; import java.util.function.IntSupplier; import static org.elasticsearch.index.IndexSortConfig.TIME_SERIES_SORT; @@ -63,14 +67,17 @@ public TimeSeriesIndexSearcher(IndexSearcher searcher, List cancellati searcher.getSimilarity(), searcher.getQueryCache(), searcher.getQueryCachingPolicy(), - false + false, + searcher.getExecutor(), + 1, + -1 ); } catch (IOException e) { // IOException from wrapping the index searcher which should never happen. throw new RuntimeException(e); } this.cancellations = cancellations; - cancellations.forEach(cancellation -> this.searcher.addQueryCancellation(cancellation)); + cancellations.forEach(this.searcher::addQueryCancellation); assert TIME_SERIES_SORT.length == 2; assert TIME_SERIES_SORT[0].getField().equals(TimeSeriesIdFieldMapper.NAME); @@ -84,9 +91,32 @@ public void setMinimumScore(Float minimumScore) { } public void search(Query query, BucketCollector bucketCollector) throws IOException { - int seen = 0; query = searcher.rewrite(query); Weight weight = searcher.createWeight(query, bucketCollector.scoreMode(), 1); + if (searcher.getExecutor() == null) { + search(bucketCollector, weight); + return; + } + // offload to the search worker thread pool whenever possible. It will be null only when search.worker_threads_enabled is false + RunnableFuture task = new FutureTask<>(() -> { + search(bucketCollector, weight); + return null; + }); + searcher.getExecutor().execute(task); + try { + task.get(); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException runtimeException) { + throw runtimeException; + } + throw new RuntimeException(e.getCause()); + } + } + + private void search(BucketCollector bucketCollector, Weight weight) throws IOException { + int seen = 0; int[] tsidOrd = new int[1]; // Create LeafWalker for each subreader diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index cfcc0da68f3b3..a8c7c8876aa3c 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -151,7 +151,6 @@ public ContextIndexSearcher( this.leafSlices = null; } else { // we offload to the executor unconditionally, including requests that don't support concurrency - // the few cases that don't support offloading entirely won't get an executor this.leafSlices = computeSlices(getLeafContexts(), maximumNumberOfSlices, minimumDocsPerSlice); assert this.leafSlices.length <= maximumNumberOfSlices : "more slices created than the maximum allowed"; } @@ -265,6 +264,9 @@ public static LeafSlice[] computeSlices(List leaves, int maxS if (maxSliceNum < 1) { throw new IllegalArgumentException("maxSliceNum must be >= 1 (got " + maxSliceNum + ")"); } + if (maxSliceNum == 1) { + return new LeafSlice[] { new LeafSlice(new ArrayList<>(leaves)) }; + } // total number of documents to be searched final int numDocs = leaves.stream().mapToInt(l -> l.reader().maxDoc()).sum(); // percentage of documents per slice, minimum 10% diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java index bf1a17f1852b1..4acf66ff979ab 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java @@ -40,7 +40,6 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.index.IndexSortConfig.TIME_SERIES_SORT; @@ -52,7 +51,7 @@ public class TimeSeriesIndexSearcherTests extends ESTestCase { // Open a searcher over a set of leaves // Collection should be in order - public void testCollectInOrderAcrossSegments() throws IOException, InterruptedException { + public void testCollectInOrderAcrossSegments() throws IOException { Directory dir = newDirectory(); RandomIndexWriter iw = getIndexWriter(dir); @@ -61,29 +60,31 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx final int THREADS = 5; final int DOC_COUNTS = 500; ExecutorService indexer = Executors.newFixedThreadPool(THREADS); - for (int i = 0; i < THREADS; i++) { - indexer.submit(() -> { - Document doc = new Document(); - for (int j = 0; j < DOC_COUNTS; j++) { - String tsid = "tsid" + randomIntBetween(0, 30); - long time = clock.addAndGet(randomIntBetween(0, 10)); - doc.clear(); - doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid))); - doc.add(new NumericDocValuesField(DataStream.TIMESTAMP_FIELD_NAME, time)); - try { - iw.addDocument(doc); - } catch (IOException e) { - throw new UncheckedIOException(e); + try { + for (int i = 0; i < THREADS; i++) { + indexer.submit(() -> { + Document doc = new Document(); + for (int j = 0; j < DOC_COUNTS; j++) { + String tsid = "tsid" + randomIntBetween(0, 30); + long time = clock.addAndGet(randomIntBetween(0, 10)); + doc.clear(); + doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid))); + doc.add(new NumericDocValuesField(DataStream.TIMESTAMP_FIELD_NAME, time)); + try { + iw.addDocument(doc); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } - } - }); + }); + } + } finally { + terminate(indexer); } - indexer.shutdown(); - assertTrue(indexer.awaitTermination(30, TimeUnit.SECONDS)); iw.close(); IndexReader reader = DirectoryReader.open(dir); - IndexSearcher searcher = new IndexSearcher(reader); + IndexSearcher searcher = newSearcher(reader); TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of()); @@ -96,7 +97,7 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx dir.close(); } - public void testCollectMinScoreAcrossSegments() throws IOException, InterruptedException { + public void testCollectMinScoreAcrossSegments() throws IOException { Directory dir = newDirectory(); RandomIndexWriter iw = getIndexWriter(dir); @@ -119,7 +120,7 @@ public void testCollectMinScoreAcrossSegments() throws IOException, InterruptedE iw.close(); IndexReader reader = DirectoryReader.open(dir); - IndexSearcher searcher = new IndexSearcher(reader); + IndexSearcher searcher = newSearcher(reader); TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of()); indexSearcher.setMinimumScore(2f); @@ -192,7 +193,7 @@ public void testCollectFromMiddle() throws IOException { iw.close(); IndexReader reader = DirectoryReader.open(dir); - IndexSearcher searcher = new IndexSearcher(reader); + IndexSearcher searcher = newSearcher(reader); TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());