diff --git a/docs/changelog/104591.yaml b/docs/changelog/104591.yaml new file mode 100644 index 000000000000..0bd054385753 --- /dev/null +++ b/docs/changelog/104591.yaml @@ -0,0 +1,5 @@ +pr: 104591 +summary: Avoid execute ESQL planning on refresh thread +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 172fc0a3dc5c..307b17814e14 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchShardsGroup; @@ -22,7 +23,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; @@ -435,51 +435,60 @@ private void acquireSearchContexts( Map aliasFilters, ActionListener> listener ) { + final List targetShards = new ArrayList<>(); try { - List targetShards = new ArrayList<>(); for (ShardId shardId : shardIds) { var indexShard = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); targetShards.add(indexShard); } - if (targetShards.isEmpty()) { - listener.onResponse(List.of()); - return; + } catch (Exception e) { + listener.onFailure(e); + return; + } + final var doAcquire = ActionRunnable.supply(listener, () -> { + final List searchContexts = new ArrayList<>(targetShards.size()); + boolean success = false; + try { + for (IndexShard shard : targetShards) { + var aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY); + var shardRequest = new ShardSearchRequest( + shard.shardId(), + configuration.absoluteStartedTimeInMillis(), + aliasFilter, + clusterAlias + ); + SearchContext context = searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT); + searchContexts.add(context); + } + for (SearchContext searchContext : searchContexts) { + searchContext.preProcess(); + } + success = true; + return searchContexts; + } finally { + if (success == false) { + IOUtils.close(searchContexts); + } + } + }); + final AtomicBoolean waitedForRefreshes = new AtomicBoolean(); + try (RefCountingRunnable refs = new RefCountingRunnable(() -> { + if (waitedForRefreshes.get()) { + esqlExecutor.execute(doAcquire); + } else { + doAcquire.run(); } - CountDown countDown = new CountDown(targetShards.size()); + })) { for (IndexShard targetShard : targetShards) { - targetShard.ensureShardSearchActive(ignored -> { - if (countDown.countDown()) { - ActionListener.completeWith(listener, () -> { - final List searchContexts = new ArrayList<>(targetShards.size()); - boolean success = false; - try { - for (IndexShard shard : targetShards) { - var aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY); - var shardRequest = new ShardSearchRequest( - shard.shardId(), - configuration.absoluteStartedTimeInMillis(), - aliasFilter, - clusterAlias - ); - SearchContext context = searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT); - searchContexts.add(context); - } - for (SearchContext searchContext : searchContexts) { - searchContext.preProcess(); - } - success = true; - return searchContexts; - } finally { - if (success == false) { - IOUtils.close(searchContexts); - } - } - }); + final Releasable ref = refs.acquire(); + targetShard.ensureShardSearchActive(await -> { + try (ref) { + if (await) { + waitedForRefreshes.set(true); + } } }); } - } catch (Exception e) { - listener.onFailure(e); } } @@ -585,6 +594,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> { + assert ThreadPool.assertCurrentThreadPool(ESQL_THREAD_POOL_NAME); var computeContext = new ComputeContext(sessionId, clusterAlias, searchContexts, configuration, null, exchangeSink); runCompute(parentTask, computeContext, request.plan(), ActionListener.wrap(driverProfiles -> { // don't return until all pages are fetched