From 88a1f775e8dcfe71746da0c4322d133235a900c6 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 3 Jul 2024 15:53:46 -0400 Subject: [PATCH 1/2] ESQL: Plumb a way to run phased plans INLINESTATS is going to run two ESQL commands - one to get the STATS and one to join the stats results to the output. This plumbs a way for `EsqlSession#execute` to run multiple dips into the compute engine using a `BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase`. For now, we just plug that right into the output to keep things working as they are now. But soon, so soon, we'll plug in a second phase. --- .../xpack/esql/execution/PlanExecutor.java | 8 ++- .../xpack/esql/plugin/ComputeService.java | 9 ++-- .../esql/plugin/TransportEsqlQueryAction.java | 49 ++++++++++--------- .../xpack/esql/session/EsqlSession.java | 14 +++++- .../xpack/esql/session/Result.java | 4 +- .../esql/stats/PlanExecutorMetricsTests.java | 12 +++-- 6 files changed, 60 insertions(+), 36 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java index df67f4609c33..4e07c3084ab7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java @@ -20,9 +20,12 @@ import org.elasticsearch.xpack.esql.session.EsqlConfiguration; import org.elasticsearch.xpack.esql.session.EsqlSession; import org.elasticsearch.xpack.esql.session.IndexResolver; +import org.elasticsearch.xpack.esql.session.Result; import org.elasticsearch.xpack.esql.stats.Metrics; import org.elasticsearch.xpack.esql.stats.QueryMetric; +import java.util.function.BiConsumer; + import static org.elasticsearch.action.ActionListener.wrap; public class PlanExecutor { @@ -48,7 +51,8 @@ public void esql( String sessionId, EsqlConfiguration cfg, EnrichPolicyResolver enrichPolicyResolver, - ActionListener listener 
+ BiConsumer> runPhase, + ActionListener listener ) { final var session = new EsqlSession( sessionId, @@ -63,7 +67,7 @@ public void esql( ); QueryMetric clientId = QueryMetric.fromString("rest"); metrics.total(clientId); - session.execute(request, wrap(listener::onResponse, ex -> { + session.execute(request, runPhase, wrap(listener::onResponse, ex -> { // TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request metrics.failed(clientId); listener.onFailure(ex); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 4ebc4af25813..e28c8e843464 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -72,6 +72,7 @@ import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.session.EsqlConfiguration; +import org.elasticsearch.xpack.esql.session.Result; import java.util.ArrayList; import java.util.Collections; @@ -89,8 +90,6 @@ * Computes the result of a {@link PhysicalPlan}. 
*/ public class ComputeService { - public record Result(List pages, List profiles) {} - private static final Logger LOGGER = LogManager.getLogger(ComputeService.class); private final SearchService searchService; private final BigArrays bigArrays; @@ -176,7 +175,7 @@ public void execute( rootTask, computeContext, coordinatorPlan, - listener.map(driverProfiles -> new Result(collectedPages, driverProfiles)) + listener.map(driverProfiles -> new Result(physicalPlan.output(), collectedPages, driverProfiles)) ); return; } else { @@ -201,7 +200,9 @@ public void execute( ); try ( Releasable ignored = exchangeSource.addEmptySink(); - RefCountingListener refs = new RefCountingListener(listener.map(unused -> new Result(collectedPages, collectedProfiles))) + RefCountingListener refs = new RefCountingListener( + listener.map(unused -> new Result(physicalPlan.output(), collectedPages, collectedProfiles)) + ) ) { // run compute on the coordinator exchangeSource.addCompletionListener(refs.acquire()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 9328992120c0..9a09324ae25e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -37,7 +37,9 @@ import org.elasticsearch.xpack.esql.enrich.EnrichLookupService; import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver; import org.elasticsearch.xpack.esql.execution.PlanExecutor; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.session.EsqlConfiguration; +import org.elasticsearch.xpack.esql.session.Result; import java.io.IOException; import java.time.ZoneOffset; @@ -45,6 +47,7 @@ import java.util.Locale; import java.util.Map; import 
java.util.concurrent.Executor; +import java.util.function.BiConsumer; import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; @@ -157,37 +160,37 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener> runPhase = (physicalPlan, resultListener) -> computeService.execute( + sessionId, + (CancellableTask) task, + physicalPlan, + configuration, + resultListener + ); + planExecutor.esql( request, sessionId, configuration, enrichPolicyResolver, - listener.delegateFailureAndWrap( - (delegate, physicalPlan) -> computeService.execute( - sessionId, - (CancellableTask) task, - physicalPlan, - configuration, - delegate.map(result -> { - List columns = physicalPlan.output() - .stream() - .map(c -> new ColumnInfoImpl(c.qualifiedName(), c.dataType().outputType())) - .toList(); - EsqlQueryResponse.Profile profile = configuration.profile() - ? new EsqlQueryResponse.Profile(result.profiles()) - : null; - if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) { - String id = asyncTask.getExecutionId().getEncoded(); - return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async()); - } else { - return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async()); - } - }) - ) - ) + runPhase, + listener.map(result -> toResponse(task, request, configuration, result)) ); } + private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, EsqlConfiguration configuration, Result result) { + List columns = result.layout() + .stream() + .map(c -> new ColumnInfoImpl(c.qualifiedName(), c.dataType().outputType())) + .toList(); + EsqlQueryResponse.Profile profile = configuration.profile() ? 
new EsqlQueryResponse.Profile(result.profiles()) : null; + if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) { + String id = asyncTask.getExecutionId().getEncoded(); + return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async()); + } + return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async()); + } + /** * Returns the ID for this compute session. The ID is unique within the cluster, and is used * to identify the compute-session across nodes. The ID is just the TaskID of the task that diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 3119b328e807..370de6bb2ce8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -58,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -110,10 +111,19 @@ public String sessionId() { return sessionId; } - public void execute(EsqlQueryRequest request, ActionListener listener) { + public void execute( + EsqlQueryRequest request, + BiConsumer> runPhase, + ActionListener listener + ) { LOGGER.debug("ESQL query:\n{}", request.query()); + LogicalPlan logicalPlan = parse(request.query(), request.params()); + logicalPlanToPhysicalPlan(logicalPlan, request, listener.delegateFailureAndWrap((l, r) -> runPhase.accept(r, l))); + } + + private void logicalPlanToPhysicalPlan(LogicalPlan logicalPlan, EsqlQueryRequest request, ActionListener listener) { optimizedPhysicalPlan( - parse(request.query(), request.params()), + logicalPlan, listener.map(plan -> 
EstimatesRowSize.estimateRowSize(0, plan.transformUp(FragmentExec.class, f -> { QueryBuilder filter = request.filter(); if (filter != null) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java index 7cbf3987af2c..1a6becadf7e4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java @@ -7,8 +7,10 @@ package org.elasticsearch.xpack.esql.session; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.xpack.esql.core.expression.Attribute; import java.util.List; -public record Result(List columns, List> values) {} +public record Result(List layout, List pages, List profiles) {} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java index 5883d41f3212..427c30311df0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.esql.execution.PlanExecutor; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.session.IndexResolver; +import org.elasticsearch.xpack.esql.session.Result; import org.elasticsearch.xpack.esql.type.EsqlDataTypeRegistry; import org.junit.After; import org.junit.Before; @@ -33,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning; import static org.hamcrest.Matchers.instanceOf; @@ -100,9 
+102,10 @@ public void testFailedMetric() { var request = new EsqlQueryRequest(); // test a failed query: xyz field doesn't exist request.query("from test | stats m = max(xyz)"); - planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, new ActionListener<>() { + BiConsumer> runPhase = (p, r) -> fail("this shouldn't happen"); + planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, runPhase, new ActionListener<>() { @Override - public void onResponse(PhysicalPlan physicalPlan) { + public void onResponse(Result result) { fail("this shouldn't happen"); } @@ -119,9 +122,10 @@ public void onFailure(Exception e) { // fix the failing query: foo field does exist request.query("from test | stats m = max(foo)"); - planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, new ActionListener<>() { + runPhase = (p, r) -> r.onResponse(null); + planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, runPhase, new ActionListener<>() { @Override - public void onResponse(PhysicalPlan physicalPlan) {} + public void onResponse(Result result) {} @Override public void onFailure(Exception e) { From 40a42361420d58b9c24647e21f6711814b91446f Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 5 Jul 2024 09:25:17 -0400 Subject: [PATCH 2/2] Rename --- .../esql/plugin/TransportEsqlQueryAction.java | 2 +- .../elasticsearch/xpack/esql/session/Result.java | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 9a09324ae25e..5a6812c96975 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -179,7 +179,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener columns = result.layout() + List columns = result.schema() .stream() .map(c -> new ColumnInfoImpl(c.qualifiedName(), c.dataType().outputType())) .toList(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java index 1a6becadf7e4..5abaa78f5419 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java @@ -7,10 +7,23 @@ package org.elasticsearch.xpack.esql.session; +import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan; import java.util.List; -public record Result(List layout, List pages, List profiles) {} +/** + * Results from running a chunk of ESQL. + * @param schema "Schema" of the {@link Attribute}s that are produced by the {@link LogicalPlan} + * that was run. Each {@link Page} contains a {@link Block} of values for each + * attribute in this list. + * @param pages Actual values produced by running the ESQL. + * @param profiles {@link DriverProfile}s from all drivers that ran to produce the output. These + * are quite cheap to build, so we build them for all ESQL runs, regardless of if + * users have asked for them. But we only include them in the results if users ask + * for them. + */ +public record Result(List schema, List pages, List profiles) {}