Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce executor for concurrent search #98204

Merged
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
20e6dbe
Introduce executor for concurrent search
javanna Aug 4, 2023
01733dc
Update docs/changelog/98204.yaml
javanna Aug 4, 2023
9229f4d
forbidden api
javanna Aug 4, 2023
f67551f
grrrrrrrrrr
javanna Aug 4, 2023
db6e906
leftover
javanna Aug 4, 2023
3b86ab1
forbidden api
javanna Aug 4, 2023
438d76d
spotless
javanna Aug 4, 2023
ad260e0
Update server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
javanna Aug 4, 2023
5a8ec56
iter
javanna Aug 4, 2023
9069bfe
iter
javanna Aug 7, 2023
026258c
iter
javanna Aug 7, 2023
fed2aaf
Merge branch 'main' into enhancement/bounded_executor_concurrent_search
javanna Aug 7, 2023
6688e92
iter
javanna Aug 7, 2023
cfcc819
iter
javanna Aug 7, 2023
d2a982b
special case for no segments
javanna Aug 7, 2023
0b2b243
spotless
javanna Aug 8, 2023
103363f
terminate thread pool in test
javanna Aug 8, 2023
b44a239
Merge branch 'main' into enhancement/bounded_executor_concurrent_search
javanna Aug 8, 2023
4220d6a
fix thread pool executor cast
javanna Aug 8, 2023
89faa2b
iter
javanna Aug 8, 2023
9ec1fd5
Merge branch 'main' into enhancement/bounded_executor_concurrent_search
javanna Aug 8, 2023
eced2c8
iter
javanna Aug 8, 2023
2844a05
iter
javanna Aug 8, 2023
5959f83
remove TODO and clarify comments
javanna Aug 8, 2023
ee55047
clarify docs
javanna Aug 8, 2023
8c77ed6
spotless
javanna Aug 8, 2023
af76ab6
iter
javanna Aug 8, 2023
262996d
Remove supportsOffloadingSequentialCollection() method and do aggrega…
martijnvg Aug 9, 2023
bb4ffca
Also postCollect when there are no slices
martijnvg Aug 9, 2023
c3d8034
address tests
martijnvg Aug 9, 2023
151b93b
spotless
martijnvg Aug 9, 2023
23121e3
Correctly overwrite supportsParallelCollection(...)
martijnvg Aug 10, 2023
bca4050
fixed unit tests
martijnvg Aug 10, 2023
df0ff91
Also perform postCollection when executing search in sort order.
martijnvg Aug 10, 2023
9b92a0b
Comment
javanna Aug 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/98204.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 98204
summary: Introduce executor for concurrent search
area: Search
type: enhancement
issues: []
10 changes: 8 additions & 2 deletions docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ There are several thread pools, but the important ones include:

[[search-threadpool]]
`search`::
For count/search/suggest operations. Thread pool type is
`fixed` with a size of `int((`<<node.processors,
For coordination of count/search operations at the shard level whose computation
is offloaded to the search_worker thread pool. Used also by fetch and other search
related operations Thread pool type is `fixed` with a size of `int((`<<node.processors,
`# of allocated processors`>>`pass:[ * ]3) / 2) + 1`, and queue_size of `1000`.

`search_worker`::
For the heavy workload of count/search operations that may be executed concurrently
across segments within the same shard when possible. Thread pool type is `fixed`
with a size of `int((`<<node.processors, `# of allocated processors`>>`pass:[ * ]3) / 2) + 1`, and unbounded queue_size .

[[search-throttled]]`search_throttled`::
For count/search/suggest/get operations on `search_throttled indices`.
Thread pool type is `fixed` with a size of `1`, and queue_size of `100`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ResourceWatcherService.RELOAD_INTERVAL_LOW,
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
SearchModule.INDICES_MAX_NESTED_DEPTH_SETTING,
SearchModule.SEARCH_CONCURRENCY_ENABLED,
SearchService.SEARCH_WORKER_THREADS_ENABLED,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have renamed the escape hatch to disable concurrency to align it with the name of the new thread pool, as it effectively affects whether the new thread pool is enabled or not.

ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING,
ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.LongSupplier;

final class DefaultSearchContext extends SearchContext {
Expand Down Expand Up @@ -136,10 +137,11 @@ final class DefaultSearchContext extends SearchContext {
SearchShardTarget shardTarget,
LongSupplier relativeTimeSupplier,
TimeValue timeout,
int minimumDocsPerSlice,
FetchPhase fetchPhase,
boolean lowLevelCancellation,
boolean parallelize
Executor executor,
int maximumNumberOfSlices,
int minimumDocsPerSlice
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand All @@ -150,19 +152,26 @@ final class DefaultSearchContext extends SearchContext {
this.indexShard = readerContext.indexShard();

Engine.Searcher engineSearcher = readerContext.acquireSearcher("search");
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
minimumDocsPerSlice,
lowLevelCancellation,
// TODO not set the for now, this needs a special thread pool and can be enabled after its introduction
// parallelize
// ? (EsThreadPoolExecutor) this.indexService.getThreadPool().executor(ThreadPool.Names.CONCURRENT_COLLECTION_TBD)
// : null,
null
);
if (executor == null) {
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation
);
} else {
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation,
executor,
maximumNumberOfSlices,
minimumDocsPerSlice
);
}
releasables.addAll(List.of(engineSearcher, searcher));

this.relativeTimeSupplier = relativeTimeSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,6 @@ public class SearchModule {
Setting.Property.NodeScope
);

public static final Setting<Boolean> SEARCH_CONCURRENCY_ENABLED = Setting.boolSetting(
"search.concurrency_enabled",
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to SearchService together with the other existing search settings (including minimum docs per slice which affects search concurrency)

private final Map<String, Highlighter> highlighters;

private final List<FetchSubPhase> fetchSubPhases = new ArrayList<>();
Expand Down
42 changes: 28 additions & 14 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -141,7 +143,6 @@
import static org.elasticsearch.core.TimeValue.timeValueMillis;
import static org.elasticsearch.core.TimeValue.timeValueMinutes;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.search.SearchModule.SEARCH_CONCURRENCY_ENABLED;

public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {
private static final Logger logger = LogManager.getLogger(SearchService.class);
Expand Down Expand Up @@ -211,6 +212,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

public static final Setting<Boolean> SEARCH_WORKER_THREADS_ENABLED = Setting.boolSetting(
"search.worker_threads_enabled",
true,
Property.NodeScope,
Property.Dynamic
);

public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT = Setting.intSetting(
"search.max_open_scroll_context",
500,
Expand Down Expand Up @@ -253,7 +261,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final DfsPhase dfsPhase = new DfsPhase();

private final FetchPhase fetchPhase;
private volatile boolean enableConcurrentCollection;
private volatile boolean enableSearchWorkerThreads;

private volatile long defaultKeepAlive;

Expand Down Expand Up @@ -344,16 +352,12 @@ public SearchService(
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER, this::setEnableRewriteAggsToFilterByFilter);

enableConcurrentCollection = SEARCH_CONCURRENCY_ENABLED.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_CONCURRENCY_ENABLED, this::setEnableConcurrentCollection);
}

private void setEnableConcurrentCollection(boolean concurrentCollection) {
this.enableConcurrentCollection = concurrentCollection;
enableSearchWorkerThreads = SEARCH_WORKER_THREADS_ENABLED.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_WORKER_THREADS_ENABLED, this::setEnableSearchWorkerThreads);
}

boolean isConcurrentCollectionEnabled() {
return this.enableConcurrentCollection;
private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
this.enableSearchWorkerThreads = enableSearchWorkerThreads;
}

private static void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
Expand Down Expand Up @@ -1039,7 +1043,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time
final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier();
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
try (ReaderContext readerContext = new ReaderContext(id, indexService, indexShard, reader, -1L, true)) {
DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout, null);
DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout, ResultsType.NONE);
searchContext.addReleasable(readerContext.markAsUsed(0L));
return searchContext;
}
Expand All @@ -1060,16 +1064,20 @@ private DefaultSearchContext createSearchContext(
reader.indexShard().shardId(),
request.getClusterAlias()
);
ExecutorService executor = this.enableSearchWorkerThreads ? threadPool.executor(Names.SEARCH_WORKER) : null;
int maximumNumberOfSlices = executor instanceof ThreadPoolExecutor tpe
&& supportsParallelCollection(resultsType, request.source()) ? tpe.getMaximumPoolSize() : 1;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whenever search workers are disabled through escape hatch, we don't set the executor.
Whenever the request does not support offloading sequential collection to the worker threads, we also don't set the executor.

Otherwise, we do set an executor, but we really parallelize only the DFS phase. The query phase still executes sequentially, because it does not support concurrency yet, but sequential execution is performed by the worker threads.

Copy link
Member Author

@javanna javanna Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update:

Whenever search workers are disabled through escape hatch, we don't set the executor.
Otherwise, we do set an executor, but we really parallelize only the DFS phase. The query phase still executes sequentially, because it does not support concurrency yet, but sequential execution is performed by the worker threads.

searchContext = new DefaultSearchContext(
reader,
request,
shardTarget,
threadPool::relativeTimeInMillis,
timeout,
minimumDocsPerSlice,
fetchPhase,
lowLevelCancellation,
this.enableConcurrentCollection && concurrentSearchEnabled(resultsType, request.source())
executor,
maximumNumberOfSlices,
minimumDocsPerSlice
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand All @@ -1089,10 +1097,16 @@ this.enableConcurrentCollection && concurrentSearchEnabled(resultsType, request.
return searchContext;
}

static boolean concurrentSearchEnabled(ResultsType resultsType, SearchSourceBuilder source) {
static boolean supportsParallelCollection(ResultsType resultsType, SearchSourceBuilder source) {
if (resultsType == ResultsType.DFS) {
return true; // only enable concurrent collection for DFS phase for now
}
/*
TODO uncomment this block to enable inter-segment concurrency for the query phase
if (resultsType == ResultsType.QUERY) {
return source == null || source.aggregations() == null || source.aggregations().supportsParallelCollection();
}
*/
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,35 +220,15 @@ public boolean isInSortOrderExecutionRequired() {
}

/**
* Return false if this aggregation or any of the child aggregations does not support concurrent search.
* As a result, such aggregation will always be executed sequentially despite concurrency is enabled for the query phase.
* Note: aggregations that don't support concurrency, may or may not support offloading their collection to the search worker threads,
* depending on what {@link #supportsOffloadingSequentialCollection()} returns.
* Return false if this aggregation or any of the child aggregations does not support parallel collection.
* As a result, a request including such aggregation is always executed sequentially despite concurrency is enabled for the query phase.
*/
public boolean supportsConcurrentExecution() {
public boolean supportsParallelCollection() {
if (isInSortOrderExecutionRequired()) {
return false;
}
for (AggregationBuilder builder : factoriesBuilder.getAggregatorFactories()) {
if (builder.supportsConcurrentExecution() == false) {
return false;
}
}
return supportsOffloadingSequentialCollection();
}

/**
* Returns false if this aggregation or any of its child aggregations does not support offloading its sequential collection
* to a separate thread. As a result, such aggregation will always be executed sequentially, and fully in the search thread,
* without offloading its collection to the search worker threads.
* Note: aggregations that don't support offloading sequential collection, don't support concurrency by definition.
*/
public boolean supportsOffloadingSequentialCollection() {
if (isInSortOrderExecutionRequired()) {
return false;
}
for (AggregationBuilder builder : factoriesBuilder.getAggregatorFactories()) {
if (builder.supportsOffloadingSequentialCollection() == false) {
if (builder.supportsParallelCollection() == false) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private static void executeInSortOrder(SearchContext context, BucketCollector co
searcher.setProfiler(context);
try {
searcher.search(context.rewrittenQuery(), collector);
collector.postCollection();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking that this is another exception to the offloading that we should remove. We should provide an executor to the time series index searcher, and offload its sequential execution to it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the same thing, when making that change.
However this isn't a normal searcher, but a TimeSeriesIndexSearcher and does searching completely different.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, I think though that because it's part of the query phase, it could potentially cause us to take in more heavy work than desired, say that we have many searches being offloaded and at the same time many others using this time series index searcher, we'd have many threads (potentially double compared to before adding the search workers thread pool) doing I/O and CPU bound stuff.

I don't think this is a blocker for this PR but we should look into it as a follow-up.

} catch (IOException e) {
throw new AggregationExecutionException("Could not perform time series aggregation", e);
}
Expand Down Expand Up @@ -113,7 +114,6 @@ public static void execute(SearchContext context) {
final List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
for (Aggregator aggregator : aggregators) {
try {
aggregator.postCollection();
aggregations.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,17 @@ public boolean isInSortOrderExecutionRequired() {
}

/**
* Return false if this aggregation or any of the child aggregations does not support concurrent search
* Return false if this aggregation or any of the child aggregations does not support parallel collection.
* As a result, a request including such aggregation is always executed sequentially despite concurrency is enabled for the query
* phase.
*/
public boolean supportsConcurrentExecution() {
public boolean supportsParallelCollection() {
javanna marked this conversation as resolved.
Show resolved Hide resolved
for (AggregationBuilder builder : aggregationBuilders) {
if (builder.supportsConcurrentExecution() == false) {
if (builder.supportsParallelCollection() == false) {
return false;
}
}
return isInSortOrderExecutionRequired() == false;
return true;
javanna marked this conversation as resolved.
Show resolved Hide resolved
}

public Builder addAggregator(AggregationBuilder factory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public final Collector asCollector() {
return new BucketCollectorWrapper(this);
}

private record BucketCollectorWrapper(BucketCollector bucketCollector) implements Collector {
public record BucketCollectorWrapper(BucketCollector bucketCollector) implements Collector {

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public boolean supportsSampling() {
}

@Override
public boolean supportsOffloadingSequentialCollection() {
public boolean supportsParallelCollection() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public BucketCardinality bucketCardinality() {
}

@Override
public boolean supportsConcurrentExecution() {
public boolean supportsParallelCollection() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public BucketCardinality bucketCardinality() {
}

@Override
public boolean supportsOffloadingSequentialCollection() {
public boolean supportsParallelCollection() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public boolean supportsSampling() {
}

@Override
public boolean supportsConcurrentExecution() {
public boolean supportsParallelCollection() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public boolean supportsSampling() {
}

@Override
public boolean supportsOffloadingSequentialCollection() {
public boolean supportsParallelCollection() {
javanna marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

Expand Down
Loading