Skip to content

Commit

Permalink
Avoid execute ESQL planning on refresh thread (elastic#104591)
Browse files Browse the repository at this point in the history
A recent report shows that we can perform ESQL planning on the refresh
thread pool after waiting for refreshes from search-idle shards. While
the planning process is generally lightweight, it may become expensive
at times. Therefore, we should fork off the refresh thread pool
immediately upon resuming ESQL execution. Another place where we should
fork off is after field_caps. I will look into that later.
  • Loading branch information
dnhatn committed Jan 25, 2024
1 parent 02a1255 commit 99d3c52
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 35 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/104591.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 104591
summary: Avoid execute ESQL planning on refresh thread
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardsGroup;
import org.elasticsearch.action.search.SearchShardsRequest;
Expand All @@ -24,7 +25,6 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.compute.OwningChannelActionListener;
import org.elasticsearch.compute.data.BlockFactory;
Expand Down Expand Up @@ -332,50 +332,55 @@ private void acquireSearchContexts(
Map<Index, AliasFilter> aliasFilters,
ActionListener<List<SearchContext>> listener
) {
final List<IndexShard> targetShards = new ArrayList<>();
try {
List<IndexShard> targetShards = new ArrayList<>();
for (ShardId shardId : shardIds) {
var indexShard = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
targetShards.add(indexShard);
}
if (targetShards.isEmpty()) {
listener.onResponse(List.of());
return;
} catch (Exception e) {
listener.onFailure(e);
return;
}
final var doAcquire = ActionRunnable.supply(listener, () -> {
final List<SearchContext> searchContexts = new ArrayList<>(targetShards.size());
boolean success = false;
try {
for (IndexShard shard : targetShards) {
var aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY);
var shardRequest = new ShardSearchRequest(shard.shardId(), configuration.absoluteStartedTimeInMillis(), aliasFilter);
SearchContext context = searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT);
searchContexts.add(context);
}
for (SearchContext searchContext : searchContexts) {
searchContext.preProcess();
}
success = true;
return searchContexts;
} finally {
if (success == false) {
IOUtils.close(searchContexts);
}
}
CountDown countDown = new CountDown(targetShards.size());
});
final AtomicBoolean waitedForRefreshes = new AtomicBoolean();
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
if (waitedForRefreshes.get()) {
esqlExecutor.execute(doAcquire);
} else {
doAcquire.run();
}
})) {
for (IndexShard targetShard : targetShards) {
targetShard.ensureShardSearchActive(ignored -> {
if (countDown.countDown()) {
ActionListener.completeWith(listener, () -> {
final List<SearchContext> searchContexts = new ArrayList<>(targetShards.size());
boolean success = false;
try {
for (IndexShard shard : targetShards) {
var aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY);
var shardRequest = new ShardSearchRequest(
shard.shardId(),
configuration.absoluteStartedTimeInMillis(),
aliasFilter
);
SearchContext context = searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT);
searchContexts.add(context);
}
for (SearchContext searchContext : searchContexts) {
searchContext.preProcess();
}
success = true;
return searchContexts;
} finally {
if (success == false) {
IOUtils.close(searchContexts);
}
}
});
final Releasable ref = refs.acquire();
targetShard.ensureShardSearchActive(await -> {
try (ref) {
if (await) {
waitedForRefreshes.set(true);
}
}
});
}
} catch (Exception e) {
listener.onFailure(e);
}
}

Expand Down Expand Up @@ -505,6 +510,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
final ActionListener<DataNodeResponse> listener = new OwningChannelActionListener<>(channel);
final EsqlConfiguration configuration = request.configuration();
acquireSearchContexts(request.shardIds(), configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
assert ThreadPool.assertCurrentThreadPool(ESQL_THREAD_POOL_NAME);
var computeContext = new ComputeContext(sessionId, searchContexts, configuration, null, exchangeSink);
runCompute(parentTask, computeContext, request.plan(), ActionListener.wrap(driverProfiles -> {
// don't return until all pages are fetched
Expand Down

0 comments on commit 99d3c52

Please sign in to comment.