Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Plumb a way to run phased plans #110445

Merged
merged 3 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.session.EsqlSession;
import org.elasticsearch.xpack.esql.session.IndexResolver;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.esql.stats.Metrics;
import org.elasticsearch.xpack.esql.stats.QueryMetric;

import java.util.function.BiConsumer;

import static org.elasticsearch.action.ActionListener.wrap;

public class PlanExecutor {
Expand All @@ -48,7 +51,8 @@ public void esql(
String sessionId,
EsqlConfiguration cfg,
EnrichPolicyResolver enrichPolicyResolver,
ActionListener<PhysicalPlan> listener
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
ActionListener<Result> listener
) {
final var session = new EsqlSession(
sessionId,
Expand All @@ -63,7 +67,7 @@ public void esql(
);
QueryMetric clientId = QueryMetric.fromString("rest");
metrics.total(clientId);
session.execute(request, wrap(listener::onResponse, ex -> {
session.execute(request, runPhase, wrap(listener::onResponse, ex -> {
// TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request
metrics.failed(clientId);
listener.onFailure(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.session.Result;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -89,8 +90,6 @@
* Computes the result of a {@link PhysicalPlan}.
*/
public class ComputeService {
public record Result(List<Page> pages, List<DriverProfile> profiles) {}

private static final Logger LOGGER = LogManager.getLogger(ComputeService.class);
private final SearchService searchService;
private final BigArrays bigArrays;
Expand Down Expand Up @@ -176,7 +175,7 @@ public void execute(
rootTask,
computeContext,
coordinatorPlan,
listener.map(driverProfiles -> new Result(collectedPages, driverProfiles))
listener.map(driverProfiles -> new Result(physicalPlan.output(), collectedPages, driverProfiles))
);
return;
} else {
Expand All @@ -201,7 +200,9 @@ public void execute(
);
try (
Releasable ignored = exchangeSource.addEmptySink();
RefCountingListener refs = new RefCountingListener(listener.map(unused -> new Result(collectedPages, collectedProfiles)))
RefCountingListener refs = new RefCountingListener(
listener.map(unused -> new Result(physicalPlan.output(), collectedPages, collectedProfiles))
)
) {
// run compute on the coordinator
exchangeSource.addCompletionListener(refs.acquire());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.session.Result;

import java.io.IOException;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

Expand Down Expand Up @@ -157,37 +160,37 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
request.tables()
);
String sessionId = sessionID(task);
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase = (physicalPlan, resultListener) -> computeService.execute(
sessionId,
(CancellableTask) task,
physicalPlan,
configuration,
resultListener
);

planExecutor.esql(
request,
sessionId,
configuration,
enrichPolicyResolver,
listener.delegateFailureAndWrap(
(delegate, physicalPlan) -> computeService.execute(
sessionId,
(CancellableTask) task,
physicalPlan,
configuration,
delegate.map(result -> {
List<ColumnInfoImpl> columns = physicalPlan.output()
.stream()
.map(c -> new ColumnInfoImpl(c.qualifiedName(), c.dataType().outputType()))
.toList();
EsqlQueryResponse.Profile profile = configuration.profile()
? new EsqlQueryResponse.Profile(result.profiles())
: null;
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
String id = asyncTask.getExecutionId().getEncoded();
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async());
} else {
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
}
})
)
)
runPhase,
listener.map(result -> toResponse(task, request, configuration, result))
);
}

private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, EsqlConfiguration configuration, Result result) {
List<ColumnInfoImpl> columns = result.layout()
.stream()
.map(c -> new ColumnInfoImpl(c.qualifiedName(), c.dataType().outputType()))
.toList();
EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
String id = asyncTask.getExecutionId().getEncoded();
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async());
}
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
}

/**
* Returns the ID for this compute session. The ID is unique within the cluster, and is used
* to identify the compute-session across nodes. The ID is just the TaskID of the task that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -110,10 +111,19 @@ public String sessionId() {
return sessionId;
}

public void execute(EsqlQueryRequest request, ActionListener<PhysicalPlan> listener) {
public void execute(
EsqlQueryRequest request,
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
ActionListener<Result> listener
) {
LOGGER.debug("ESQL query:\n{}", request.query());
LogicalPlan logicalPlan = parse(request.query(), request.params());
logicalPlanToPhysicalPlan(logicalPlan, request, listener.delegateFailureAndWrap((l, r) -> runPhase.accept(r, l)));
}

private void logicalPlanToPhysicalPlan(LogicalPlan logicalPlan, EsqlQueryRequest request, ActionListener<PhysicalPlan> listener) {
optimizedPhysicalPlan(
parse(request.query(), request.params()),
logicalPlan,
listener.map(plan -> EstimatesRowSize.estimateRowSize(0, plan.transformUp(FragmentExec.class, f -> {
QueryBuilder filter = request.filter();
if (filter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.xpack.esql.core.expression.Attribute;

import java.util.List;

public record Result(List<Attribute> columns, List<List<Object>> values) {}
public record Result(List<Attribute> layout, List<Page> pages, List<DriverProfile> profiles) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: there's also a datatype Layout which is not quite just a list of attributes; we sometimes also use fields (schema would also be appropriate but isn't really used in our code base)

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.IndexResolver;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.esql.type.EsqlDataTypeRegistry;
import org.junit.After;
import org.junit.Before;
Expand All @@ -33,6 +34,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -100,9 +102,10 @@ public void testFailedMetric() {
var request = new EsqlQueryRequest();
// test a failed query: xyz field doesn't exist
request.query("from test | stats m = max(xyz)");
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, new ActionListener<>() {
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase = (p, r) -> fail("this shouldn't happen");
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, runPhase, new ActionListener<>() {
@Override
public void onResponse(PhysicalPlan physicalPlan) {
public void onResponse(Result result) {
fail("this shouldn't happen");
}

Expand All @@ -119,9 +122,10 @@ public void onFailure(Exception e) {

// fix the failing query: foo field does exist
request.query("from test | stats m = max(foo)");
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, new ActionListener<>() {
runPhase = (p, r) -> r.onResponse(null);
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, runPhase, new ActionListener<>() {
@Override
public void onResponse(PhysicalPlan physicalPlan) {}
public void onResponse(Result result) {}

@Override
public void onFailure(Exception e) {
Expand Down