TimeSeriesIndexSearcher to offload to the provided executor
elastic#98204 introduced a search worker thread pool that is responsible for the
heavy workloads of the query and dfs phases, whether or not they are
parallelized across segments/slices. TSDB aggregations are still executed in
the search thread pool; this commit moves their computation to the search
worker thread pool, even though the corresponding search thread blocks and
waits for that computation to complete before returning.
javanna committed Aug 11, 2023
1 parent 6be476d commit e825d62
Showing 3 changed files with 57 additions and 27 deletions.
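
The core of the change, applied below in TimeSeriesIndexSearcher.search, is a blocking offload: when a worker executor is available, the search thread wraps the heavy collection work in a FutureTask, submits it to the executor, and blocks until it completes; with no executor it runs inline. The following is a minimal standalone sketch of that pattern, not the actual Elasticsearch code (class and method names here are illustrative):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

final class BlockingOffloadSketch {
    // Runs heavyWork on workerExecutor if one is provided, otherwise inline,
    // and makes the calling (search) thread wait for completion either way.
    static void runOnWorkerPool(Executor workerExecutor, Runnable heavyWork) {
        if (workerExecutor == null) {
            // no worker pool configured: run on the calling thread
            heavyWork.run();
            return;
        }
        RunnableFuture<Void> task = new FutureTask<>(heavyWork, null);
        workerExecutor.execute(task);
        try {
            // the search thread blocks until the worker thread finishes
            task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }
}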
@@ -21,6 +21,7 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.PriorityQueue;
import org.apache.lucene.util.ThreadInterruptedException;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.lucene.search.function.MinScoreScorer;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
@@ -36,6 +37,9 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.function.IntSupplier;

import static org.elasticsearch.index.IndexSortConfig.TIME_SERIES_SORT;
@@ -63,14 +67,17 @@ public TimeSeriesIndexSearcher(IndexSearcher searcher, List<Runnable> cancellati
searcher.getSimilarity(),
searcher.getQueryCache(),
searcher.getQueryCachingPolicy(),
false
false,
searcher.getExecutor(),
1,
-1
);
} catch (IOException e) {
// an IOException from wrapping the index searcher should never happen.
throw new RuntimeException(e);
}
this.cancellations = cancellations;
cancellations.forEach(cancellation -> this.searcher.addQueryCancellation(cancellation));
cancellations.forEach(this.searcher::addQueryCancellation);

assert TIME_SERIES_SORT.length == 2;
assert TIME_SERIES_SORT[0].getField().equals(TimeSeriesIdFieldMapper.NAME);
@@ -84,9 +91,29 @@ public void setMinimumScore(Float minimumScore) {
}

public void search(Query query, BucketCollector bucketCollector) throws IOException {
int seen = 0;
query = searcher.rewrite(query);
Weight weight = searcher.createWeight(query, bucketCollector.scoreMode(), 1);
if (searcher.getExecutor() == null) {
search(bucketCollector, weight);
return;
}
// offload to the search worker thread pool whenever possible. It will be null only when search.worker_threads_enabled is false
RunnableFuture<Void> task = new FutureTask<>(() -> {
search(bucketCollector, weight);
return null;
});
searcher.getExecutor().execute(task);
try {
task.get();
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}

private void search(BucketCollector bucketCollector, Weight weight) throws IOException {
int seen = 0;
int[] tsidOrd = new int[1];

// Create LeafWalker for each subreader
@@ -151,7 +151,6 @@ public ContextIndexSearcher(
this.leafSlices = null;
} else {
// we offload to the executor unconditionally, including requests that don't support concurrency
// the few cases that don't support offloading entirely won't get an executor
this.leafSlices = computeSlices(getLeafContexts(), maximumNumberOfSlices, minimumDocsPerSlice);
assert this.leafSlices.length <= maximumNumberOfSlices : "more slices created than the maximum allowed";
}
@@ -265,6 +264,9 @@ public static LeafSlice[] computeSlices(List<LeafReaderContext> leaves, int maxS
if (maxSliceNum < 1) {
throw new IllegalArgumentException("maxSliceNum must be >= 1 (got " + maxSliceNum + ")");
}
if (maxSliceNum == 1) {
return new LeafSlice[] { new LeafSlice(new ArrayList<>(leaves)) };
}
// total number of documents to be searched
final int numDocs = leaves.stream().mapToInt(l -> l.reader().maxDoc()).sum();
// percentage of documents per slice, minimum 10%
@@ -40,7 +40,6 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.index.IndexSortConfig.TIME_SERIES_SORT;
Expand All @@ -52,7 +51,7 @@ public class TimeSeriesIndexSearcherTests extends ESTestCase {
// Open a searcher over a set of leaves
// Collection should be in order

public void testCollectInOrderAcrossSegments() throws IOException, InterruptedException {
public void testCollectInOrderAcrossSegments() throws IOException {
Directory dir = newDirectory();
RandomIndexWriter iw = getIndexWriter(dir);

@@ -61,29 +60,31 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx
final int THREADS = 5;
final int DOC_COUNTS = 500;
ExecutorService indexer = Executors.newFixedThreadPool(THREADS);
for (int i = 0; i < THREADS; i++) {
indexer.submit(() -> {
Document doc = new Document();
for (int j = 0; j < DOC_COUNTS; j++) {
String tsid = "tsid" + randomIntBetween(0, 30);
long time = clock.addAndGet(randomIntBetween(0, 10));
doc.clear();
doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid)));
doc.add(new NumericDocValuesField(DataStream.TIMESTAMP_FIELD_NAME, time));
try {
iw.addDocument(doc);
} catch (IOException e) {
throw new UncheckedIOException(e);
try {
for (int i = 0; i < THREADS; i++) {
indexer.submit(() -> {
Document doc = new Document();
for (int j = 0; j < DOC_COUNTS; j++) {
String tsid = "tsid" + randomIntBetween(0, 30);
long time = clock.addAndGet(randomIntBetween(0, 10));
doc.clear();
doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid)));
doc.add(new NumericDocValuesField(DataStream.TIMESTAMP_FIELD_NAME, time));
try {
iw.addDocument(doc);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
});
});
}
} finally {
terminate(indexer);
}
indexer.shutdown();
assertTrue(indexer.awaitTermination(30, TimeUnit.SECONDS));
iw.close();

IndexReader reader = DirectoryReader.open(dir);
IndexSearcher searcher = new IndexSearcher(reader);
IndexSearcher searcher = newSearcher(reader);

TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());

@@ -96,7 +97,7 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx
dir.close();
}

public void testCollectMinScoreAcrossSegments() throws IOException, InterruptedException {
public void testCollectMinScoreAcrossSegments() throws IOException {
Directory dir = newDirectory();
RandomIndexWriter iw = getIndexWriter(dir);

@@ -119,7 +120,7 @@ public void testCollectMinScoreAcrossSegments() throws IOException, InterruptedE
iw.close();

IndexReader reader = DirectoryReader.open(dir);
IndexSearcher searcher = new IndexSearcher(reader);
IndexSearcher searcher = newSearcher(reader);

TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());
indexSearcher.setMinimumScore(2f);
@@ -192,7 +193,7 @@ public void testCollectFromMiddle() throws IOException {

iw.close();
IndexReader reader = DirectoryReader.open(dir);
IndexSearcher searcher = new IndexSearcher(reader);
IndexSearcher searcher = newSearcher(reader);

TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());
