diff --git a/docs/changelog/98204.yaml b/docs/changelog/98204.yaml new file mode 100644 index 0000000000000..417d4ec28d5a3 --- /dev/null +++ b/docs/changelog/98204.yaml @@ -0,0 +1,5 @@ +pr: 98204 +summary: Introduce executor for concurrent search +area: Search +type: enhancement +issues: [] diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 70da29576765d..4ca0b1339ac40 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -13,10 +13,16 @@ There are several thread pools, but the important ones include: [[search-threadpool]] `search`:: For count/search/suggest operations. Thread pool type is `fixed` with a size of `int((`<>`pass:[ * ]3) / 2) + 1`, and queue_size of `1000`. +[[search-worker-threadpool]] +`search_worker`:: + For the heavy workload of count/search operations that may be executed concurrently + across segments within the same shard when possible. Thread pool type is `fixed` + with a size of `int((`<>`pass:[ * ]3) / 2) + 1`, and unbounded queue_size. + [[search-throttled]]`search_throttled`:: For count/search/suggest/get operations on `search_throttled indices`. Thread pool type is `fixed` with a size of `1`, and queue_size of `100`. diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index ffd8ba1a4068b..f9fc3af5539cd 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -502,7 +502,7 @@ public void apply(Settings value, Settings current, Settings previous) { ResourceWatcherService.RELOAD_INTERVAL_LOW, SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING, SearchModule.INDICES_MAX_NESTED_DEPTH_SETTING, - SearchModule.SEARCH_CONCURRENCY_ENABLED, + SearchService.SEARCH_WORKER_THREADS_ENABLED, ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING, ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING, ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING, diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 6364d15c1b214..ac5c608ca9635 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -64,6 +64,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.function.LongSupplier; final class DefaultSearchContext extends SearchContext { @@ -136,10 +137,11 @@ final class DefaultSearchContext extends SearchContext { SearchShardTarget shardTarget, LongSupplier relativeTimeSupplier, TimeValue timeout, - int minimumDocsPerSlice, FetchPhase fetchPhase, boolean lowLevelCancellation, - boolean parallelize + Executor executor, + int maximumNumberOfSlices, + int minimumDocsPerSlice ) throws IOException { this.readerContext = readerContext; this.request = request; @@ -150,19 +152,26 @@ final class DefaultSearchContext extends SearchContext { this.indexShard = readerContext.indexShard(); Engine.Searcher engineSearcher = readerContext.acquireSearcher("search"); - this.searcher = new ContextIndexSearcher( - engineSearcher.getIndexReader(), - engineSearcher.getSimilarity(), - engineSearcher.getQueryCache(), - engineSearcher.getQueryCachingPolicy(), - minimumDocsPerSlice, - lowLevelCancellation, - // TODO not set
the for now, this needs a special thread pool and can be enabled after its introduction - // parallelize - // ? (EsThreadPoolExecutor) this.indexService.getThreadPool().executor(ThreadPool.Names.CONCURRENT_COLLECTION_TBD) - // : null, - null - ); + if (executor == null) { + this.searcher = new ContextIndexSearcher( + engineSearcher.getIndexReader(), + engineSearcher.getSimilarity(), + engineSearcher.getQueryCache(), + engineSearcher.getQueryCachingPolicy(), + lowLevelCancellation + ); + } else { + this.searcher = new ContextIndexSearcher( + engineSearcher.getIndexReader(), + engineSearcher.getSimilarity(), + engineSearcher.getQueryCache(), + engineSearcher.getQueryCachingPolicy(), + lowLevelCancellation, + executor, + maximumNumberOfSlices, + minimumDocsPerSlice + ); + } releasables.addAll(List.of(engineSearcher, searcher)); this.relativeTimeSupplier = relativeTimeSupplier; diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index 350a727158d5d..a50d7c5e1f99e 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -281,13 +281,6 @@ public class SearchModule { Setting.Property.NodeScope ); - public static final Setting SEARCH_CONCURRENCY_ENABLED = Setting.boolSetting( - "search.concurrency_enabled", - true, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - private final Map highlighters; private final List fetchSubPhases = new ArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index b78e088e3ed43..5a73962c65d85 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -130,6 +130,8 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -141,7 +143,6 @@ import static org.elasticsearch.core.TimeValue.timeValueMillis; import static org.elasticsearch.core.TimeValue.timeValueMinutes; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.elasticsearch.search.SearchModule.SEARCH_CONCURRENCY_ENABLED; public class SearchService extends AbstractLifecycleComponent implements IndexEventListener { private static final Logger logger = LogManager.getLogger(SearchService.class); @@ -211,6 +212,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + public static final Setting SEARCH_WORKER_THREADS_ENABLED = Setting.boolSetting( + "search.worker_threads_enabled", + true, + Property.NodeScope, + Property.Dynamic + ); + public static final Setting MAX_OPEN_SCROLL_CONTEXT = Setting.intSetting( "search.max_open_scroll_context", 500, @@ -253,7 +261,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final DfsPhase dfsPhase = new DfsPhase(); private final FetchPhase fetchPhase; - private volatile boolean enableConcurrentCollection; + private volatile boolean enableSearchWorkerThreads; private volatile long defaultKeepAlive; @@ -344,16 +352,12 @@ public SearchService( 
clusterService.getClusterSettings() .addSettingsUpdateConsumer(ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER, this::setEnableRewriteAggsToFilterByFilter); - enableConcurrentCollection = SEARCH_CONCURRENCY_ENABLED.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_CONCURRENCY_ENABLED, this::setEnableConcurrentCollection); - } - - private void setEnableConcurrentCollection(boolean concurrentCollection) { - this.enableConcurrentCollection = concurrentCollection; + enableSearchWorkerThreads = SEARCH_WORKER_THREADS_ENABLED.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_WORKER_THREADS_ENABLED, this::setEnableSearchWorkerThreads); } - boolean isConcurrentCollectionEnabled() { - return this.enableConcurrentCollection; + private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) { + this.enableSearchWorkerThreads = enableSearchWorkerThreads; } private static void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) { @@ -1039,7 +1043,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier(); final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); try (ReaderContext readerContext = new ReaderContext(id, indexService, indexShard, reader, -1L, true)) { - DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout, null); + DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout, ResultsType.NONE); searchContext.addReleasable(readerContext.markAsUsed(0L)); return searchContext; } @@ -1060,16 +1064,20 @@ private DefaultSearchContext createSearchContext( reader.indexShard().shardId(), request.getClusterAlias() ); + ExecutorService executor = this.enableSearchWorkerThreads ? threadPool.executor(Names.SEARCH_WORKER) : null; + int maximumNumberOfSlices = executor instanceof ThreadPoolExecutor tpe + && supportsParallelCollection(resultsType, request.source()) ? tpe.getMaximumPoolSize() : 1; searchContext = new DefaultSearchContext( reader, request, shardTarget, threadPool::relativeTimeInMillis, timeout, - minimumDocsPerSlice, fetchPhase, lowLevelCancellation, - this.enableConcurrentCollection && concurrentSearchEnabled(resultsType, request.source()) + executor, + maximumNumberOfSlices, + minimumDocsPerSlice ); // we clone the query shard context here just for rewriting otherwise we // might end up with incorrect state since we are using now() or script services @@ -1089,10 +1097,16 @@ this.enableConcurrentCollection && concurrentSearchEnabled(resultsType, request. 
return searchContext; } - static boolean concurrentSearchEnabled(ResultsType resultsType, SearchSourceBuilder source) { + static boolean supportsParallelCollection(ResultsType resultsType, SearchSourceBuilder source) { if (resultsType == ResultsType.DFS) { return true; // only enable concurrent collection for DFS phase for now } + /* + TODO uncomment this block to enable inter-segment concurrency for the query phase + if (resultsType == ResultsType.QUERY) { + return source == null || source.aggregations() == null || source.aggregations().supportsParallelCollection(); + } + */ return false; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java index 05394b6e2ec0b..6aa545c981fa3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java @@ -220,35 +220,15 @@ public boolean isInSortOrderExecutionRequired() { } /** - * Return false if this aggregation or any of the child aggregations does not support concurrent search. - * As a result, such aggregation will always be executed sequentially despite concurrency is enabled for the query phase. - * Note: aggregations that don't support concurrency, may or may not support offloading their collection to the search worker threads, - * depending on what {@link #supportsOffloadingSequentialCollection()} returns. + * Return false if this aggregation or any of the child aggregations does not support parallel collection. + * As a result, a request including such an aggregation is always executed sequentially, even when concurrency is enabled for the query phase. */ - public boolean supportsConcurrentExecution() { + public boolean supportsParallelCollection() { if (isInSortOrderExecutionRequired()) { return false; } for (AggregationBuilder builder : factoriesBuilder.getAggregatorFactories()) { - if (builder.supportsConcurrentExecution() == false) { - return false; - } - } - return supportsOffloadingSequentialCollection(); - } - - /** - * Returns false if this aggregation or any of its child aggregations does not support offloading its sequential collection - * to a separate thread. As a result, such aggregation will always be executed sequentially, and fully in the search thread, - * without offloading its collection to the search worker threads. - * Note: aggregations that don't support offloading sequential collection, don't support concurrency by definition.
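
The gating above separates where collection runs from how parallel it is: the worker executor is handed to the searcher whenever worker threads are enabled, while the slice count collapses to one for requests that cannot be collected in parallel. A minimal, self-contained sketch of that split, with illustrative names rather than the actual Elasticsearch classes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Sketch of the "offload unconditionally, parallelize conditionally" split:
// the executor decides where collection runs, the slice count decides how
// many collection tasks exist. Names are illustrative, not the ES classes.
public class OffloadVsParallelize {

    // Mirrors the pattern in createSearchContext: a worker pool caps the
    // slice count at its maximum size; everything else collapses to 1 slice.
    static int maximumNumberOfSlices(ExecutorService executor, boolean supportsParallelCollection) {
        return executor instanceof ThreadPoolExecutor tpe && supportsParallelCollection
            ? tpe.getMaximumPoolSize()
            : 1;
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        System.out.println(maximumNumberOfSlices(workers, true));  // 4: parallel collection
        System.out.println(maximumNumberOfSlices(workers, false)); // 1: offloaded, but sequential
        System.out.println(maximumNumberOfSlices(null, false));    // 1: worker threads disabled
        workers.shutdown();
    }
}
```

The design choice this illustrates: even fully sequential requests move off the calling search thread, while requests that are unsafe to parallelize never fan out beyond one slice.
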
- */ - public boolean supportsOffloadingSequentialCollection() { - if (isInSortOrderExecutionRequired()) { - return false; - } - for (AggregationBuilder builder : factoriesBuilder.getAggregatorFactories()) { - if (builder.supportsOffloadingSequentialCollection() == false) { + if (builder.supportsParallelCollection() == false) { return false; } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index 2705562f4f129..6dc05fa8fe843 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -72,6 +72,7 @@ private static void executeInSortOrder(SearchContext context, BucketCollector co searcher.setProfiler(context); try { searcher.search(context.rewrittenQuery(), collector); + collector.postCollection(); } catch (IOException e) { throw new AggregationExecutionException("Could not perform time series aggregation", e); } @@ -113,7 +114,6 @@ public static void execute(SearchContext context) { final List aggregations = new ArrayList<>(aggregators.length); for (Aggregator aggregator : aggregators) { try { - aggregator.postCollection(); aggregations.add(aggregator.buildTopLevel()); } catch (IOException e) { throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index 3fd547e340da3..2c265b066a8b5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -334,15 +334,17 @@ public boolean isInSortOrderExecutionRequired() { } /** - * Return false if this aggregation or any of the child aggregations does not support concurrent search + * Return false if this aggregation or any of the child aggregations does not support parallel collection. + * As a result, a request including such an aggregation is always executed sequentially, even when concurrency is enabled for the query + phase.
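
The recursive opt-out that these renamed methods implement can be shown in isolation. A minimal sketch, assuming a simplified stand-in hierarchy (not the ES aggregation framework): a node supports parallel collection only if it does not require in-sort-order execution and all of its children support it, so a single opted-out descendant poisons the whole tree.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the recursive opt-out walk over aggregation builders.
abstract class Builder {
    final List<Builder> children = new ArrayList<>();

    Builder add(Builder child) {
        children.add(child);
        return this;
    }

    boolean isInSortOrderExecutionRequired() {
        return false;
    }

    boolean supportsParallelCollection() {
        if (isInSortOrderExecutionRequired()) {
            return false;
        }
        for (Builder child : children) {
            if (child.supportsParallelCollection() == false) {
                return false;
            }
        }
        return true;
    }
}

// Aggregations like terms, cardinality, nested and composite simply override
// the method to return false, which opts out any tree that contains them.
class TermsLike extends Builder {
    @Override
    boolean supportsParallelCollection() {
        return false;
    }
}

class FilterLike extends Builder {}

class Demo {
    public static void main(String[] args) {
        System.out.println(new FilterLike().supportsParallelCollection());                      // true
        System.out.println(new FilterLike().add(new TermsLike()).supportsParallelCollection()); // false
    }
}
```
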
*/ - public boolean supportsConcurrentExecution() { + public boolean supportsParallelCollection() { for (AggregationBuilder builder : aggregationBuilders) { - if (builder.supportsConcurrentExecution() == false) { + if (builder.supportsParallelCollection() == false) { return false; } } - return isInSortOrderExecutionRequired() == false; + return true; } public Builder addAggregator(AggregationBuilder factory) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java index 69e60a6f5d6cd..1d34d674b2d3d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java @@ -79,7 +79,7 @@ public final Collector asCollector() { return new BucketCollectorWrapper(this); } - private record BucketCollectorWrapper(BucketCollector bucketCollector) implements Collector { + public record BucketCollectorWrapper(BucketCollector bucketCollector) implements Collector { @Override public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java index c6092dc0328d8..272d0db8cd002 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java @@ -97,7 +97,7 @@ public boolean supportsSampling() { } @Override - public boolean supportsOffloadingSequentialCollection() { + public boolean supportsParallelCollection() { return false; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java index 7f01c8809f6d2..a0a42ba101981 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregationBuilder.java @@ -137,7 +137,7 @@ public BucketCardinality bucketCardinality() { } @Override - public boolean supportsConcurrentExecution() { + public boolean supportsParallelCollection() { return false; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregationBuilder.java index d81209844da40..0839b2b83ceeb 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregationBuilder.java @@ -81,7 +81,7 @@ public BucketCardinality bucketCardinality() { } @Override - public boolean supportsOffloadingSequentialCollection() { + public boolean supportsParallelCollection() { return false; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java index 
6249dae0e3445..cf75abfee4d02 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java @@ -134,7 +134,7 @@ public boolean supportsSampling() { } @Override - public boolean supportsConcurrentExecution() { + public boolean supportsParallelCollection() { return false; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregationBuilder.java index 9f857305a7196..51037b85ab292 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregationBuilder.java @@ -113,7 +113,7 @@ public boolean supportsSampling() { } @Override - public boolean supportsOffloadingSequentialCollection() { + public boolean supportsParallelCollection() { return false; } diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index 43e767fbd1333..0ddf5402eac53 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -37,19 +37,22 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.SparseFixedBitSet; import org.apache.lucene.util.ThreadInterruptedException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.lucene.util.CombinedBitSet; +import org.elasticsearch.search.aggregations.BucketCollector; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.profile.Timer; +import org.elasticsearch.search.profile.query.InternalProfileCollector; import org.elasticsearch.search.profile.query.ProfileWeight; import org.elasticsearch.search.profile.query.QueryProfileBreakdown; import org.elasticsearch.search.profile.query.QueryProfiler; import org.elasticsearch.search.profile.query.QueryTimingType; +import org.elasticsearch.search.query.QueryPhaseCollector; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -57,11 +60,13 @@ import java.util.Objects; import java.util.PriorityQueue; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicInteger; /** * Context-aware extension of {@link IndexSearcher}. 
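
Before the constructor and slicing changes below, the slice arithmetic is worth seeing in isolation. A self-contained sketch under the rules stated in the javadoc (each slice gets at least max(minDocsPerSlice, 10% of all docs, numDocs/maxSliceNum) documents); note that the greedy `groupLeaves` here is a simplified stand-in, and the real computeSlices packs the sorted Lucene leaves differently:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the slice-target computation: the per-slice document
// target caps the number of slices well below maxSliceNum for small shards.
public class SliceMath {
    static final double MINIMUM_DOCS_PERCENT_PER_SLICE = 0.1;

    static int targetDocsPerSlice(int numDocs, int maxSliceNum, int minDocsPerSlice) {
        if (maxSliceNum < 1) {
            throw new IllegalArgumentException("maxSliceNum must be >= 1 (got " + maxSliceNum + ")");
        }
        // percentage of documents per slice, minimum 10%
        double percentagePerSlice = Math.max(MINIMUM_DOCS_PERCENT_PER_SLICE, 1.0 / maxSliceNum);
        return Math.max(minDocsPerSlice, (int) (percentagePerSlice * numDocs));
    }

    // Greedy grouping: accumulate per-leaf doc counts until the target is met.
    static List<List<Integer>> groupLeaves(List<Integer> leafDocCounts, int target) {
        List<List<Integer>> slices = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int docsInCurrent = 0;
        for (int docs : leafDocCounts) {
            current.add(docs);
            docsInCurrent += docs;
            if (docsInCurrent >= target) {
                slices.add(current);
                current = new ArrayList<>();
                docsInCurrent = 0;
            }
        }
        if (current.isEmpty() == false) {
            slices.add(current);
        }
        return slices;
    }

    public static void main(String[] args) {
        // 1000 docs, at most 16 slices, at least 50 docs per slice:
        // the 10% floor beats 1/16, so each slice targets >= 100 docs.
        int target = targetDocsPerSlice(1000, 16, 50);
        System.out.println("target docs per slice: " + target); // 100
        System.out.println(groupLeaves(List.of(60, 60, 120, 400, 30, 330), target));
    }
}
```
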
@@ -78,13 +83,12 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { // make sure each slice has at least 10% of the documents as a way to limit memory usage and // to keep the error margin of terms aggregation low - private static final double MINIMUM_DOCS_PERCENT_PER_SLICE = 0.1; + static final double MINIMUM_DOCS_PERCENT_PER_SLICE = 0.1; private AggregatedDfs aggregatedDfs; private QueryProfiler profiler; private final MutableQueryTimeout cancellable; - private final QueueSizeBasedExecutor queueSizeBasedExecutor; private final LeafSlice[] leafSlices; // don't create slices with less than this number of docs private final int minimumDocsPerSlice; @@ -99,16 +103,7 @@ public ContextIndexSearcher( QueryCachingPolicy queryCachingPolicy, boolean wrapWithExitableDirectoryReader ) throws IOException { - this( - reader, - similarity, - queryCache, - queryCachingPolicy, - new MutableQueryTimeout(), - Integer.MAX_VALUE, - wrapWithExitableDirectoryReader, - null - ); + this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout(), wrapWithExitableDirectoryReader, null, -1, -1); } /** constructor for concurrent search */ @@ -117,9 +112,10 @@ public ContextIndexSearcher( Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, - int minimumDocsPerSlice, boolean wrapWithExitableDirectoryReader, - ThreadPoolExecutor executor + Executor executor, + int maximumNumberOfSlices, + int minimumDocsPerSlice ) throws IOException { this( reader, @@ -127,21 +123,23 @@ public ContextIndexSearcher( queryCache, queryCachingPolicy, new MutableQueryTimeout(), - minimumDocsPerSlice, wrapWithExitableDirectoryReader, - executor + executor, + maximumNumberOfSlices, + minimumDocsPerSlice ); } - private ContextIndexSearcher( + ContextIndexSearcher( IndexReader reader, Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, MutableQueryTimeout cancellable, - int minimumDocsPerSlice, boolean wrapWithExitableDirectoryReader, - ThreadPoolExecutor executor + Executor executor, + int maximumNumberOfSlices, + int minimumDocsPerSlice ) throws IOException { // we need to pass the executor up so it can potentially be used as a sliceExecutor by knn search super(wrapWithExitableDirectoryReader ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader, executor); @@ -149,17 +147,14 @@ private ContextIndexSearcher( setQueryCache(queryCache); setQueryCachingPolicy(queryCachingPolicy); this.cancellable = cancellable; - this.queueSizeBasedExecutor = executor != null ? 
new QueueSizeBasedExecutor(executor) : null; this.minimumDocsPerSlice = minimumDocsPerSlice; - if (executor != null) { - this.leafSlices = computeSlices( - getLeafContexts(), - queueSizeBasedExecutor.threadPoolExecutor.getMaximumPoolSize(), - minimumDocsPerSlice - ); - assert (this.leafSlices.length <= executor.getMaximumPoolSize()); - } else { + if (executor == null) { this.leafSlices = null; + } else { + // we offload to the executor unconditionally, including requests that don't support concurrency + // the few cases that don't support offloading entirely won't get an executor + this.leafSlices = computeSlices(getLeafContexts(), maximumNumberOfSlices, minimumDocsPerSlice); + assert this.leafSlices.length <= maximumNumberOfSlices : "more slices created than the maximum allowed"; } } @@ -253,19 +248,27 @@ protected LeafSlice[] slices(List leaves) { return IndexSearcher.slices(leaves, Math.max(1, leaves.size()), 1); } + /** + * Returns the slices created by this {@link ContextIndexSearcher}, different from those created by the base class and + * returned by {@link IndexSearcher#getSlices()}. The former are used for parallelizing the collection, while the latter are used + * for now to parallelize rewrite (e.g. knn query rewrite). + */ + final LeafSlice[] getSlicesForCollection() { + return leafSlices; + } + /** * Each computed slice contains at least 10% of the total data in the leaves with a * minimum given by the minDocsPerSlice parameter and the final number * of {@link LeafSlice} will be equal or lower than the max number of slices. */ - /* pkg private for testing */ - static LeafSlice[] computeSlices(List leaves, int maxSliceNum, int minDocsPerSlice) { + public static LeafSlice[] computeSlices(List leaves, int maxSliceNum, int minDocsPerSlice) { if (maxSliceNum < 1) { throw new IllegalArgumentException("maxSliceNum must be >= 1 (got " + maxSliceNum + ")"); } // total number of documents to be searched final int numDocs = leaves.stream().mapToInt(l -> l.reader().maxDoc()).sum(); - // percentage of documents per slice, minumum 10% + // percentage of documents per slice, minimum 10% final double percentageDocsPerThread = Math.max(MINIMUM_DOCS_PERCENT_PER_SLICE, 1.0 / maxSliceNum); // compute slices return computeSlices(leaves, Math.max(minDocsPerSlice, (int) (percentageDocsPerThread * numDocs))); @@ -333,13 +336,26 @@ public T search(Query query, CollectorManager col } /** - * Similar to the lucene implementation but it will wait for all threads to fisinsh before returning even if an error is thrown. - * In that case, other exceptions will be ignored and the first exception is thrown after all threads are finished. + * Similar to the Lucene implementation, with the following changes made: + * 1) It will wait for all threads to finish before returning when an exception is thrown. In that case, subsequent exceptions will be + * ignored and the first exception is re-thrown after all tasks are completed. + * 2) Tasks are cancelled on exception, as well as on timeout, to prevent needless computation. + * 3) Collection is unconditionally offloaded to the executor when set, even when there is a single slice or the request does not + * support concurrent collection. The executor is not set only when concurrent search has been explicitly disabled at the cluster level. + * 4) postCollection is performed after each segment is collected. This is needed for aggregations, whose collection is performed by the search worker threads + so that it can be parallelized.
Also, it needs to happen in the same thread where doc_values are read, as it consumes them and Lucene + does not allow consuming them from a different thread. + 5) Handles the ES TimeExceededException. + * */ private T search(Weight weight, CollectorManager collectorManager, C firstCollector) throws IOException { - if (queueSizeBasedExecutor == null || leafSlices.length <= 1) { + // the executor will be null only when concurrency is disabled at the cluster level + if (getExecutor() == null) { search(leafContexts, weight, firstCollector); return collectorManager.reduce(Collections.singletonList(firstCollector)); + } else if (leafSlices.length == 0) { + assert leafContexts.isEmpty(); + doAggregationPostCollection(firstCollector); + return collectorManager.reduce(Collections.singletonList(firstCollector)); } else { final List collectors = new ArrayList<>(leafSlices.length); collectors.add(firstCollector); @@ -351,26 +367,63 @@ private T search(Weight weight, CollectorManager throw new IllegalStateException("CollectorManager does not always produce collectors with the same score mode"); } } - final List> listTasks = new ArrayList<>(); + final List> listTasks = new ArrayList<>(); for (int i = 0; i < leafSlices.length; ++i) { final LeafReaderContext[] leaves = leafSlices[i].leaves; final C collector = collectors.get(i); - FutureTask task = new FutureTask<>(() -> { - search(Arrays.asList(leaves), weight, collector); - return collector; - }); + AtomicInteger state = new AtomicInteger(0); + RunnableFuture task = new FutureTask<>(() -> { + if (state.compareAndSet(0, 1)) { + // If a slice throws an exception or times out, cancel all the tasks, to prevent slices that haven't started yet from + // starting and performing needless computation. + // TODO we will also want to cancel tasks that have already started, reusing the timeout mechanism + try { + search(Arrays.asList(leaves), weight, collector); + if (timeExceeded) { + for (Future future : listTasks) { + FutureUtils.cancel(future); + } + } + } catch (Exception e) { + for (Future future : listTasks) { + FutureUtils.cancel(future); + } + throw e; + } + return collector; + } + throw new CancellationException(); + }) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + /* + Future#get (called down below after submitting all tasks) throws CancellationException for a cancelled task while + it is still running. It's important to make sure that search does not leave any tasks behind when it returns. + Overriding cancel ensures that tasks that are already started are left alone once cancelled, so Future#get will + wait for them to finish instead of throwing CancellationException. + Tasks that are cancelled before they are started won't start (same behaviour as the original implementation).
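
The overridden cancel above is subtle enough to merit a standalone reproduction. A minimal sketch of the tri-state task, assuming the same encoding (0 = not started, 1 = started, -1 = cancelled): cancellation only wins the race against tasks that have not begun, so Future#get never throws CancellationException for work that is actually in flight.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal reproduction of the tri-state task described above.
public class TriStateTaskDemo {

    static <T> FutureTask<T> triStateTask(Callable<T> body) {
        AtomicInteger state = new AtomicInteger(0);
        return new FutureTask<>(() -> {
            if (state.compareAndSet(0, 1)) {
                return body.call();
            }
            throw new CancellationException();
        }) {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                // only tasks that never started become cancelled
                return state.compareAndSet(0, -1);
            }

            @Override
            public boolean isCancelled() {
                return state.get() == -1;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        FutureTask<String> started = triStateTask(() -> "done");
        FutureTask<String> skipped = triStateTask(() -> "never runs");
        executor.execute(started);
        started.get();        // wait for completion; state is now 1
        started.cancel(true); // no-op: the task already started
        skipped.cancel(true); // wins the race: state goes 0 -> -1
        executor.execute(skipped);
        System.out.println(started.get());         // "done", no CancellationException
        System.out.println(skipped.isCancelled()); // true
        try {
            skipped.get();
        } catch (ExecutionException e) {
            System.out.println("skipped task never ran: " + e.getCause());
        }
        executor.shutdown();
    }
}
```
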
+ */ + return state.compareAndSet(0, -1); + } + @Override + public boolean isCancelled() { + return state.get() == -1; + } + }; listTasks.add(task); } logger.trace("Collecting using " + listTasks.size() + " tasks."); - queueSizeBasedExecutor.invokeAll(listTasks); + for (Runnable task : listTasks) { + getExecutor().execute(task); + } RuntimeException exception = null; final List collectedCollectors = new ArrayList<>(); + boolean cancellation = false; for (Future future : listTasks) { try { collectedCollectors.add(future.get()); - // TODO: when there is an exception, it would be great to cancel the queries / threads } catch (InterruptedException e) { if (exception == null) { exception = new ThreadInterruptedException(e); @@ -379,16 +432,22 @@ private T search(Weight weight, CollectorManager } } catch (ExecutionException e) { if (exception == null) { - if (e.getCause() instanceof RuntimeException runtimeException) { - exception = runtimeException; + if (e.getCause() instanceof CancellationException) { + // thrown by the manual cancellation implemented above - we cancel on exception and we will throw the root cause + cancellation = true; } else { - exception = new RuntimeException(e.getCause()); + if (e.getCause() instanceof RuntimeException runtimeException) { + exception = runtimeException; + } else { + exception = new RuntimeException(e.getCause()); + } } } else { // we ignore further exceptions } } } + assert cancellation == false || exception != null || timeExceeded : "cancellation without an exception or timeout?"; if (exception != null) { throw exception; } @@ -405,8 +464,20 @@ public void search(List leaves, Weight weight, Collector coll } } catch (@SuppressWarnings("unused") TimeExceededException e) { timeExceeded = true; + } finally { + doAggregationPostCollection(collector); } + } + private void doAggregationPostCollection(Collector collector) throws IOException { + if (collector instanceof QueryPhaseCollector queryPhaseCollector) { + if (queryPhaseCollector.getAggsCollector() instanceof BucketCollector.BucketCollectorWrapper aggsCollector) { + aggsCollector.bucketCollector().postCollection(); + } + } + if (collector instanceof InternalProfileCollector profilerCollector) { + profilerCollector.doPostCollection(); + } } /** If the search has timed out following Elasticsearch custom implementation */ @@ -575,52 +646,4 @@ public void clear() { runnables.clear(); } } - - private static class QueueSizeBasedExecutor { - private static final double LIMITING_FACTOR = 1.5; - - private final ThreadPoolExecutor threadPoolExecutor; - - QueueSizeBasedExecutor(ThreadPoolExecutor threadPoolExecutor) { - this.threadPoolExecutor = threadPoolExecutor; - } - - public void invokeAll(Collection tasks) { - int i = 0; - - for (Runnable task : tasks) { - boolean shouldExecuteOnCallerThread = false; - - // Execute last task on caller thread - if (i == tasks.size() - 1) { - shouldExecuteOnCallerThread = true; - } - - if (threadPoolExecutor.getQueue().size() >= (threadPoolExecutor.getMaximumPoolSize() * LIMITING_FACTOR)) { - shouldExecuteOnCallerThread = true; - } - - processTask(task, shouldExecuteOnCallerThread); - - ++i; - } - } - - protected void processTask(final Runnable task, final boolean shouldExecuteOnCallerThread) { - if (task == null) { - throw new IllegalArgumentException("Input is null"); - } - - if (shouldExecuteOnCallerThread == false) { - try { - threadPoolExecutor.execute(task); - - return; - } catch (@SuppressWarnings("unused") RejectedExecutionException e) { - // Execute on caller thread - 
} - } - task.run(); - } - } } diff --git a/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java b/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java index b79d06d57b891..75d039dfc45b6 100644 --- a/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java +++ b/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java @@ -10,7 +10,10 @@ import org.apache.lucene.sandbox.search.ProfilerCollector; import org.apache.lucene.search.Collector; +import org.elasticsearch.search.aggregations.BucketCollector; +import org.elasticsearch.search.query.QueryPhaseCollector; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -74,4 +77,18 @@ public CollectorResult getCollectorTree() { } return new CollectorResult(getName(), getReason(), getTime(), childResults); } + + public void doPostCollection() throws IOException { + if (wrappedCollector instanceof InternalProfileCollector profileCollector) { + profileCollector.doPostCollection(); + } else if (wrappedCollector instanceof QueryPhaseCollector queryPhaseCollector) { + if (queryPhaseCollector.getAggsCollector() instanceof BucketCollector.BucketCollectorWrapper aggsCollector) { + aggsCollector.bucketCollector().postCollection(); + } else if (queryPhaseCollector.getAggsCollector() instanceof InternalProfileCollector profileCollector) { + profileCollector.doPostCollection(); + } + } else if (wrappedCollector instanceof BucketCollector.BucketCollectorWrapper aggsCollector) { + aggsCollector.bucketCollector().postCollection(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollector.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollector.java index a8403d2ebb27c..19e90dc05422e 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollector.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollector.java @@ -41,7 +41,7 @@ * When top docs as well as aggs are collected (because both collectors were provided), skipping low scoring hits via * {@link Scorable#setMinCompetitiveScore(float)} is not supported for either of the collectors. 
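
The instanceof chains in doAggregationPostCollection and InternalProfileCollector#doPostCollection both implement the same unwrap-and-post-collect idea. A dependency-free sketch with stand-in types (the real code enumerates the wrapper combinations explicitly instead of recursing):

```java
import java.io.IOException;

// Sketch of the unwrap-and-post-collect pattern: profiling and query-phase
// wrappers are peeled off until the aggregation collector is found, then
// postCollection() runs in the thread that collected the segment.
interface Collector {}

interface BucketCollector {
    void postCollection() throws IOException;
}

record AggsWrapper(BucketCollector bucketCollector) implements Collector {}

record ProfileWrapper(Collector wrapped) implements Collector {}

record PhaseCollector(Collector topDocsCollector, Collector aggsCollector) implements Collector {}

final class PostCollectionDemo {

    static void doPostCollection(Collector collector) throws IOException {
        if (collector instanceof ProfileWrapper profile) {
            doPostCollection(profile.wrapped());
        } else if (collector instanceof PhaseCollector phase) {
            doPostCollection(phase.aggsCollector());
        } else if (collector instanceof AggsWrapper aggs) {
            aggs.bucketCollector().postCollection();
        }
        // plain top-docs collectors have no post-collection step
    }

    public static void main(String[] args) throws IOException {
        Collector tree = new ProfileWrapper(
            new PhaseCollector(null, new AggsWrapper(() -> System.out.println("postCollection ran")))
        );
        doPostCollection(tree);
    }
}
```
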
*/ -final class QueryPhaseCollector implements Collector { +public final class QueryPhaseCollector implements Collector { private final Collector aggsCollector; private final Collector topDocsCollector; private final TerminateAfterChecker terminateAfterChecker; @@ -457,4 +457,8 @@ boolean isTerminatedAfter() { return terminatedAfter; } } + + public Collector getAggsCollector() { + return aggsCollector; + } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index c345d7ca00f4c..eafa676bb7293 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -65,6 +65,7 @@ public static class Names { public static final String ANALYZE = "analyze"; public static final String WRITE = "write"; public static final String SEARCH = "search"; + public static final String SEARCH_WORKER = "search_worker"; public static final String SEARCH_COORDINATION = "search_coordination"; public static final String AUTO_COMPLETE = "auto_complete"; public static final String SEARCH_THROTTLED = "search_throttled"; @@ -118,6 +119,7 @@ public static ThreadPoolType fromType(String type) { entry(Names.ANALYZE, ThreadPoolType.FIXED), entry(Names.WRITE, ThreadPoolType.FIXED), entry(Names.SEARCH, ThreadPoolType.FIXED), + entry(Names.SEARCH_WORKER, ThreadPoolType.FIXED), entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED), entry(Names.MANAGEMENT, ThreadPoolType.SCALING), entry(Names.FLUSH, ThreadPoolType.SCALING), @@ -195,26 +197,19 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000, new TaskTrackingConfig(true, 0.1)) ); + int searchOrGetThreadPoolSize = searchOrGetThreadPoolSize(allocatedProcessors); builders.put( Names.GET, - new FixedExecutorBuilder( - settings, - Names.GET, - searchOrGetThreadPoolSize(allocatedProcessors), - 1000, - TaskTrackingConfig.DO_NOT_TRACK - ) + new FixedExecutorBuilder(settings, Names.GET, searchOrGetThreadPoolSize, 1000, TaskTrackingConfig.DO_NOT_TRACK) ); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, TaskTrackingConfig.DO_NOT_TRACK)); builders.put( Names.SEARCH, - new FixedExecutorBuilder( - settings, - Names.SEARCH, - searchOrGetThreadPoolSize(allocatedProcessors), - 1000, - TaskTrackingConfig.DEFAULT - ) + new FixedExecutorBuilder(settings, Names.SEARCH, searchOrGetThreadPoolSize, 1000, TaskTrackingConfig.DEFAULT) + ); + builders.put( + Names.SEARCH_WORKER, + new FixedExecutorBuilder(settings, Names.SEARCH_WORKER, searchOrGetThreadPoolSize, -1, TaskTrackingConfig.DEFAULT) ); builders.put( Names.SEARCH_COORDINATION, diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index ebb1d723eb382..bc0082da7ee84 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -142,16 +142,17 @@ protected Engine.Searcher acquireSearcherInternal(String source) { target, null, timeout, - randomIntBetween(1, Integer.MAX_VALUE), null, false, - false + null, + randomInt(), + randomInt() ); contextWithoutScroll.from(300); contextWithoutScroll.close(); // resultWindow greater than maxResultWindow and scrollContext is null - 
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> contextWithoutScroll.preProcess()); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, contextWithoutScroll::preProcess); assertThat( exception.getMessage(), equalTo( @@ -181,13 +182,14 @@ protected Engine.Searcher acquireSearcherInternal(String source) { target, null, timeout, - randomIntBetween(1, Integer.MAX_VALUE), null, false, - false + null, + randomInt(), + randomInt() ); context1.from(300); - exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess()); + exception = expectThrows(IllegalArgumentException.class, context1::preProcess); assertThat( exception.getMessage(), equalTo( @@ -210,12 +212,12 @@ protected Engine.Searcher acquireSearcherInternal(String source) { when(rescoreContext.getWindowSize()).thenReturn(500); context1.addRescore(rescoreContext); - exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess()); + exception = expectThrows(IllegalArgumentException.class, context1::preProcess); assertThat(exception.getMessage(), equalTo("Cannot use [sort] option in conjunction with [rescore].")); // rescore is null but sort is not null and rescoreContext.getWindowSize() exceeds maxResultWindow context1.sort(null); - exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess()); + exception = expectThrows(IllegalArgumentException.class, context1::preProcess); assertThat( exception.getMessage(), @@ -255,10 +257,11 @@ public ScrollContext scrollContext() { target, null, timeout, - randomIntBetween(1, Integer.MAX_VALUE), null, false, - false + null, + randomInt(), + randomInt() ); SliceBuilder sliceBuilder = mock(SliceBuilder.class); @@ -266,7 +269,7 @@ public ScrollContext scrollContext() { when(sliceBuilder.getMax()).thenReturn(numSlices); context2.sliceBuilder(sliceBuilder); - exception = expectThrows(IllegalArgumentException.class, () -> context2.preProcess()); + exception = expectThrows(IllegalArgumentException.class, context2::preProcess); assertThat( exception.getMessage(), equalTo( @@ -291,10 +294,11 @@ public ScrollContext scrollContext() { target, null, timeout, - randomIntBetween(1, Integer.MAX_VALUE), null, false, - false + null, + randomInt(), + randomInt() ); ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery(); context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(); @@ -317,10 +321,11 @@ public ScrollContext scrollContext() { target, null, timeout, - randomIntBetween(1, Integer.MAX_VALUE), null, false, - false + null, + randomInt(), + randomInt() ); context4.sliceBuilder(new SliceBuilder(1, 2)).parsedQuery(parsedQuery).preProcess(); Query query1 = context4.query(); @@ -384,10 +389,11 @@ protected Engine.Searcher acquireSearcherInternal(String source) { target, null, timeout, - randomIntBetween(1, Integer.MAX_VALUE), null, false, - false + null, + randomInt(), + randomInt() ); assertThat(context.searcher().hasCancellations(), is(false)); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index e9b307c088f4c..3b4e01b02431a 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -10,8 +10,10 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.LeafReader; +import 
org.apache.lucene.index.Term; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.TotalHitCountCollectorManager; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; @@ -89,10 +91,12 @@ import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.fetch.subphase.FieldAndFormat; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.query.NonCountingTermQuery; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.suggest.SuggestBuilder; @@ -117,6 +121,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -128,7 +133,7 @@ import static java.util.Collections.singletonList; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED; -import static org.elasticsearch.search.SearchModule.SEARCH_CONCURRENCY_ENABLED; +import static org.elasticsearch.search.SearchService.SEARCH_WORKER_THREADS_ENABLED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; @@ -1895,45 +1900,140 @@ public void testDfsQueryPhaseRewrite() { assertTrue(service.freeReaderContext(contextId)); } - public void testEnableConcurrentCollection() { - createIndex("index", Settings.EMPTY); - SearchService service = getInstanceFromNode(SearchService.class); - assertTrue(service.isConcurrentCollectionEnabled()); + public void testEnableSearchWorkerThreads() throws IOException { + IndexService indexService = createIndex("index", Settings.EMPTY); + IndexShard indexShard = indexService.getShard(0); + ShardSearchRequest request = new ShardSearchRequest( + OriginalIndices.NONE, + new SearchRequest().allowPartialSearchResults(randomBoolean()), + indexShard.shardId(), + 0, + indexService.numberOfShards(), + AliasFilter.EMPTY, + 1f, + System.currentTimeMillis(), + null + ); + try (ReaderContext readerContext = createReaderContext(indexService, indexShard)) { + SearchService service = getInstanceFromNode(SearchService.class); + SearchShardTask task = new SearchShardTask(0, "type", "action", "description", null, emptyMap()); + { + SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.DFS, randomBoolean()); + assertNotNull(searchContext.searcher().getExecutor()); + } - try { - ClusterUpdateSettingsResponse response = client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(SEARCH_CONCURRENCY_ENABLED.getKey(), false).build()) - .get(); - 
assertTrue(response.isAcknowledged()); - assertFalse(service.isConcurrentCollectionEnabled()); + try { + ClusterUpdateSettingsResponse response = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(SEARCH_WORKER_THREADS_ENABLED.getKey(), false).build()) + .get(); + assertTrue(response.isAcknowledged()); + { + SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.DFS, randomBoolean()); + assertNull(searchContext.searcher().getExecutor()); + } + } finally { + // reset original default setting + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull(SEARCH_WORKER_THREADS_ENABLED.getKey()).build()) + .get(); + SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.DFS, randomBoolean()); + assertNotNull(searchContext.searcher().getExecutor()); + } + } + } - } finally { - // reset original default setting - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().putNull(SEARCH_CONCURRENCY_ENABLED.getKey()).build()) - .get(); + /** + * Verify that a single slice is created for requests that don't support parallel collection, while computation + * is still offloaded to the worker threads. + */ + public void testSupportsParallelCollectionAffectsSlicing() throws Exception { + IndexService indexService = createIndex("index", Settings.EMPTY); + ThreadPoolExecutor executor = (ThreadPoolExecutor) indexService.getThreadPool().executor(ThreadPool.Names.SEARCH_WORKER); + int numDocs = randomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("index").setId(String.valueOf(i)).setSource("field", "value").get(); + if (i % 5 == 0) { + indicesAdmin().prepareRefresh("index").get(); + } + } + final IndexShard indexShard = indexService.getShard(0); + ShardSearchRequest request = new ShardSearchRequest( + OriginalIndices.NONE, + new SearchRequest().allowPartialSearchResults(randomBoolean()), + indexShard.shardId(), + 0, + indexService.numberOfShards(), + AliasFilter.EMPTY, + 1f, + System.currentTimeMillis(), + null + ); + SearchService service = getInstanceFromNode(SearchService.class); + NonCountingTermQuery termQuery = new NonCountingTermQuery(new Term("field", "value")); + assertEquals(0, executor.getCompletedTaskCount()); + int taskCount = 0; + try (ReaderContext readerContext = createReaderContext(indexService, indexShard)) { + SearchShardTask task = new SearchShardTask(0, "type", "action", "description", null, emptyMap()); + { + SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.DFS, true); + ContextIndexSearcher searcher = searchContext.searcher(); + assertNotNull(searcher.getExecutor()); + assertSame(executor, searcher.getExecutor()); + int maxNumSlices = ((ThreadPoolExecutor) searcher.getExecutor()).getMaximumPoolSize(); + int numSlices = ContextIndexSearcher.computeSlices(searcher.getIndexReader().leaves(), maxNumSlices, 1).length; + searcher.search(termQuery, new TotalHitCountCollectorManager()); + assertBusy(() -> assertEquals(numSlices, executor.getCompletedTaskCount())); + taskCount += numSlices; + } + { + SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.QUERY, true); + ContextIndexSearcher searcher = searchContext.searcher(); + assertNotNull(searcher.getExecutor()); + assertSame(executor, searcher.getExecutor()); + searcher.search(termQuery, new 
TotalHitCountCollectorManager()); + int expectedTaskCount = ++taskCount; + assertBusy(() -> assertEquals(expectedTaskCount, executor.getCompletedTaskCount())); + } + { + SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.FETCH, true); + ContextIndexSearcher searcher = searchContext.searcher(); + assertNotNull(searcher.getExecutor()); + assertSame(executor, searcher.getExecutor()); + searcher.search(termQuery, new TotalHitCountCollectorManager()); + int expectedTaskCount = ++taskCount; + assertBusy(() -> assertEquals(expectedTaskCount, executor.getCompletedTaskCount())); + } + { + SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.NONE, true); + ContextIndexSearcher searcher = searchContext.searcher(); + assertNotNull(searcher.getExecutor()); + assertSame(executor, searcher.getExecutor()); + searcher.search(termQuery, new TotalHitCountCollectorManager()); + int expectedTaskCount = ++taskCount; + assertBusy(() -> assertEquals(expectedTaskCount, executor.getCompletedTaskCount())); + } } } - public void testConcurrencyConditions() { + public void testSupportsParallelCollection() { SearchSourceBuilder searchSourceBuilder = randomBoolean() ? null : new SearchSourceBuilder(); if (searchSourceBuilder != null && randomBoolean()) { searchSourceBuilder.aggregation(new TermsAggregationBuilder("terms")); } - assertTrue(SearchService.concurrentSearchEnabled(ResultsType.DFS, searchSourceBuilder)); + assertTrue(SearchService.supportsParallelCollection(ResultsType.DFS, searchSourceBuilder)); assertFalse( - SearchService.concurrentSearchEnabled( + SearchService.supportsParallelCollection( randomFrom(randomFrom(ResultsType.QUERY, ResultsType.NONE, ResultsType.FETCH)), searchSourceBuilder ) ); } - private ReaderContext createReaderContext(IndexService indexService, IndexShard indexShard) { + private static ReaderContext createReaderContext(IndexService indexService, IndexShard indexShard) { return new ReaderContext( new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong()), indexService, diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java index 92b2e15b212da..665f66fcca316 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java @@ -23,7 +23,12 @@ import org.elasticsearch.index.query.WrapperQueryBuilder; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -39,6 +44,7 @@ import java.io.IOException; import java.util.Collection; 
+import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; @@ -310,6 +316,49 @@ public void testBuildPipelineTreeResolvesPipelineOrder() { assertThat(tree.aggregators().stream().map(PipelineAggregator::name).collect(toList()), equalTo(List.of("foo", "bar"))); } + public void testSupportsParallelCollection() { + { + AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); + assertTrue(builder.supportsParallelCollection()); + builder.addAggregator(new FilterAggregationBuilder("name", new MatchAllQueryBuilder())); + assertTrue(builder.supportsParallelCollection()); + builder.addAggregator(new TermsAggregationBuilder("terms")); + assertFalse(builder.supportsParallelCollection()); + } + { + AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); + builder.addAggregator(new TermsAggregationBuilder("terms")); + assertFalse(builder.supportsParallelCollection()); + } + { + AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); + builder.addAggregator(new CardinalityAggregationBuilder("cardinality")); + assertFalse(builder.supportsParallelCollection()); + } + { + AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); + builder.addAggregator(new NestedAggregationBuilder("nested", "path")); + assertFalse(builder.supportsParallelCollection()); + } + { + AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); + builder.addAggregator( + new CompositeAggregationBuilder("composite", Collections.singletonList(new TermsValuesSourceBuilder("name"))) + ); + assertFalse(builder.supportsParallelCollection()); + } + { + AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); + builder.addAggregator(new FilterAggregationBuilder("terms", new MatchAllQueryBuilder()) { + @Override + public boolean isInSortOrderExecutionRequired() { + return true; + } + }); + assertFalse(builder.supportsParallelCollection()); + } + } + @Override protected NamedXContentRegistry xContentRegistry() { return xContentRegistry; diff --git a/server/src/test/java/org/elasticsearch/search/dfs/DfsPhaseTests.java b/server/src/test/java/org/elasticsearch/search/dfs/DfsPhaseTests.java index a93c7ef4abf1c..0f213073757ce 100644 --- a/server/src/test/java/org/elasticsearch/search/dfs/DfsPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/dfs/DfsPhaseTests.java @@ -73,9 +73,10 @@ public void testSingleKnnSearch() throws IOException { IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), - 1, randomBoolean(), - this.threadPoolExecutor + threadPoolExecutor, + threadPoolExecutor.getMaximumPoolSize(), + 1 ); Query query = new KnnFloatVectorQuery("float_vector", new float[] { 0, 0, 0 }, numDocs, null); diff --git a/server/src/test/java/org/elasticsearch/search/internal/ContextIndexSearcherTests.java b/server/src/test/java/org/elasticsearch/search/internal/ContextIndexSearcherTests.java index e8823dfb0f42e..72bda8ac0215d 100644 --- a/server/src/test/java/org/elasticsearch/search/internal/ContextIndexSearcherTests.java +++ b/server/src/test/java/org/elasticsearch/search/internal/ContextIndexSearcherTests.java @@ -45,8 +45,10 @@ import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.SimpleCollector; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; +import 
org.apache.lucene.search.TotalHitCountCollectorManager; import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; @@ -56,12 +58,12 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.SparseFixedBitSet; +import org.apache.lucene.util.ThreadInterruptedException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.IndexSettings; @@ -78,12 +80,16 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.search.internal.ContextIndexSearcher.intersectScorerAndBitSet; @@ -181,220 +187,89 @@ public void collect(int doc, long bucket) throws IOException { directory.close(); } - /** - * Check that knn queries rewrite parallelizes on the number of segments if there are enough - * threads available. 
- */ - public void testConcurrentKnnRewrite() throws Exception { - final Directory directory = newDirectory(); + private int indexDocs(Directory directory) throws IOException { try ( - IndexWriter iw = new IndexWriter( + RandomIndexWriter iw = new RandomIndexWriter( + random(), directory, - new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE) + new IndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE) ) ) { - final int numDocs = randomIntBetween(100, 200); + final int numDocs = randomIntBetween(500, 1000); for (int i = 0; i < numDocs; i++) { Document document = new Document(); document.add(new StringField("field", "value", Field.Store.NO)); iw.addDocument(document); if (rarely()) { - iw.commit(); + iw.flush(); } } - } - - // make sure we have more threads than segments available to check later call to execute method - int nThreads = randomIntBetween(2, 5); - - // use an executor that counts calls to its "execute" method - AtomicInteger executeCalls = new AtomicInteger(0); - ThreadPoolExecutor executor = null; - DirectoryReader directoryReader = null; - try { - executor = new PrioritizedEsThreadPoolExecutor( - "test", - nThreads, - Integer.MAX_VALUE, - 0L, - TimeUnit.MILLISECONDS, - EsExecutors.daemonThreadFactory("queuetest"), - new ThreadContext(Settings.EMPTY), - null - ) { - @Override - public void execute(Runnable command) { - executeCalls.incrementAndGet(); - super.execute(command); - } - }; - - directoryReader = DirectoryReader.open(directory); - ContextIndexSearcher searcher = new ContextIndexSearcher( - directoryReader, - IndexSearcher.getDefaultSimilarity(), - IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), - 1, - randomBoolean(), - executor - ); - // check that we calculate one slice per segment - int numSegments = directoryReader.getContext().leaves().size(); - assertEquals(numSegments, searcher.slices(directoryReader.getContext().leaves()).length); - - KnnFloatVectorQuery vectorQuery = new KnnFloatVectorQuery("float_vector", new float[] { 0, 0, 0 }, 10, null); - Query rewritenQuery = vectorQuery.rewrite(searcher); - // Note: we expect one execute calls less than segments since the last is executed on the caller thread. 
- // For details see QueueSizeBasedExecutor#processTask - assertEquals(numSegments - 1, executeCalls.get()); - - AtomicInteger collectorCalls = new AtomicInteger(0); - searcher.search(rewritenQuery, new CollectorManager() { - - @Override - public Collector newCollector() { - collectorCalls.incrementAndGet(); - return new Collector() { - @Override - public LeafCollector getLeafCollector(LeafReaderContext context) { - return new LeafBucketCollector() { - @Override - public void collect(int doc, long owningBucketOrd) throws IOException { - // noop - } - }; - } - - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE; - } - }; - } - - @Override - public Object reduce(Collection collectors) throws IOException { - return null; - } - }); - LeafSlice[] leafSlices = ContextIndexSearcher.computeSlices( - directoryReader.getContext().leaves(), - executor.getMaximumPoolSize(), - 1 - ); - assertEquals(leafSlices.length, collectorCalls.get()); - } finally { - directoryReader.close(); - directory.close(); - executor.shutdown(); + return numDocs; } } - public void testConcurrentSearchAllThreadsFinish() throws Exception { - final Directory directory = newDirectory(); - IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE)); - final int numDocs = randomIntBetween(100, 200); - for (int i = 0; i < numDocs; i++) { - Document document = new Document(); - document.add(new StringField("field", "value", Field.Store.NO)); - iw.addDocument(document); - if (rarely()) { - iw.commit(); + /** + * Check that knn query rewrite parallelizes across the number of segments + */ + public void testConcurrentRewrite() throws Exception { + try (Directory directory = newDirectory()) { + indexDocs(directory); + try (DirectoryReader directoryReader = DirectoryReader.open(directory)) { + AtomicInteger executeCalls = new AtomicInteger(0); + ContextIndexSearcher searcher = new ContextIndexSearcher( + directoryReader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + randomBoolean(), + command -> { + executeCalls.incrementAndGet(); + command.run(); + }, + randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE) + ); + // check that we create one slice per segment + int numSegments = directoryReader.getContext().leaves().size(); + assertEquals(numSegments, searcher.slices(directoryReader.getContext().leaves()).length); + KnnFloatVectorQuery vectorQuery = new KnnFloatVectorQuery("float_vector", new float[] { 0, 0, 0 }, 10, null); + vectorQuery.rewrite(searcher); + // Note: we expect one execute call fewer than the number of segments, since the last task is executed on the caller thread.
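+ // the lambda executor above counts each offloaded command and then runs it inline, which keeps this test single-threaded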
+ // For details see QueueSizeBasedExecutor#processTask + assertEquals(numSegments - 1, executeCalls.get()); } } + } - iw.close(); - DirectoryReader directoryReader = DirectoryReader.open(directory); - ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4); - - AtomicInteger missingDocs = new AtomicInteger(); - AtomicInteger visitDocs = new AtomicInteger(0); - - // determine how many docs are in the first slice for correct, this is how much we are missing by - // throwing the exception in the first collector - int minDocsPerSlice = 1; - - ContextIndexSearcher searcher = new ContextIndexSearcher( - directoryReader, - IndexSearcher.getDefaultSimilarity(), - IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), - minDocsPerSlice, - randomBoolean(), - executor - ); - - LeafSlice[] leafSlices = ContextIndexSearcher.computeSlices( - directoryReader.getContext().leaves(), - executor.getMaximumPoolSize(), - minDocsPerSlice - ); - // The test collector manager throws an exception when the first segment gets collected. - // All documents in that slice count towards the "missing" docs in the later assertion. - int docsFirstSlice = Arrays.stream(leafSlices[0].leaves).map(LeafReaderContext::reader).mapToInt(LeafReader::maxDoc).sum(); - AtomicInteger collectorCalls = new AtomicInteger(0); - CollectorManager collectorManager = new CollectorManager<>() { - boolean first = true; - - @Override - public Collector newCollector() { - collectorCalls.incrementAndGet(); - if (first) { - first = false; - return new Collector() { - @Override - public LeafCollector getLeafCollector(LeafReaderContext context) { - missingDocs.set(docsFirstSlice); - throw new IllegalArgumentException("fake exception"); - } - - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE; - } - }; - } else { - return new Collector() { - @Override - public LeafCollector getLeafCollector(LeafReaderContext context) { - return new LeafBucketCollector() { - @Override - public void collect(int doc, long owningBucketOrd) { - while (true) { - int current = visitDocs.get(); - if (visitDocs.compareAndSet(current, current + 1)) { - break; - } - } - } - }; - } - - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE; - } - }; - } - } - - @Override - public Void reduce(Collection collectors) { - return null; + /** + * Test that collection starts one task per slice, all offloaded to the separate executor, none executed in the caller thread + */ + public void testConcurrentCollection() throws IOException { + try (Directory directory = newDirectory()) { + int numDocs = indexDocs(directory); + int maxNumSlices = randomIntBetween(1, 100); + try (DirectoryReader directoryReader = DirectoryReader.open(directory)) { + AtomicInteger executeCalls = new AtomicInteger(0); + ContextIndexSearcher searcher = new ContextIndexSearcher( + directoryReader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + randomBoolean(), + command -> { + executeCalls.incrementAndGet(); + command.run(); + }, + maxNumSlices, + 1 + ); + Integer totalHits = searcher.search(new MatchAllDocsQuery(), new TotalHitCountCollectorManager()); + assertEquals(numDocs, totalHits.intValue()); + int numExpectedTasks = ContextIndexSearcher.computeSlices(searcher.getIndexReader().leaves(), maxNumSlices, 1).length; + // check that each slice goes to the executor, no matter the queue size or the number of slices + 
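// during collection, unlike rewrite, every slice is offloaded to the executor (none runs on the caller thread), hence + // exactly one execute call per computed slice is expected +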
assertEquals(numExpectedTasks, executeCalls.get()); } } } public void testContextIndexSearcherSparseNoDeletions() throws IOException { @@ -535,7 +410,7 @@ public void testComputeSlices() throws IOException { IOUtils.close(reader, w, dir); } - private void assertSlices(LeafSlice[] slices, int numDocs, int numThreads) { + private static void assertSlices(LeafSlice[] slices, int numDocs, int numThreads) { // checks that the number of slices is not bigger than the number of available threads // and each slice contains at least 10% of the data (which means the max number of slices is 10) int sumDocs = 0; @@ -586,17 +461,8 @@ public TermsEnum iterator() { } public void testReduceIsCalledOnTimeout() throws IOException { - try (Directory dir = newDirectory();) { - try (RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { - int docs = randomIntBetween(1, 1000); - for (int i = 0; i < docs; i++) { - Document doc = new Document(); - StringField fooField = new StringField("foo", randomBoolean() ? "bar" : "foo", Field.Store.NO); - doc.add(fooField); - w.addDocument(doc); - } - } - + try (Directory dir = newDirectory()) { + indexDocs(dir); ThreadPoolExecutor executor = null; try (DirectoryReader directoryReader = DirectoryReader.open(dir)) { if (randomBoolean()) { @@ -607,9 +473,10 @@ public void testReduceIsCalledOnTimeout() throws IOException { IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), - 1, true, - executor + executor, + executor == null ? -1 : executor.getMaximumPoolSize(), + 1 ); boolean[] called = new boolean[1]; CollectorManager manager = new CollectorManager<>() { @@ -624,7 +491,7 @@ public Void reduce(Collection collectors) { return null; } }; - contextIndexSearcher.search(new Query() { + contextIndexSearcher.search(new TestQuery() { @Override public Query rewrite(IndexSearcher indexSearcher) throws IOException { if (randomBoolean()) { @@ -651,34 +518,365 @@ public boolean isCacheable(LeafReaderContext ctx) { } }; } + }, manager); + assertTrue(contextIndexSearcher.timeExceeded()); + assertThat(called[0], equalTo(true)); + } finally { + if (executor != null) { + terminate(executor); + } + } + } } - /** + * Simulate one or more exceptions being thrown while collecting, through a custom query that throws IOException in its Weight#scorer. + * Verify that the slices that had to wait because there were no available threads in the pool are not started following the exception, + * which triggers a cancellation of all the tasks that are part of the running search. + * Simulate having N threads busy doing other work (e.g. other searches); otherwise all slices would be executed directly, given that + * the number of slices depends on the max pool size.
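+ * The ExecutorTestWrapper below keeps numBusyThreads pool threads parked on a latch, so that some slice tasks have to + * queue up and their cancellation can actually be observed.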
+ */ + public void testCancelSliceTasksOnException() throws Exception { + try (Directory dir = newDirectory()) { + indexDocs(dir); + int numThreads = randomIntBetween(4, 6); + int numBusyThreads = randomIntBetween(0, 3); + int numAvailableThreads = numThreads - numBusyThreads; + ThreadPoolExecutor executor = EsExecutors.newFixed( + ContextIndexSearcherTests.class.getName(), + numThreads, + -1, + EsExecutors.daemonThreadFactory(""), + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ); + ExecutorTestWrapper executorTestWrapper = new ExecutorTestWrapper(executor, numBusyThreads); + try (DirectoryReader directoryReader = DirectoryReader.open(dir)) { + Set throwingLeaves = new HashSet<>(); + Set scoredLeaves = new CopyOnWriteArraySet<>(); + final int[] newCollectorsCalls; + final boolean[] reduceCalled; + LeafSlice[] leafSlices; + try ( + ContextIndexSearcher contextIndexSearcher = new ContextIndexSearcher( + directoryReader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + true, + executorTestWrapper, + executor.getMaximumPoolSize(), + 1 + ) + ) { + leafSlices = contextIndexSearcher.getSlicesForCollection(); + int numThrowingLeafSlices = randomIntBetween(1, 3); + for (int i = 0; i < numThrowingLeafSlices; i++) { + LeafSlice throwingLeafSlice = leafSlices[randomIntBetween(0, Math.min(leafSlices.length, numAvailableThreads) - 1)]; + throwingLeaves.add(randomFrom(throwingLeafSlice.leaves)); } + Query query = new TestQuery() { + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) { + return new ConstantScoreWeight(this, boost) { + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + if (throwingLeaves.contains(context)) { + // a random segment of some random slices throws an exception.
Other slices may or may not have started + throw new IOException(); + } + scoredLeaves.add(context); + return new ConstantScoreScorer( + this, + boost, + ScoreMode.COMPLETE, + DocIdSetIterator.all(context.reader().maxDoc()) + ); + } - @Override - public void visit(QueryVisitor visitor) { - visitor.visitLeaf(this); + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + }; + } + }; + newCollectorsCalls = new int[] { 0 }; + reduceCalled = new boolean[] { false }; + CollectorManager collectorManager = new CollectorManager<>() { + @Override + public Collector newCollector() { + newCollectorsCalls[0]++; + return new SimpleCollector() { + @Override + public void collect(int doc) { + + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE; + } + }; + } + + @Override + public Integer reduce(Collection collectors) { + reduceCalled[0] = true; + return null; + } + }; + RuntimeException executionException = expectThrows( + RuntimeException.class, + () -> contextIndexSearcher.search(query, collectorManager) + ); + assertBusy(() -> { + // active count is approximate, wait until it converges to the expected number + if (executor.getActiveCount() > numBusyThreads) { + throw new AssertionError("no search tasks should be left running"); + } + }); + + assertThat(executionException.getCause(), instanceOf(IOException.class)); + } + // as many tasks as slices have been created + assertEquals(leafSlices.length, newCollectorsCalls[0]); + // unexpected exception thrown, reduce is not called, there are no results to return + assertFalse(reduceCalled[0]); + Set expectedScoredLeaves = new HashSet<>(); + // the first N slices, where N is the number of available permits, will run straight-away, the others will be cancelled + for (int i = 0; i < leafSlices.length; i++) { + if (i == numAvailableThreads) { + break; + } + LeafSlice leafSlice = leafSlices[i]; + for (LeafReaderContext context : leafSlice.leaves) { + // collect the segments that we expect to score in each slice, and stop at those that throw + if (throwingLeaves.contains(context)) { + break; + } + expectedScoredLeaves.add(context); } + } + // The slice that threw the exception is not counted. The others that could be executed directly are, but they may have been + // cancelled before they could even start, hence we are going to score at most the segments that the slices that can be + // executed straight-away (before reaching the max pool size) are made of. We can't guarantee that we score all of them. + // We do want to guarantee that the remaining slices won't even start and none of their leaves are scored. + assertTrue(expectedScoredLeaves.containsAll(scoredLeaves)); + } finally { + executorTestWrapper.stopBusyThreads(); + terminate(executor); + } + } + } - + /** + * Simulate one or more timeouts occurring while collecting, through a custom query that times out in its Weight#scorer. + * Verify that the slices that had to wait because there were no available threads in the pool are not started following the timeout, + * which triggers a cancellation of all the tasks that are part of the running search. + * Simulate having N threads busy doing other work (e.g. other searches); otherwise all slices would be executed directly, given that + * the number of slices depends on the max pool size.
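+ * The timeout is simulated by having Weight#scorer call ContextIndexSearcher#throwTimeExceededException, which also marks + * the search as timed out (asserted below via timeExceeded()).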
+ */ + public void testCancelSliceTasksOnTimeout() throws Exception { + try (Directory dir = newDirectory()) { + indexDocs(dir); + int numThreads = randomIntBetween(4, 6); + int numBusyThreads = randomIntBetween(0, 3); + int numAvailableThreads = numThreads - numBusyThreads; + ThreadPoolExecutor executor = EsExecutors.newFixed( + ContextIndexSearcherTests.class.getName(), + numThreads, + -1, + EsExecutors.daemonThreadFactory(""), + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ); + ExecutorTestWrapper executorTestWrapper = new ExecutorTestWrapper(executor, numBusyThreads); + try (DirectoryReader directoryReader = DirectoryReader.open(dir)) { + Set throwingLeaves = new HashSet<>(); + Set scoredLeaves = new CopyOnWriteArraySet<>(); + final int[] newCollectorsCalls; + final boolean[] reduceCalled; + LeafSlice[] leafSlices; + try ( + ContextIndexSearcher contextIndexSearcher = new ContextIndexSearcher( + directoryReader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + true, + executorTestWrapper, + executor.getMaximumPoolSize(), + 1 + ) + ) { + leafSlices = contextIndexSearcher.getSlicesForCollection(); + int numThrowingLeafSlices = randomIntBetween(1, 3); + for (int i = 0; i < numThrowingLeafSlices; i++) { + LeafSlice throwingLeafSlice = leafSlices[randomIntBetween(0, Math.min(leafSlices.length, numAvailableThreads) - 1)]; + throwingLeaves.add(randomFrom(throwingLeafSlice.leaves)); } + Query query = new TestQuery() { + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) { + return new ConstantScoreWeight(this, boost) { + @Override + public Scorer scorer(LeafReaderContext context) { + if (throwingLeaves.contains(context)) { + // a random segment of some random slices triggers a timeout. Other slices may or may not have + // started.
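+ // note that, unlike the exception case, a timeout produces partial results: the search call below does not throw + // and reduce is still expected to be called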
+ contextIndexSearcher.throwTimeExceededException(); + } + scoredLeaves.add(context); + return new ConstantScoreScorer( + this, + boost, + ScoreMode.COMPLETE, + DocIdSetIterator.all(context.reader().maxDoc()) + ); + } - @Override - public boolean isCacheable(LeafReaderContext ctx) { - return false; - } - }; - } + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + }; + } + }; + newCollectorsCalls = new int[] { 0 }; + reduceCalled = new boolean[] { false }; + CollectorManager collectorManager = new CollectorManager<>() { + @Override + public Collector newCollector() { + newCollectorsCalls[0]++; + return new SimpleCollector() { + @Override + public void collect(int doc) { + + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE; + } + }; + } + + @Override + public Integer reduce(Collection collectors) { + reduceCalled[0] = true; + return null; + } + }; + contextIndexSearcher.search(query, collectorManager); + assertBusy(() -> { + // active count is approximate, wait until it converges to the expected number + if (executor.getActiveCount() > numBusyThreads) { + throw new AssertionError("no search tasks should be left running"); + } + }); + assertTrue(contextIndexSearcher.timeExceeded()); + } + // as many tasks as slices have been created + assertEquals(leafSlices.length, newCollectorsCalls[0]); + assertTrue(reduceCalled[0]); + Set expectedScoredLeaves = new HashSet<>(); + // the first N slices, where N is the number of available permits, will run straight-away, the others will be cancelled + for (int i = 0; i < leafSlices.length; i++) { + if (i == numAvailableThreads) { + break; } - @Override - public String toString(String field) { - return "query" + LeafSlice leafSlice = leafSlices[i]; + for (LeafReaderContext context : leafSlice.leaves) { + // collect the segments that we expect to score in each slice, and stop at those that throw + if (throwingLeaves.contains(context)) { + break; + } + expectedScoredLeaves.add(context); + } + } + // The slice that timed out is not counted. The others that could be executed directly are, but they may have been + // cancelled before they could even start, hence we are going to score at most the segments that the slices that can be + // executed straight-away (before reaching the max pool size) are made of. We can't guarantee that we score all of them. + // We do want to guarantee that the remaining slices won't even start and none of their leaves are scored. + assertTrue(expectedScoredLeaves.containsAll(scoredLeaves)); } finally { - if (executor != null) { - terminate(executor); + executorTestWrapper.stopBusyThreads(); + terminate(executor); + } + } + } + + private static class ExecutorTestWrapper implements Executor { + private final ThreadPoolExecutor executor; + private final AtomicInteger startedTasks = new AtomicInteger(0); + private final CountDownLatch busyThreadsLatch = new CountDownLatch(1); + + ExecutorTestWrapper(ThreadPoolExecutor executor, int numBusyThreads) { + this.executor = executor; + // keep some of the threads occupied to simulate the situation where the slice tasks get queued up. + // This is a realistic scenario that would not otherwise get tested by executing a single concurrent search, given that the + // number of slices is capped by max pool size.
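+ // each busy task parks on busyThreadsLatch until stopBusyThreads() releases it in the test's finally block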
+ for (int i = 0; i < numBusyThreads; i++) { + execute(() -> { + try { + busyThreadsLatch.await(); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } + }); + } + } + + void stopBusyThreads() { + busyThreadsLatch.countDown(); + } + + @Override + public void execute(Runnable command) { + int started = startedTasks.incrementAndGet(); + if (started > executor.getMaximumPoolSize()) { + try { + /* + There could be tasks that complete quickly before the exception is handled, which leaves room for new tasks that are + about to get cancelled to start before their cancellation becomes effective. We can accept that cancellation may or may + not be effective for the slices that belong to the first batch of tasks until all threads are busy and adjust the + test expectations accordingly, but for the subsequent slices, we want to assert that they are cancelled and never + executed. The only way to guarantee that is waiting for cancellation to kick in. + */ + assertBusy(() -> { + Future future = (Future) command; + if (future.isCancelled() == false) { + throw new AssertionError("task should be cancelled"); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); } } + executor.execute(command); + } + } + + private static class TestQuery extends Query { + @Override + public String toString(String field) { + return "query"; + } + + @Override + public void visit(QueryVisitor visitor) { + visitor.visitLeaf(this); + } + + @Override + public boolean equals(Object o) { + return sameClassAs(o); + } + + @Override + public int hashCode() { + return classHash(); } } diff --git a/server/src/test/java/org/elasticsearch/search/query/NonCountingTermQuery.java b/server/src/test/java/org/elasticsearch/search/query/NonCountingTermQuery.java index 502345cf7978d..4f0c6c0513211 100644 --- a/server/src/test/java/org/elasticsearch/search/query/NonCountingTermQuery.java +++ b/server/src/test/java/org/elasticsearch/search/query/NonCountingTermQuery.java @@ -23,9 +23,9 @@ * Using this query we will never early-terminate the collection phase because we can already * get the document count from the term statistics of each segment. 
*/ -class NonCountingTermQuery extends TermQuery { +public final class NonCountingTermQuery extends TermQuery { - NonCountingTermQuery(Term term) { + public NonCountingTermQuery(Term term) { super(term); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index d38b3d077b06e..9e9df3bca6889 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1402,6 +1402,8 @@ private Environment createEnvironment(String nodeName) { Settings.builder() .put(NODE_NAME_SETTING.getKey(), nodeName) .put(PATH_HOME_SETTING.getKey(), tempDir.resolve(nodeName).toAbsolutePath()) + // this test uses the same executor service for all thread pools; the search worker pool would need to be a separate one + .put(SearchService.SEARCH_WORKER_THREADS_ENABLED.getKey(), false) .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath()) .putList( ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index 97f0037f20367..36b719bd97b4a 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -23,6 +23,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.ThreadPoolExecutor; import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING; import static org.elasticsearch.threadpool.ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING; @@ -384,4 +386,23 @@ public void testWriteThreadPoolUsesTaskExecutionTimeTrackingEsThreadPoolExecutor assertTrue(terminate(threadPool)); } } + + public void testSearchWorkerThreadPool() { + final int allocatedProcessors = randomIntBetween(1, EsExecutors.allocatedProcessors(Settings.EMPTY)); + final ThreadPool threadPool = new TestThreadPool( + "test", + Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), allocatedProcessors).build() + ); + try { + ExecutorService executor = threadPool.executor(ThreadPool.Names.SEARCH_WORKER); + assertThat(executor, instanceOf(ThreadPoolExecutor.class)); + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; + int expectedPoolSize = allocatedProcessors * 3 / 2 + 1; + assertEquals(expectedPoolSize, threadPoolExecutor.getCorePoolSize()); + assertEquals(expectedPoolSize, threadPoolExecutor.getMaximumPoolSize()); + assertThat(threadPoolExecutor.getQueue(), instanceOf(LinkedTransferQueue.class)); + } finally { + assertTrue(terminate(threadPool)); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 27101cc980c4c..1de38e3d155be 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -594,7 +594,7 @@ public Void reduce(Collection collectors) { return null; } }; - if 
(aggTestConfig.builder().supportsParallelCollection()) { searcher.search(rewritten, collectorManager); } else { searcher.search(rewritten, collectorManager.newCollector()); @@ -928,9 +928,10 @@ protected IndexSearcher newIndexSearcher(DirectoryReader indexReader) throws IOE IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), - 1, // forces multiple slices randomBoolean(), - this.threadPoolExecutor + this.threadPoolExecutor, + this.threadPoolExecutor.getMaximumPoolSize(), + 1 // forces multiple slices ); } } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/BaseAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/BaseAggregationTestCase.java index 525ea0d8914c2..0a4af80c93a34 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/BaseAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/BaseAggregationTestCase.java @@ -60,26 +60,11 @@ public void testFromXContent() throws IOException { public void testSupportsConcurrentExecution() { AB builder = createTestAggregatorBuilder(); - boolean supportsConcurrency = builder.supportsConcurrentExecution(); - if (supportsConcurrency) { - assertTrue(builder.supportsOffloadingSequentialCollection()); - } - AggregationBuilder bucketBuilder = new HistogramAggregationBuilder("test"); - assertTrue(bucketBuilder.supportsConcurrentExecution()); - bucketBuilder.subAggregation(builder); - assertThat(bucketBuilder.supportsConcurrentExecution(), equalTo(supportsConcurrency)); - if (bucketBuilder.supportsConcurrentExecution()) { - assertTrue(bucketBuilder.supportsOffloadingSequentialCollection()); - } - } - - public void testSupportsOffloadingSequentialCollection() { - AB builder = createTestAggregatorBuilder(); - boolean supportsOffloadingSequentialCollection = builder.supportsOffloadingSequentialCollection(); + boolean supportsConcurrency = builder.supportsParallelCollection(); AggregationBuilder bucketBuilder = new HistogramAggregationBuilder("test"); - assertTrue(bucketBuilder.supportsOffloadingSequentialCollection()); + assertTrue(bucketBuilder.supportsParallelCollection()); bucketBuilder.subAggregation(builder); - assertThat(bucketBuilder.supportsOffloadingSequentialCollection(), equalTo(supportsOffloadingSequentialCollection)); + assertThat(bucketBuilder.supportsParallelCollection(), equalTo(supportsConcurrency)); } /** diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregationBuilder.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregationBuilder.java index 487da5733e183..1df2f6634ba1d 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregationBuilder.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregationBuilder.java @@ -153,7 +153,7 @@ public boolean supportsSampling() { } @Override - public boolean supportsConcurrentExecution() { + public boolean supportsParallelCollection() { return false; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java index 061930ebaced1..b0ff6b5718288 100644 --- 
a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java @@ -192,6 +192,7 @@ public static boolean assertCurrentThreadMayAccessBlobStore() { ThreadPool.Names.SNAPSHOT, ThreadPool.Names.GENERIC, ThreadPool.Names.SEARCH, + ThreadPool.Names.SEARCH_WORKER, ThreadPool.Names.SEARCH_THROTTLED, // Cache asynchronous fetching runs on a dedicated thread pool.