diff --git a/docs/changelog/98414.yaml b/docs/changelog/98414.yaml new file mode 100644 index 0000000000000..0a245893676e0 --- /dev/null +++ b/docs/changelog/98414.yaml @@ -0,0 +1,5 @@ +pr: 98414 +summary: '`TimeSeriesIndexSearcher` to offload to the provided executor' +area: TSDB +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java index 9aafa31e85c17..16efc62f2704f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcher.java @@ -21,6 +21,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.PriorityQueue; +import org.apache.lucene.util.ThreadInterruptedException; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.lucene.search.function.MinScoreScorer; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; @@ -36,6 +37,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; import java.util.function.IntSupplier; import static org.elasticsearch.index.IndexSortConfig.TIME_SERIES_SORT; @@ -63,14 +67,17 @@ public TimeSeriesIndexSearcher(IndexSearcher searcher, List cancellati searcher.getSimilarity(), searcher.getQueryCache(), searcher.getQueryCachingPolicy(), - false + false, + searcher.getExecutor(), + 1, + -1 ); } catch (IOException e) { // IOException from wrapping the index searcher which should never happen. throw new RuntimeException(e); } this.cancellations = cancellations; - cancellations.forEach(cancellation -> this.searcher.addQueryCancellation(cancellation)); + cancellations.forEach(this.searcher::addQueryCancellation); assert TIME_SERIES_SORT.length == 2; assert TIME_SERIES_SORT[0].getField().equals(TimeSeriesIdFieldMapper.NAME); @@ -84,9 +91,32 @@ public void setMinimumScore(Float minimumScore) { } public void search(Query query, BucketCollector bucketCollector) throws IOException { - int seen = 0; query = searcher.rewrite(query); Weight weight = searcher.createWeight(query, bucketCollector.scoreMode(), 1); + if (searcher.getExecutor() == null) { + search(bucketCollector, weight); + return; + } + // offload to the search worker thread pool whenever possible. It will be null only when search.worker_threads_enabled is false + RunnableFuture task = new FutureTask<>(() -> { + search(bucketCollector, weight); + return null; + }); + searcher.getExecutor().execute(task); + try { + task.get(); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException runtimeException) { + throw runtimeException; + } + throw new RuntimeException(e.getCause()); + } + } + + private void search(BucketCollector bucketCollector, Weight weight) throws IOException { + int seen = 0; int[] tsidOrd = new int[1]; // Create LeafWalker for each subreader diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index cfcc0da68f3b3..a8c7c8876aa3c 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -151,7 +151,6 @@ public ContextIndexSearcher( this.leafSlices = null; } else { // we offload to the executor unconditionally, including requests that don't support concurrency - // the few cases that don't support offloading entirely won't get an executor this.leafSlices = computeSlices(getLeafContexts(), maximumNumberOfSlices, minimumDocsPerSlice); assert this.leafSlices.length <= maximumNumberOfSlices : "more slices created than the maximum allowed"; } @@ -265,6 +264,9 @@ public static LeafSlice[] computeSlices(List leaves, int maxS if (maxSliceNum < 1) { throw new IllegalArgumentException("maxSliceNum must be >= 1 (got " + maxSliceNum + ")"); } + if (maxSliceNum == 1) { + return new LeafSlice[] { new LeafSlice(new ArrayList<>(leaves)) }; + } // total number of documents to be searched final int numDocs = leaves.stream().mapToInt(l -> l.reader().maxDoc()).sum(); // percentage of documents per slice, minimum 10% diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java index bf1a17f1852b1..4acf66ff979ab 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java @@ -40,7 +40,6 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.index.IndexSortConfig.TIME_SERIES_SORT; @@ -52,7 +51,7 @@ public class TimeSeriesIndexSearcherTests extends ESTestCase { // Open a searcher over a set of leaves // Collection should be in order - public void testCollectInOrderAcrossSegments() throws IOException, InterruptedException { + public void testCollectInOrderAcrossSegments() throws IOException { Directory dir = newDirectory(); RandomIndexWriter iw = getIndexWriter(dir); @@ -61,29 +60,31 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx final int THREADS = 5; final int DOC_COUNTS = 500; ExecutorService indexer = Executors.newFixedThreadPool(THREADS); - for (int i = 0; i < THREADS; i++) { - indexer.submit(() -> { - Document doc = new Document(); - for (int j = 0; j < DOC_COUNTS; j++) { - String tsid = "tsid" + randomIntBetween(0, 30); - long time = clock.addAndGet(randomIntBetween(0, 10)); - doc.clear(); - doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid))); - doc.add(new NumericDocValuesField(DataStream.TIMESTAMP_FIELD_NAME, time)); - try { - iw.addDocument(doc); - } catch (IOException e) { - throw new UncheckedIOException(e); + try { + for (int i = 0; i < THREADS; i++) { + indexer.submit(() -> { + Document doc = new Document(); + for (int j = 0; j < DOC_COUNTS; j++) { + String tsid = "tsid" + randomIntBetween(0, 30); + long time = clock.addAndGet(randomIntBetween(0, 10)); + doc.clear(); + doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid))); + doc.add(new NumericDocValuesField(DataStream.TIMESTAMP_FIELD_NAME, time)); + try { + iw.addDocument(doc); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } - } - }); + }); + } + } finally { + terminate(indexer); } - indexer.shutdown(); - assertTrue(indexer.awaitTermination(30, TimeUnit.SECONDS)); iw.close(); IndexReader reader = DirectoryReader.open(dir); - IndexSearcher searcher = new IndexSearcher(reader); + IndexSearcher searcher = newSearcher(reader); TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of()); @@ -96,7 +97,7 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx dir.close(); } - public void testCollectMinScoreAcrossSegments() throws IOException, InterruptedException { + public void testCollectMinScoreAcrossSegments() throws IOException { Directory dir = newDirectory(); RandomIndexWriter iw = getIndexWriter(dir); @@ -119,7 +120,7 @@ public void testCollectMinScoreAcrossSegments() throws IOException, InterruptedE iw.close(); IndexReader reader = DirectoryReader.open(dir); - IndexSearcher searcher = new IndexSearcher(reader); + IndexSearcher searcher = newSearcher(reader); TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of()); indexSearcher.setMinimumScore(2f); @@ -192,7 +193,7 @@ public void testCollectFromMiddle() throws IOException { iw.close(); IndexReader reader = DirectoryReader.open(dir); - IndexSearcher searcher = new IndexSearcher(reader); + IndexSearcher searcher = newSearcher(reader); TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());