From b5c6c2da30249c0e504a59fa3944693a050f4884 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 24 Jul 2024 17:16:37 -0400 Subject: [PATCH] ESQL: INLINESTATS (#109583) This implements `INLINESTATS`. Most of the heavy lifting is done by `LOOKUP`, with this change mostly adding a new abstraction to logical plans, and interface I'm calling `Phased`. Implementing this interface allows a logical plan node to cut the query into phases. `INLINESTATS` implements it by asking for a "first phase" that's the same query, up to `INLINESTATS`, but with `INLINESTATS` replaced with `STATS`. The next phase replaces the `INLINESTATS` with a `LOOKUP` on the results of the first phase. So, this query: ``` FROM foo | EVAL bar = a * b | INLINESTATS m = MAX(bar) BY b | WHERE m = bar | LIMIT 1 ``` gets split into ``` FROM foo | EVAL bar = a * b | STATS m = MAX(bar) BY b ``` followed by ``` FROM foo | EVAL bar = a * b | LOOKUP (results of m = MAX(bar) BY b) ON b | WHERE m = bar | LIMIT 1 ``` --- docs/changelog/109583.yaml | 29 + docs/reference/esql/esql-commands.asciidoc | 6 + .../processing-commands/inlinestats.asciidoc | 102 ++++ .../esql/processing-commands/lookup.asciidoc | 2 +- .../aggregation/table/RowInTableLookup.java | 3 + .../operator/RowInTableLookupOperator.java | 9 + .../xpack/esql/ccq/MultiClusterSpecIT.java | 1 + .../xpack/esql/qa/single_node/RestEsqlIT.java | 270 +++++++++- .../xpack/esql/qa/rest/RestEsqlTestCase.java | 12 +- .../src/main/resources/inlinestats.csv-spec | 503 ++++++++++++++++++ .../src/main/resources/union_types.csv-spec | 61 +++ .../xpack/esql/action/EsqlCapabilities.java | 5 + .../xpack/esql/analysis/Analyzer.java | 20 +- .../xpack/esql/io/stream/PlanNamedTypes.java | 2 + .../esql/optimizer/PhysicalPlanOptimizer.java | 4 +- .../xpack/esql/package-info.java | 2 + .../xpack/esql/parser/LogicalPlanBuilder.java | 4 + .../xpack/esql/plan/logical/Aggregate.java | 10 +- .../xpack/esql/plan/logical/InlineStats.java | 182 ++++++- .../xpack/esql/plan/logical/Phased.java | 135 +++++ .../xpack/esql/plan/logical/Stats.java | 39 ++ .../xpack/esql/plugin/EsqlPlugin.java | 2 + .../xpack/esql/session/EsqlSession.java | 45 +- .../xpack/esql/SerializationTestUtils.java | 2 + .../esql/io/stream/PlanNamedTypesTests.java | 2 + .../esql/parser/StatementParserTests.java | 2 + .../xpack/esql/plan/logical/PhasedTests.java | 146 +++++ .../esql/tree/EsqlNodeSubclassTests.java | 5 +- 28 files changed, 1569 insertions(+), 36 deletions(-) create mode 100644 docs/changelog/109583.yaml create mode 100644 docs/reference/esql/processing-commands/inlinestats.asciidoc create mode 100644 x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java create mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java diff --git a/docs/changelog/109583.yaml b/docs/changelog/109583.yaml new file mode 100644 index 0000000000000..84757e307b4fb --- /dev/null +++ b/docs/changelog/109583.yaml @@ -0,0 +1,29 @@ +pr: 109583 +summary: "ESQL: INLINESTATS" +area: ES|QL +type: feature +issues: + - 107589 +highlight: + title: "ESQL: INLINESTATS" + body: |- + This adds the `INLINESTATS` command to ESQL which performs a STATS and + then enriches the results into the output stream. 
So, this query: + + [source,esql] + ---- + FROM test + | INLINESTATS m=MAX(a * b) BY b + | WHERE m == a * b + | SORT a DESC, b DESC + | LIMIT 3 + ---- + + Produces output like: + + | a | b | m | + | --- | --- | ----- | + | 99 | 999 | 98901 | + | 99 | 998 | 98802 | + | 99 | 997 | 98703 | + notable: true diff --git a/docs/reference/esql/esql-commands.asciidoc b/docs/reference/esql/esql-commands.asciidoc index bed79299b1cc1..235113ac1394a 100644 --- a/docs/reference/esql/esql-commands.asciidoc +++ b/docs/reference/esql/esql-commands.asciidoc @@ -37,6 +37,9 @@ image::images/esql/processing-command.svg[A processing command changing an input * <> * <> * <> +ifeval::["{release-state}"=="unreleased"] +* experimental:[] <> +endif::[] * <> * <> ifeval::["{release-state}"=="unreleased"] @@ -59,6 +62,9 @@ include::processing-commands/drop.asciidoc[] include::processing-commands/enrich.asciidoc[] include::processing-commands/eval.asciidoc[] include::processing-commands/grok.asciidoc[] +ifeval::["{release-state}"=="unreleased"] +include::processing-commands/inlinestats.asciidoc[] +endif::[] include::processing-commands/keep.asciidoc[] include::processing-commands/limit.asciidoc[] ifeval::["{release-state}"=="unreleased"] diff --git a/docs/reference/esql/processing-commands/inlinestats.asciidoc b/docs/reference/esql/processing-commands/inlinestats.asciidoc new file mode 100644 index 0000000000000..0b8d38c7b280f --- /dev/null +++ b/docs/reference/esql/processing-commands/inlinestats.asciidoc @@ -0,0 +1,102 @@ +[discrete] +[[esql-inlinestats-by]] +=== `INLINESTATS ... BY` + +experimental::["INLINESTATS is highly experimental and only available in SNAPSHOT versions."] + +The `INLINESTATS` command calculates an aggregate result and adds new columns +with the result to the stream of input data. + +**Syntax** + +[source,esql] +---- +INLINESTATS [column1 =] expression1[, ..., [columnN =] expressionN] +[BY grouping_expression1[, ..., grouping_expressionN]] +---- + +*Parameters* + +`columnX`:: +The name by which the aggregated value is returned. If omitted, the name is +equal to the corresponding expression (`expressionX`). If multiple columns +have the same name, all but the rightmost column with this name will be ignored. + +`expressionX`:: +An expression that computes an aggregated value. If its name coincides with one +of the computed columns, that column will be ignored. + +`grouping_expressionX`:: +An expression that outputs the values to group by. + +NOTE: Individual `null` values are skipped when computing aggregations. + +*Description* + +The `INLINESTATS` command calculates an aggregate result and merges that result +back into the stream of input data. Without the optional `BY` clause this will +produce a single result which is appended to each row. With a `BY` clause this +will produce one result per grouping and merge the result into the stream based on +matching group keys. + +All of the <> are supported. 
+ +*Examples* + +Find the employees that speak the most languages (it's a tie!): + +[source.merge.styled,esql] +---- +include::{esql-specs}/inlinestats.csv-spec[tag=max-languages] +---- +[%header.monospaced.styled,format=dsv,separator=|] +|=== +include::{esql-specs}/inlinestats.csv-spec[tag=max-languages-result] +|=== + +Find the longest tenured employee who's last name starts with each letter of the alphabet: + +[source.merge.styled,esql] +---- +include::{esql-specs}/inlinestats.csv-spec[tag=longest-tenured-by-first] +---- +[%header.monospaced.styled,format=dsv,separator=|] +|=== +include::{esql-specs}/inlinestats.csv-spec[tag=longest-tenured-by-first-result] +|=== + +Find the northern and southern most airports: + +[source.merge.styled,esql] +---- +include::{esql-specs}/inlinestats.csv-spec[tag=extreme-airports] +---- +[%header.monospaced.styled,format=dsv,separator=|] +|=== +include::{esql-specs}/inlinestats.csv-spec[tag=extreme-airports-result] +|=== + +NOTE: Our test data doesn't have many "small" airports. + +If a `BY` field is multivalued then `INLINESTATS` will put the row in *each* +bucket like <>: + +[source.merge.styled,esql] +---- +include::{esql-specs}/inlinestats.csv-spec[tag=mv-group] +---- +[%header.monospaced.styled,format=dsv,separator=|] +|=== +include::{esql-specs}/inlinestats.csv-spec[tag=mv-group-result] +|=== + +To treat each group key as its own row use <> before `INLINESTATS`: + +[source.merge.styled,esql] +---- +include::{esql-specs}/inlinestats.csv-spec[tag=mv-expand] +---- +[%header.monospaced.styled,format=dsv,separator=|] +|=== +include::{esql-specs}/inlinestats.csv-spec[tag=mv-expand-result] +|=== diff --git a/docs/reference/esql/processing-commands/lookup.asciidoc b/docs/reference/esql/processing-commands/lookup.asciidoc index 7bb3a5791deef..ca456d8e70eed 100644 --- a/docs/reference/esql/processing-commands/lookup.asciidoc +++ b/docs/reference/esql/processing-commands/lookup.asciidoc @@ -2,7 +2,7 @@ [[esql-lookup]] === `LOOKUP` -experimental::["LOOKUP is a highly experimental and only available in SNAPSHOT versions."] +experimental::["LOOKUP is highly experimental and only available in SNAPSHOT versions."] `LOOKUP` matches values from the input against a `table` provided in the request, adding the other fields from the `table` to the output. 
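The commit message above describes the new `Phased` abstraction only in prose, so here is a minimal, self-contained sketch of the control flow it implies: ask the phased node for its first-phase plan, run it, hand the results back, and run the rewritten plan. This is illustrative Java only; `PlanNode`, `PhasedNode`, and `PhasedRunner` are hypothetical stand-ins, not ES|QL classes. The real `Phased` interface added later in this patch works on `LogicalPlan` nodes, exchanging results as a schema (`List<Attribute>`) plus pages (`List<Page>`), with the loop driven from `EsqlSession`.

[source,java]
----
import java.util.List;

// Toy model of the two-phase execution described in the commit message.
// All names here are illustrative stand-ins, not the real ES|QL types.
interface PlanNode {
    List<List<Object>> execute();                        // stand-in for running a plan
}

interface PhasedNode extends PlanNode {
    PlanNode firstPhase();                               // e.g. INLINESTATS -> the STATS half
    PlanNode nextPhase(List<List<Object>> firstResult);  // e.g. INLINESTATS -> LOOKUP/join on the results
}

final class PhasedRunner {
    private PhasedRunner() {}

    // Keep splitting and re-running until no phased node remains. The real
    // implementation walks the whole plan tree; this sketch only handles a
    // phased node at the root, which is enough to show the shape of the loop.
    static List<List<Object>> run(PlanNode plan) {
        while (plan instanceof PhasedNode phased) {
            List<List<Object>> firstResult = run(phased.firstPhase()); // phases may themselves be phased
            plan = phased.nextPhase(firstResult);
        }
        return plan.execute();
    }
}
----

The recursion plus the loop is only there to show that phases can chain, which is roughly what happens when a query contains more than one `INLINESTATS` (see the `two` test case in `inlinestats.csv-spec` below).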
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/table/RowInTableLookup.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/table/RowInTableLookup.java index 4fa582e761e18..8bc9bacf8ff21 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/table/RowInTableLookup.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/table/RowInTableLookup.java @@ -37,6 +37,9 @@ public abstract sealed class RowInTableLookup implements Releasable permits Empt public abstract String toString(); public static RowInTableLookup build(BlockFactory blockFactory, Block[] keys) { + if (keys.length < 1) { + throw new IllegalArgumentException("expected [keys] to be non-empty"); + } int positions = keys[0].getPositionCount(); for (int k = 0; k < keys.length; k++) { if (positions != keys[k].getPositionCount()) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RowInTableLookupOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RowInTableLookupOperator.java index 908c973fcad65..0decdc243c4a8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RowInTableLookupOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RowInTableLookupOperator.java @@ -40,6 +40,12 @@ public String toString() { * are never closed, so we need to build them from a non-tracking factory. */ public record Factory(Key[] keys, int[] blockMapping) implements Operator.OperatorFactory { + public Factory { + if (keys.length < 1) { + throw new IllegalArgumentException("expected [keys] to be non-empty"); + } + } + @Override public Operator get(DriverContext driverContext) { return new RowInTableLookupOperator(driverContext.blockFactory(), keys, blockMapping); @@ -56,6 +62,9 @@ public String describe() { private final int[] blockMapping; public RowInTableLookupOperator(BlockFactory blockFactory, Key[] keys, int[] blockMapping) { + if (keys.length < 1) { + throw new IllegalArgumentException("expected [keys] to be non-empty"); + } this.blockMapping = blockMapping; this.keys = new ArrayList<>(keys.length); Block[] blocks = new Block[keys.length]; diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 6143c2b4148f7..ece80c15e87e5 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -110,6 +110,7 @@ protected void shouldSkipTest(String testName) throws IOException { "Test " + testName + " is skipped on " + Clusters.oldVersion(), isEnabled(testName, instructions, Clusters.oldVersion()) ); + assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats")); } private TestFeatureService remoteFeaturesService() throws IOException { diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 
bf54dcbfa96f6..af872715c2fea 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -10,15 +10,19 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; import org.apache.http.util.EntityUtils; +import org.apache.lucene.search.DocIdSetIterator; import org.elasticsearch.Build; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ListMatcher; +import org.elasticsearch.test.MapMatcher; import org.elasticsearch.test.TestClustersThreadFilter; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.LogType; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; import org.hamcrest.Matchers; import org.junit.Assert; @@ -27,14 +31,23 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; +import static org.elasticsearch.test.ListMatcher.matchesList; +import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.Is.is; @ThreadLeakFilters(filters = TestClustersThreadFilter.class) @@ -49,7 +62,7 @@ protected String getTestRestCluster() { return cluster.getHttpAddresses(); } - @ParametersFactory + @ParametersFactory(argumentFormatting = "%1s") public static List modes() { return Arrays.stream(Mode.values()).map(m -> new Object[] { m }).toList(); } @@ -59,19 +72,7 @@ public RestEsqlIT(Mode mode) { } public void testBasicEsql() throws IOException { - StringBuilder b = new StringBuilder(); - for (int i = 0; i < 1000; i++) { - b.append(String.format(Locale.ROOT, """ - {"create":{"_index":"%s"}} - {"@timestamp":"2020-12-12","test":"value%s","value":%d} - """, testIndexName(), i, i)); - } - Request bulk = new Request("POST", "/_bulk"); - bulk.addParameter("refresh", "true"); - bulk.addParameter("filter_path", "errors"); - bulk.setJsonEntity(b.toString()); - Response response = client().performRequest(bulk); - Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + indexTestData(); RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | stats avg(value)"); if (Build.current().isSnapshot()) { @@ -273,6 +274,247 @@ public void testTableDuplicateNames() throws IOException { assertThat(re.getMessage(), containsString("[6:10] Duplicate field 'a'")); } + /** + * INLINESTATS can group on {@code NOW()}. It's a little silly, but + * doing something like {@code DATE_TRUNC(1 YEAR, NOW() - 1970-01-01T00:00:00Z)} is + * much more sensible. 
But just grouping on {@code NOW()} is enough to test this.
+ * <p>
+ * This works because {@code NOW()} locks its value at the start of the entire
+ * query. It's part of the "configuration" of the query.
+ * </p>
+ */ + public void testInlineStatsNow() throws IOException { + indexTestData(); + + RequestObjectBuilder builder = requestObjectBuilder().query( + fromIndex() + " | EVAL now=NOW() | INLINESTATS AVG(value) BY now | SORT value ASC" + ); + Map result = runEsql(builder); + ListMatcher values = matchesList(); + for (int i = 0; i < 1000; i++) { + values = values.item( + matchesList().item("2020-12-12T00:00:00.000Z") + .item("value" + i) + .item("value" + i) + .item(i) + .item(any(String.class)) + .item(499.5) + ); + } + assertMap( + result, + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date")) + .item(matchesMap().entry("name", "test").entry("type", "text")) + .item(matchesMap().entry("name", "test.keyword").entry("type", "keyword")) + .item(matchesMap().entry("name", "value").entry("type", "long")) + .item(matchesMap().entry("name", "now").entry("type", "date")) + .item(matchesMap().entry("name", "AVG(value)").entry("type", "double")) + ).entry("values", values) + ); + } + + public void testProfile() throws IOException { + indexTestData(); + + RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | STATS AVG(value)"); + builder.profile(true); + if (Build.current().isSnapshot()) { + // Lock to shard level partitioning, so we get consistent profile output + builder.pragmas(Settings.builder().put("data_partitioning", "shard").build()); + } + Map result = runEsql(builder); + assertMap( + result, + matchesMap().entry("columns", matchesList().item(matchesMap().entry("name", "AVG(value)").entry("type", "double"))) + .entry("values", List.of(List.of(499.5d))) + .entry("profile", matchesMap().entry("drivers", instanceOf(List.class))) + ); + + MapMatcher commonProfile = matchesMap().entry("iterations", greaterThan(0)) + .entry("cpu_nanos", greaterThan(0)) + .entry("took_nanos", greaterThan(0)) + .entry("operators", instanceOf(List.class)); + List> signatures = new ArrayList<>(); + @SuppressWarnings("unchecked") + List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); + for (Map p : profiles) { + assertThat(p, commonProfile); + List sig = new ArrayList<>(); + @SuppressWarnings("unchecked") + List> operators = (List>) p.get("operators"); + for (Map o : operators) { + sig.add(checkOperatorProfile(o)); + } + signatures.add(sig); + } + assertThat( + signatures, + containsInAnyOrder( + matchesList().item("LuceneSourceOperator") + .item("ValuesSourceReaderOperator") + .item("AggregationOperator") + .item("ExchangeSinkOperator"), + matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"), + matchesList().item("ExchangeSourceOperator") + .item("AggregationOperator") + .item("ProjectOperator") + .item("LimitOperator") + .item("EvalOperator") + .item("ProjectOperator") + .item("OutputOperator") + ) + ); + } + + public void testInlineStatsProfile() throws IOException { + indexTestData(); + + RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | INLINESTATS AVG(value) | SORT value ASC"); + builder.profile(true); + if (Build.current().isSnapshot()) { + // Lock to shard level partitioning, so we get consistent profile output + builder.pragmas(Settings.builder().put("data_partitioning", "shard").build()); + } + Map result = runEsql(builder); + ListMatcher values = matchesList(); + for (int i = 0; i < 1000; i++) { + values = values.item(matchesList().item("2020-12-12T00:00:00.000Z").item("value" + i).item("value" + i).item(i).item(499.5)); + } + assertMap( + result, + 
matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date")) + .item(matchesMap().entry("name", "test").entry("type", "text")) + .item(matchesMap().entry("name", "test.keyword").entry("type", "keyword")) + .item(matchesMap().entry("name", "value").entry("type", "long")) + .item(matchesMap().entry("name", "AVG(value)").entry("type", "double")) + ).entry("values", values).entry("profile", matchesMap().entry("drivers", instanceOf(List.class))) + ); + + MapMatcher commonProfile = matchesMap().entry("iterations", greaterThan(0)) + .entry("cpu_nanos", greaterThan(0)) + .entry("took_nanos", greaterThan(0)) + .entry("operators", instanceOf(List.class)); + List> signatures = new ArrayList<>(); + @SuppressWarnings("unchecked") + List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); + for (Map p : profiles) { + assertThat(p, commonProfile); + List sig = new ArrayList<>(); + @SuppressWarnings("unchecked") + List> operators = (List>) p.get("operators"); + for (Map o : operators) { + sig.add(checkOperatorProfile(o)); + } + signatures.add(sig); + } + assertThat( + signatures, + containsInAnyOrder( + // First pass read and start agg + matchesList().item("LuceneSourceOperator") + .item("ValuesSourceReaderOperator") + .item("AggregationOperator") + .item("ExchangeSinkOperator"), + // First pass node level reduce + matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"), + // First pass finish agg + matchesList().item("ExchangeSourceOperator") + .item("AggregationOperator") + .item("ProjectOperator") + .item("EvalOperator") + .item("ProjectOperator") + .item("OutputOperator"), + // Second pass read and join via eval + matchesList().item("LuceneSourceOperator") + .item("EvalOperator") + .item("ValuesSourceReaderOperator") + .item("TopNOperator") + .item("ValuesSourceReaderOperator") + .item("ProjectOperator") + .item("ExchangeSinkOperator"), + // Second pass node level reduce + matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"), + // Second pass finish + matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("OutputOperator") + ) + ); + } + + private String checkOperatorProfile(Map o) { + String name = (String) o.get("operator"); + name = name.replaceAll("\\[.+", ""); + MapMatcher status = switch (name) { + case "LuceneSourceOperator" -> matchesMap().entry("processed_slices", greaterThan(0)) + .entry("processed_shards", List.of(testIndexName() + ":0")) + .entry("total_slices", greaterThan(0)) + .entry("slice_index", 0) + .entry("slice_max", 0) + .entry("slice_min", 0) + .entry("current", DocIdSetIterator.NO_MORE_DOCS) + .entry("pages_emitted", greaterThan(0)) + .entry("processing_nanos", greaterThan(0)) + .entry("processed_queries", List.of("*:*")); + case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk()); + case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)).entry("aggregation_nanos", greaterThan(0)); + case "ExchangeSinkOperator" -> matchesMap().entry("pages_accepted", greaterThan(0)); + case "ExchangeSourceOperator" -> matchesMap().entry("pages_emitted", greaterThan(0)).entry("pages_waiting", 0); + case "ProjectOperator", "EvalOperator" -> basicProfile(); + case "LimitOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) + .entry("limit", 1000) + .entry("limit_remaining", 999); + case "OutputOperator" -> null; + case "TopNOperator" -> matchesMap().entry("occupied_rows", 0) + 
.entry("ram_used", instanceOf(String.class)) + .entry("ram_bytes_used", greaterThan(0)); + default -> throw new AssertionError("unexpected status: " + o); + }; + MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name)); + if (status != null) { + expectedOp = expectedOp.entry("status", status); + } + assertMap(o, expectedOp); + return name; + } + + private MapMatcher basicProfile() { + return matchesMap().entry("pages_processed", greaterThan(0)).entry("process_nanos", greaterThan(0)); + } + + private void indexTestData() throws IOException { + Request createIndex = new Request("PUT", testIndexName()); + createIndex.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1 + } + } + }"""); + Response response = client().performRequest(createIndex); + assertThat( + entityToMap(response.getEntity(), XContentType.JSON), + matchesMap().entry("shards_acknowledged", true).entry("index", testIndexName()).entry("acknowledged", true) + ); + + StringBuilder b = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + b.append(String.format(Locale.ROOT, """ + {"create":{"_index":"%s"}} + {"@timestamp":"2020-12-12","test":"value%s","value":%d} + """, testIndexName(), i, i)); + } + Request bulk = new Request("POST", "/_bulk"); + bulk.addParameter("refresh", "true"); + bulk.addParameter("filter_path", "errors"); + bulk.setJsonEntity(b.toString()); + response = client().performRequest(bulk); + Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + } + private void assertException(String query, String... errorMessages) throws IOException { ResponseException re = expectThrows(ResponseException.class, () -> runEsqlSync(requestObjectBuilder().query(query))); assertThat(re.getResponse().getStatusLine().getStatusCode(), equalTo(400)); diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index f3f172391f5ab..82b7459066586 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -119,6 +119,8 @@ public static class RequestObjectBuilder { private Boolean keepOnCompletion = null; + private Boolean profile = null; + public RequestObjectBuilder() throws IOException { this(randomFrom(XContentType.values())); } @@ -180,6 +182,11 @@ public RequestObjectBuilder pragmas(Settings pragmas) throws IOException { return this; } + public RequestObjectBuilder profile(boolean profile) { + this.profile = profile; + return this; + } + public RequestObjectBuilder build() throws IOException { if (isBuilt == false) { if (tables != null) { @@ -195,6 +202,9 @@ public RequestObjectBuilder build() throws IOException { } builder.endObject(); } + if (profile != null) { + builder.field("profile", profile); + } builder.endObject(); isBuilt = true; } @@ -756,7 +766,7 @@ static Map removeAsyncProperties(Map map) { return Collections.unmodifiableMap(copy); } - static Map entityToMap(HttpEntity entity, XContentType expectedContentType) throws IOException { + protected static Map entityToMap(HttpEntity entity, XContentType expectedContentType) throws IOException { try (InputStream content = entity.getContent()) { XContentType xContentType = XContentType.fromMediaType(entity.getContentType().getValue()); 
assertEquals(expectedContentType, xContentType); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec new file mode 100644 index 0000000000000..90d5bbd514c81 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec @@ -0,0 +1,503 @@ +maxOfInt +required_capability: inlinestats + +// tag::max-languages[] +FROM employees +| KEEP emp_no, languages +| INLINESTATS max_lang = MAX(languages) +| WHERE max_lang == languages +| SORT emp_no ASC +| LIMIT 5 +// end::max-languages[] +; + +// tag::max-languages-result[] +emp_no:integer | languages:integer | max_lang:integer + 10002 | 5 | 5 + 10004 | 5 | 5 + 10011 | 5 | 5 + 10012 | 5 | 5 + 10014 | 5 | 5 +// end::max-languages-result[] +; + +maxOfIntByKeyword +required_capability: inlinestats + +FROM employees +| KEEP emp_no, languages, gender +| INLINESTATS max_lang = MAX(languages) BY gender +| WHERE max_lang == languages +| SORT emp_no ASC +| LIMIT 5; + +emp_no:integer | languages:integer | gender:keyword | max_lang:integer + 10002 | 5 | F | 5 + 10004 | 5 | M | 5 + 10011 | 5 | null | 5 + 10012 | 5 | null | 5 + 10014 | 5 | null | 5 +; + +maxOfLongByKeyword +required_capability: inlinestats + +FROM employees +| KEEP emp_no, avg_worked_seconds, gender +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY gender +| WHERE max_avg_worked_seconds == avg_worked_seconds +| SORT emp_no ASC; + +emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_seconds:long + 10007 | 393084805 | F | 393084805 + 10015 | 390266432 | null | 390266432 + 10030 | 394597613 | M | 394597613 +; + +maxOfLong +required_capability: inlinestats + +FROM employees +| KEEP emp_no, avg_worked_seconds, gender +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) +| WHERE max_avg_worked_seconds == avg_worked_seconds +| SORT emp_no ASC; + +emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_seconds:long + 10030 | 394597613 | M | 394597613 +; + +// TODO allow inline calculation like BY l = SUBSTRING( +maxOfLongByCalculatedKeyword +required_capability: inlinestats + +// tag::longest-tenured-by-first[] +FROM employees +| EVAL l = SUBSTRING(last_name, 0, 1) +| KEEP emp_no, avg_worked_seconds, l +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l +| WHERE max_avg_worked_seconds == avg_worked_seconds +| SORT l ASC +| LIMIT 5 +// end::longest-tenured-by-first[] +; + +// tag::longest-tenured-by-first-result[] +emp_no:integer | avg_worked_seconds:long | l:keyword | max_avg_worked_seconds:long + 10065 | 372660279 | A | 372660279 + 10074 | 382397583 | B | 382397583 + 10044 | 387408356 | C | 387408356 + 10030 | 394597613 | D | 394597613 + 10087 | 305782871 | E | 305782871 +// end::longest-tenured-by-first-result[] +; + +maxOfLongByInt +required_capability: inlinestats + +FROM employees +| KEEP emp_no, avg_worked_seconds, languages +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY languages +| WHERE max_avg_worked_seconds == avg_worked_seconds +| SORT languages ASC; + +emp_no:integer | avg_worked_seconds:long | languages:integer | max_avg_worked_seconds:long + 10044 | 387408356 | 1 | 387408356 + 10099 | 377713748 | 2 | 377713748 + 10030 | 394597613 | 3 | 394597613 + 10007 | 393084805 | 4 | 393084805 + 10015 | 390266432 | 5 | 390266432 + 10027 | 374037782 | null | 374037782 +; + +maxOfLongByIntDouble +required_capability: inlinestats + +FROM employees +| KEEP 
emp_no, avg_worked_seconds, languages, height +| EVAL height=ROUND(height, 1) +| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY languages, height +| WHERE max_avg_worked_seconds == avg_worked_seconds +| SORT languages, height ASC +| LIMIT 4; + +emp_no:integer | avg_worked_seconds:long | languages:integer | height:double | max_avg_worked_seconds:long + 10083 | 331236443 | 1 | 1.4 | 331236443 + 10084 | 359067056 | 1 | 1.5 | 359067056 + 10033 | 208374744 | 1 | 1.6 | 208374744 + 10086 | 328580163 | 1 | 1.7 | 328580163 +; + + +two +required_capability: inlinestats + +FROM employees +| KEEP emp_no, languages, avg_worked_seconds, gender +| INLINESTATS avg_avg_worked_seconds = AVG(avg_worked_seconds) BY languages +| WHERE avg_worked_seconds > avg_avg_worked_seconds +| INLINESTATS max_languages = MAX(languages) BY gender +| SORT emp_no ASC +| LIMIT 3; + +emp_no:integer | languages:integer | avg_worked_seconds:long | gender:keyword | avg_avg_worked_seconds:double | max_languages:integer + 10002 | 5 | 328922887 | F | 3.133013149047619E8 | 5 + 10006 | 3 | 372957040 | F | 2.978159518235294E8 | 5 + 10007 | 4 | 393084805 | F | 2.863684210555556E8 | 5 +; + +byMultivaluedSimple +required_capability: inlinestats + +// tag::mv-group[] +FROM airports +| INLINESTATS min_scalerank=MIN(scalerank) BY type +| EVAL type=MV_SORT(type), min_scalerank=MV_SORT(min_scalerank) +| KEEP abbrev, type, scalerank, min_scalerank +| WHERE abbrev == "GWL" +// end::mv-group[] +; + +// tag::mv-group-result[] +abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer + GWL | [mid, military] | 9 | [2, 4] +// end::mv-group-result[] +; + +byMultivaluedMvExpand +required_capability: inlinestats + +// tag::mv-expand[] +FROM airports +| KEEP abbrev, type, scalerank +| MV_EXPAND type +| INLINESTATS min_scalerank=MIN(scalerank) BY type +| SORT min_scalerank ASC +| WHERE abbrev == "GWL" +// end::mv-expand[] +; + +// tag::mv-expand-result[] +abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer + GWL | mid | 9 | 2 + GWL | military | 9 | 4 +// end::mv-expand-result[] +; + +byMvExpand +required_capability: inlinestats + +// tag::extreme-airports[] +FROM airports +| MV_EXPAND type +| EVAL lat = ST_Y(location) +| INLINESTATS most_northern=MAX(lat), most_southern=MIN(lat) BY type +| WHERE lat == most_northern OR lat == most_southern +| SORT lat DESC +| KEEP type, name, location +// end::extreme-airports[] +; + +// tag::extreme-airports-result[] + type:keyword | name:text | location:geo_point + mid | Svalbard Longyear | POINT (15.495229 78.246717) + major | Tromsø Langnes | POINT (18.9072624292132 69.6796790473478) + military | Severomorsk-3 (Murmansk N.E.) 
| POINT (33.2903527616285 69.0168711826804) + spaceport | Baikonur Cosmodrome | POINT (63.307354423875 45.9635739403124) + small | Dhamial | POINT (73.0320498392002 33.5614146278861) + small | Sahnewal | POINT (75.9570722403652 30.8503598561702) + spaceport | Centre Spatial Guyanais | POINT (-52.7684296893452 5.23941001258035) + military | Santos Air Force Base | POINT (-46.3052704931003 -23.9237590410637) + major | Christchurch Int'l | POINT (172.538675565223 -43.4885486784104) + mid | Hermes Quijada Int'l | POINT (-67.7530268462675 -53.7814746058316) +// end::extreme-airports-result[] +; + +brokenwhy-Ignore +required_capability: inlinestats + +FROM airports +| INLINESTATS min_scalerank=MIN(scalerank) BY type +| MV_EXPAND type +| WHERE scalerank == MV_MIN(scalerank); + +abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer + GWL | [mid, military] | 9 | [2, 4] +; + +afterStats +required_capability: inlinestats + +FROM airports +| STATS count=COUNT(*) BY country +| INLINESTATS avg=AVG(count) +| WHERE count > avg * 3 +| SORT count DESC, country ASC +; + +count:long | country:keyword | avg:double + 129 | United States | 4.455 + 50 | India | 4.455 + 45 | Mexico | 4.455 + 41 | China | 4.455 + 37 | Canada | 4.455 + 31 | Brazil | 4.455 + 26 | Russia | 4.455 + 19 | null | 4.455 + 17 | Australia | 4.455 + 17 | United Kingdom | 4.455 +; + +afterWhere +required_capability: inlinestats + +FROM airports +| WHERE country != "United States" +| INLINESTATS count=COUNT(*) BY country +| SORT count DESC, abbrev ASC +| KEEP abbrev, country, count +| LIMIT 4 +; + +abbrev:keyword | country:keyword | count:long + AGR | India | 50 + AMD | India | 50 + BBI | India | 50 + BDQ | India | 50 +; + +afterLookup +required_capability: inlinestats + +FROM airports +| RENAME scalerank AS int +| LOOKUP int_number_names ON int +| RENAME name as scalerank +| DROP int +| INLINESTATS count=COUNT(*) BY scalerank +| SORT abbrev ASC +| KEEP abbrev, scalerank +| LIMIT 4 +; + +abbrev:keyword | scalerank:keyword + ABJ | four + ABQ | six + ABV | five + ACA | four +; + +afterEnrich +required_capability: inlinestats +required_capability: enrich_load + +FROM airports +| KEEP abbrev, city +| WHERE abbrev NOT IN ("ADJ", "ATQ") // Skip airports in regions with right-to-left text which the test file isn't good with +| ENRICH city_names ON city WITH region +| WHERE MV_COUNT(region) == 1 +| INLINESTATS COUNT(*) BY region +| SORT abbrev ASC +| WHERE `COUNT(*)` > 1 +| LIMIT 3 +; + +abbrev:keyword | city:keyword | region:text | "COUNT(*)":long + ALA | Almaty | Жетісу ауданы | 2 + BXJ | Almaty | Жетісу ауданы | 2 + FUK | Fukuoka | 中央区 | 2 +; + +beforeStats +required_capability: inlinestats + +FROM airports +| EVAL lat = ST_Y(location) +| INLINESTATS avg_lat=AVG(lat) +| STATS northern=COUNT(lat > avg_lat OR NULL), southern=COUNT(lat < avg_lat OR NULL) +; + +northern:long | southern:long + 520 | 371 +; + +beforeKeepSort +required_capability: inlinestats + +FROM employees +| INLINESTATS max_salary = MAX(salary) by languages +| KEEP emp_no, languages, max_salary +| SORT emp_no ASC +| LIMIT 3; + +emp_no:integer | languages:integer | max_salary:integer + 10001 | 2 | 73578 + 10002 | 5 | 66817 + 10003 | 4 | 74572 +; + +beforeKeep +required_capability: inlinestats + +FROM employees +| INLINESTATS max_salary = MAX(salary) by languages +| KEEP emp_no, languages, max_salary +| LIMIT 3; + +ignoreOrder:true +emp_no:integer | languages:integer | max_salary:integer + 10001 | 2 | 73578 + 10002 | 5 | 66817 + 10003 | 4 | 74572 +; + +beforeEnrich 
+required_capability: inlinestats +required_capability: enrich_load + +FROM airports +| KEEP abbrev, type, city +| INLINESTATS COUNT(*) BY type +| ENRICH city_names ON city WITH region +| WHERE MV_COUNT(region) == 1 +| SORT abbrev ASC +| LIMIT 3 +; + +abbrev:keyword | type:keyword | city:keyword | "COUNT(*)":long | region:text + ABJ | mid | Abidjan | 499 | Abidjan + ABV | major | Abuja | 385 | Municipal Area Council + ACA | major | Acapulco de Juárez | 385 | Acapulco de Juárez +; + +beforeAndAfterEnrich +required_capability: inlinestats +required_capability: enrich_load + +FROM airports +| KEEP abbrev, type, city +| INLINESTATS COUNT(*) BY type +| ENRICH city_names ON city WITH region +| WHERE MV_COUNT(region) == 1 +| INLINESTATS count_region=COUNT(*) BY region +| SORT abbrev ASC +| LIMIT 3 +; + +abbrev:keyword | type:keyword | city:keyword | "COUNT(*)":long | region:text | count_region:long + ABJ | mid | Abidjan | 499 | Abidjan | 1 + ABV | major | Abuja | 385 | Municipal Area Council | 1 + ACA | major | Acapulco de Juárez | 385 | Acapulco de Juárez | 1 +; + + +shadowing +required_capability: inlinestats + +ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right" +| INLINESTATS env=VALUES(right) BY client_ip +; + +left:keyword | client_ip:keyword | right:keyword | env:keyword +left | 172.21.0.5 | right | right +; + +shadowingMulti +required_capability: inlinestats + +ROW left = "left", airport = "Zurich Airport ZRH", city = "Zürich", middle = "middle", region = "North-East Switzerland", right = "right" +| INLINESTATS airport=VALUES(left), region=VALUES(left), city_boundary=VALUES(left) BY city +; + +left:keyword | city:keyword | middle:keyword | right:keyword | airport:keyword | region:keyword | city_boundary:keyword +left | Zürich | middle | right | left | left | left +; + +shadowingSelf +required_capability: inlinestats + +ROW city="Raleigh" +| INLINESTATS city=COUNT(city) +; + +city:long +1 +; + +shadowingSelfBySelf-Ignore +required_capability: inlinestats + +ROW city="Raleigh" +| INLINESTATS city=COUNT(city) BY city +; + +city:long +1 +; + +shadowingInternal-Ignore +required_capability: inlinestats + +ROW city = "Zürich" +| INLINESTATS x=VALUES(city), x=VALUES(city) +; + +city:keyword | x:keyword +Zürich | Zürich +; + +byConstant-Ignore +required_capability: inlinestats + +FROM employees +| KEEP emp_no, languages +| INLINESTATS max_lang = MAX(languages) BY y=1 +| WHERE max_lang == languages +| SORT emp_no ASC +| LIMIT 5 +; + +emp_no:integer | languages:integer | max_lang:integer | y:integer + 10002 | 5 | 5 | 1 + 10004 | 5 | 5 | 1 + 10011 | 5 | 5 | 1 + 10012 | 5 | 5 | 1 + 10014 | 5 | 5 | 1 +; + +aggConstant +required_capability: inlinestats + +FROM employees +| KEEP emp_no +| INLINESTATS one = MAX(1) BY emp_no +| SORT emp_no ASC +| LIMIT 5 +; + +emp_no:integer | one:integer + 10001 | 1 + 10002 | 1 + 10003 | 1 + 10004 | 1 + 10005 | 1 +; + +percentile +required_capability: inlinestats + +FROM employees +| KEEP emp_no, salary +| INLINESTATS ninety_fifth_salary = PERCENTILE(salary, 95) +| WHERE salary > ninety_fifth_salary +| SORT emp_no ASC +| LIMIT 5 +; + +emp_no:integer | salary:integer | ninety_fifth_salary:double + 10007 | 74572 | 73584.95 + 10019 | 73717 | 73584.95 + 10027 | 73851 | 73584.95 + 10029 | 74999 | 73584.95 + 10045 | 74970 | 73584.95 +; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec index eaf27dca83b3e..e1aa411702420 100644 --- 
a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/union_types.csv-spec @@ -335,6 +335,27 @@ count:long | client_ip:ip 2 | 172.21.2.162 ; +statsUnionAggInline +required_capability: union_types +required_capability: inlinestats + +FROM sample_data, sample_data_str +| STATS + count = COUNT(CIDR_MATCH(TO_IP(client_ip), "172.21.0.0/24") OR NULL) + BY + @timestamp = DATE_TRUNC(10 minutes, @timestamp) +| SORT count DESC, @timestamp ASC +| LIMIT 4 +; + +count:long | @timestamp:date + 2 | 2023-10-23T13:30:00.000Z + 0 | 2023-10-23T12:10:00.000Z + 0 | 2023-10-23T12:20:00.000Z + 0 | 2023-10-23T13:50:00.000Z +; + + multiIndexIpStringStatsInline2 required_capability: union_types required_capability: union_types_agg_cast @@ -954,3 +975,43 @@ event_duration:long | _index:keyword | ts:date | ts_str:k 8268153 | sample_data_str | 2023-10-23T13:52:55.015Z | 2023-10-23T13:52:55.015Z | 1698069175015 | 172.21.3.15 | 172.21.3.15 8268153 | sample_data_ts_long | 2023-10-23T13:52:55.015Z | 1698069175015 | 1698069175015 | 172.21.3.15 | 172.21.3.15 ; + + +inlineStatsUnionGroup +required_capability: union_types +required_capability: inlinestats + +FROM sample_data, sample_data_ts_long +| EVAL @timestamp = SUBSTRING(TO_STRING(@timestamp), 0, 7) +| INLINESTATS count = COUNT(*) BY @timestamp +| SORT client_ip ASC, @timestamp ASC +| LIMIT 4 +; + +client_ip:ip | event_duration:long | message:keyword | @timestamp:keyword | count:long + 172.21.0.5 | 1232382 | Disconnected | 1698068 | 1 + 172.21.0.5 | 1232382 | Disconnected | 2023-10 | 7 +172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 1698064 | 1 +172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 2023-10 | 7 + +; + +inlineStatsUnionGroupTogether +required_capability: union_types +required_capability: inlinestats + +FROM sample_data, sample_data_ts_long +| EVAL @timestamp = TO_STRING(TO_DATETIME(@timestamp)) +| INLINESTATS count = COUNT(*) BY @timestamp +| SORT client_ip ASC, @timestamp ASC +| LIMIT 4 +; + +client_ip:ip | event_duration:long | message:keyword | @timestamp:keyword | count:long + 172.21.0.5 | 1232382 | Disconnected | 2023-10-23T13:33:34.937Z | 2 + 172.21.0.5 | 1232382 | Disconnected | 2023-10-23T13:33:34.937Z | 2 +172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 2023-10-23T12:27:28.948Z | 2 +172.21.2.113 | 2764889 | Connected to 10.1.0.2 | 2023-10-23T12:27:28.948Z | 2 +; + +# Once INLINESTATS supports expressions in agg functions and groups, convert the group in the inlinestats diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 9a985bef3af5f..e6142e8161d44 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -44,6 +44,11 @@ public enum Cap { */ FN_SUBSTRING_EMPTY_NULL, + /** + * Support for the {@code INLINESTATS} syntax. + */ + INLINESTATS(true), + /** * Support for aggregation function {@code TOP}. 
*/ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 200341bc13e8a..75e494fe9671e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -75,6 +75,7 @@ import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.Rename; +import org.elasticsearch.xpack.esql.plan.logical.Stats; import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; @@ -399,8 +400,8 @@ protected LogicalPlan doRule(LogicalPlan plan) { childrenOutput.addAll(output); } - if (plan instanceof Aggregate agg) { - return resolveAggregate(agg, childrenOutput); + if (plan instanceof Stats stats) { + return resolveStats(stats, childrenOutput); } if (plan instanceof Drop d) { @@ -434,11 +435,11 @@ protected LogicalPlan doRule(LogicalPlan plan) { return plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput)); } - private LogicalPlan resolveAggregate(Aggregate a, List childrenOutput) { + private LogicalPlan resolveStats(Stats stats, List childrenOutput) { // if the grouping is resolved but the aggs are not, use the former to resolve the latter // e.g. STATS a ... GROUP BY a = x + 1 Holder changed = new Holder<>(false); - List groupings = a.groupings(); + List groupings = stats.groupings(); // first resolve groupings since the aggs might refer to them // trying to globally resolve unresolved attributes will lead to some being marked as unresolvable if (Resolvables.resolved(groupings) == false) { @@ -452,12 +453,12 @@ private LogicalPlan resolveAggregate(Aggregate a, List childrenOutput } groupings = newGroupings; if (changed.get()) { - a = new Aggregate(a.source(), a.child(), a.aggregateType(), newGroupings, a.aggregates()); + stats = stats.with(newGroupings, stats.aggregates()); changed.set(false); } } - if (a.expressionsResolved() == false) { + if (stats.expressionsResolved() == false) { AttributeMap resolved = new AttributeMap<>(); for (Expression e : groupings) { Attribute attr = Expressions.attribute(e); @@ -468,7 +469,7 @@ private LogicalPlan resolveAggregate(Aggregate a, List childrenOutput List resolvedList = NamedExpressions.mergeOutputAttributes(new ArrayList<>(resolved.keySet()), childrenOutput); List newAggregates = new ArrayList<>(); - for (NamedExpression aggregate : a.aggregates()) { + for (NamedExpression aggregate : stats.aggregates()) { var agg = (NamedExpression) aggregate.transformUp(UnresolvedAttribute.class, ua -> { Expression ne = ua; Attribute maybeResolved = maybeResolveAttribute(ua, resolvedList); @@ -481,10 +482,10 @@ private LogicalPlan resolveAggregate(Aggregate a, List childrenOutput newAggregates.add(agg); } - a = changed.get() ? new Aggregate(a.source(), a.child(), a.aggregateType(), groupings, newAggregates) : a; + stats = changed.get() ? 
stats.with(groupings, newAggregates) : stats; } - return a; + return (LogicalPlan) stats; } private LogicalPlan resolveMvExpand(MvExpand p, List childrenOutput) { @@ -1122,6 +1123,7 @@ private LogicalPlan doRule(LogicalPlan plan) { // In ResolveRefs the aggregates are resolved from the groupings, which might have an unresolved MultiTypeEsField. // Now that we have resolved those, we need to re-resolve the aggregates. if (plan instanceof Aggregate agg) { + // TODO once inlinestats supports expressions in groups we'll likely need the same sort of extraction here // If the union-types resolution occurred in a child of the aggregate, we need to check the groupings plan = agg.transformExpressionsOnly(FieldAttribute.class, UnionTypesCleanup::checkUnresolved); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java index 76c6e1b9a6f1a..218a478b8425c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java @@ -36,6 +36,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Grok; +import org.elasticsearch.xpack.esql.plan.logical.InlineStats; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.Lookup; @@ -147,6 +148,7 @@ public static List namedTypeEntries() { of(LogicalPlan.class, EsqlProject.class, PlanNamedTypes::writeEsqlProject, PlanNamedTypes::readEsqlProject), of(LogicalPlan.class, Filter.class, PlanNamedTypes::writeFilter, PlanNamedTypes::readFilter), of(LogicalPlan.class, Grok.class, PlanNamedTypes::writeGrok, PlanNamedTypes::readGrok), + of(LogicalPlan.class, InlineStats.class, (PlanStreamOutput out, InlineStats v) -> v.writeTo(out), InlineStats::new), of(LogicalPlan.class, Join.class, (out, p) -> p.writeTo(out), Join::new), of(LogicalPlan.class, Limit.class, PlanNamedTypes::writeLimit, PlanNamedTypes::readLimit), of(LogicalPlan.class, LocalRelation.class, (out, p) -> p.writeTo(out), LocalRelation::new), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java index 75b05b7e5631c..4237852551e8a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java @@ -122,7 +122,9 @@ public PhysicalPlan apply(PhysicalPlan plan) { if (p instanceof HashJoinExec join) { attributes.removeAll(join.addedFields()); for (Attribute rhs : join.rightFields()) { - attributes.remove(rhs); + if (join.leftFields().stream().anyMatch(x -> x.semanticEquals(rhs)) == false) { + attributes.remove(rhs); + } } } if (p instanceof EnrichExec ee) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/package-info.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/package-info.java index b40531ea9359c..17d317bedbb6f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/package-info.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/package-info.java @@ -121,6 +121,8 @@ * function implementations. *
 *     <li>{@link org.elasticsearch.xpack.esql.action.RestEsqlQueryAction Sync} and
 *         {@link org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction async} HTTP API entry points</li>
+ *     <li>{@link org.elasticsearch.xpack.esql.plan.logical.Phased} - Marks a {@link org.elasticsearch.xpack.esql.plan.logical.LogicalPlan}
+ *         node as requiring multiple ESQL executions to run.</li>
 * </ul>
 *
 *     Query Planner
 *
    diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index a0d88d152b7a2..6caab4abe8b38 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -56,6 +56,7 @@ import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; import org.elasticsearch.xpack.esql.plan.logical.meta.MetaFunctions; import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import java.util.ArrayList; import java.util.Arrays; @@ -331,6 +332,9 @@ private void fail(Expression exp, String message, Object... args) { @Override public PlanFactory visitInlinestatsCommand(EsqlBaseParser.InlinestatsCommandContext ctx) { + if (false == EsqlPlugin.INLINESTATS_FEATURE_FLAG.isEnabled()) { + throw new ParsingException(source(ctx), "INLINESTATS command currently requires a snapshot build"); + } List aggregates = new ArrayList<>(visitFields(ctx.stats)); List groupings = visitGrouping(ctx.grouping); aggregates.addAll(groupings); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java index 78d77baa57aac..f3471e5ce8a0c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java @@ -26,9 +26,7 @@ import static java.util.Collections.emptyList; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class Aggregate extends UnaryPlan { - private List lazyOutput; - +public class Aggregate extends UnaryPlan implements Stats { public enum AggregateType { STANDARD, // include metrics aggregates such as rates @@ -54,6 +52,7 @@ static AggregateType readType(StreamInput in) throws IOException { private final AggregateType aggregateType; private final List groupings; private final List aggregates; + private List lazyOutput; public Aggregate( Source source, @@ -96,6 +95,11 @@ public Aggregate replaceChild(LogicalPlan newChild) { return new Aggregate(source(), newChild, aggregateType, groupings, aggregates); } + @Override + public Aggregate with(List newGroupings, List newAggregates) { + return new Aggregate(source(), child(), aggregateType(), newGroupings, newAggregates); + } + public AggregateType aggregateType() { return aggregateType; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java index 46ec56223384c..1c0d942537f8e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java @@ -7,21 +7,58 @@ package org.elasticsearch.xpack.esql.plan.logical; +import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import 
org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockUtils; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasables; import org.elasticsearch.xpack.esql.core.capabilities.Resolvables; +import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.Expression; -import org.elasticsearch.xpack.esql.core.expression.Expressions; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; +import org.elasticsearch.xpack.esql.plan.logical.join.Join; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinType; +import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; +import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; +import org.elasticsearch.xpack.esql.planner.PlannerUtils; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Objects; -public class InlineStats extends UnaryPlan { +import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; + +/** + * Enriches the stream of data with the results of running a {@link Aggregate STATS}. + *

    + * This is a {@link Phased} operation that doesn't have a "native" implementation. + * Instead, it's implemented as first running a {@link Aggregate STATS} and then + * a {@link Join}. + *

    + */ +public class InlineStats extends UnaryPlan implements NamedWriteable, Phased, Stats { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + InlineStats.class, + "InlineStats", + InlineStats::new + ); private final List groupings; private final List aggregates; + private List lazyOutput; public InlineStats(Source source, LogicalPlan child, List groupings, List aggregates) { super(source, child); @@ -29,6 +66,28 @@ public InlineStats(Source source, LogicalPlan child, List groupings, this.aggregates = aggregates; } + public InlineStats(StreamInput in) throws IOException { + this( + Source.readFrom((PlanStreamInput) in), + ((PlanStreamInput) in).readLogicalPlanNode(), + in.readNamedWriteableCollectionAsList(Expression.class), + in.readNamedWriteableCollectionAsList(NamedExpression.class) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + source().writeTo(out); + ((PlanStreamOutput) out).writeLogicalPlanNode(child()); + out.writeNamedWriteableCollection(groupings); + out.writeNamedWriteableCollection(aggregates); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + @Override protected NodeInfo info() { return NodeInfo.create(this, InlineStats::new, child(), groupings, aggregates); @@ -39,10 +98,17 @@ public InlineStats replaceChild(LogicalPlan newChild) { return new InlineStats(source(), newChild, groupings, aggregates); } + @Override + public InlineStats with(List newGroupings, List newAggregates) { + return new InlineStats(source(), child(), newGroupings, newAggregates); + } + + @Override public List groupings() { return groupings; } + @Override public List aggregates() { return aggregates; } @@ -54,7 +120,19 @@ public boolean expressionsResolved() { @Override public List output() { - return Expressions.asAttributes(aggregates); + if (this.lazyOutput == null) { + List addedFields = new ArrayList<>(); + AttributeSet childOutput = child().outputSet(); + + for (NamedExpression agg : aggregates) { + if (childOutput.contains(agg) == false) { + addedFields.add(agg); + } + } + + this.lazyOutput = mergeOutputAttributes(addedFields, child().output()); + } + return lazyOutput; } @Override @@ -77,4 +155,102 @@ public boolean equals(Object obj) { && Objects.equals(aggregates, other.aggregates) && Objects.equals(child(), other.child()); } + + @Override + public LogicalPlan firstPhase() { + return new Aggregate(source(), child(), Aggregate.AggregateType.STANDARD, groupings, aggregates); + } + + @Override + public LogicalPlan nextPhase(List schema, List firstPhaseResult) { + if (equalsAndSemanticEquals(firstPhase().output(), schema) == false) { + throw new IllegalStateException("Unexpected first phase outputs: " + firstPhase().output() + " vs " + schema); + } + if (groupings.isEmpty()) { + return ungroupedNextPhase(schema, firstPhaseResult); + } + return groupedNextPhase(schema, firstPhaseResult); + } + + private LogicalPlan ungroupedNextPhase(List schema, List firstPhaseResult) { + if (firstPhaseResult.size() != 1) { + throw new IllegalArgumentException("expected single row"); + } + Page p = firstPhaseResult.get(0); + if (p.getPositionCount() != 1) { + throw new IllegalArgumentException("expected single row"); + } + List values = new ArrayList<>(schema.size()); + for (int i = 0; i < schema.size(); i++) { + Attribute s = schema.get(i); + Object value = BlockUtils.toJavaObject(p.getBlock(i), 0); + values.add(new Alias(source(), s.name(), null, new Literal(source(), value, s.dataType()), 
aggregates.get(i).id())); + } + return new Eval(source(), child(), values); + } + + private static boolean equalsAndSemanticEquals(List left, List right) { + if (left.equals(right) == false) { + return false; + } + for (int i = 0; i < left.size(); i++) { + if (left.get(i).semanticEquals(right.get(i)) == false) { + return false; + } + } + return true; + } + + private LogicalPlan groupedNextPhase(List schema, List firstPhaseResult) { + LocalRelation local = firstPhaseResultsToLocalRelation(schema, firstPhaseResult); + List groupingAttributes = new ArrayList<>(groupings.size()); + for (Expression g : groupings) { + if (g instanceof Attribute a) { + groupingAttributes.add(a); + } else { + throw new UnsupportedOperationException("INLINESTATS doesn't support expressions in grouping position yet"); + } + } + List leftFields = new ArrayList<>(groupingAttributes.size()); + List rightFields = new ArrayList<>(groupingAttributes.size()); + List rhsOutput = Join.makeReference(local.output()); + for (Attribute lhs : groupingAttributes) { + for (Attribute rhs : rhsOutput) { + if (lhs.name().equals(rhs.name())) { + leftFields.add(lhs); + rightFields.add(rhs); + break; + } + } + } + JoinConfig config = new JoinConfig(JoinType.LEFT, groupingAttributes, leftFields, rightFields); + return new Join(source(), child(), local, config); + } + + private LocalRelation firstPhaseResultsToLocalRelation(List schema, List firstPhaseResult) { + // Limit ourselves to 1mb of results similar to LOOKUP for now. + long bytesUsed = firstPhaseResult.stream().mapToLong(Page::ramBytesUsedByBlocks).sum(); + if (bytesUsed > ByteSizeValue.ofMb(1).getBytes()) { + throw new IllegalArgumentException("first phase result too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb"); + } + int positionCount = firstPhaseResult.stream().mapToInt(Page::getPositionCount).sum(); + Block.Builder[] builders = new Block.Builder[schema.size()]; + Block[] blocks; + try { + for (int b = 0; b < builders.length; b++) { + builders[b] = PlannerUtils.toElementType(schema.get(b).dataType()) + .newBlockBuilder(positionCount, PlannerUtils.NON_BREAKING_BLOCK_FACTORY); + } + for (Page p : firstPhaseResult) { + for (int b = 0; b < builders.length; b++) { + builders[b].copyFrom(p.getBlock(b), 0, p.getPositionCount()); + } + } + blocks = Block.Builder.buildAll(builders); + } finally { + Releasables.closeExpectNoException(builders); + } + return new LocalRelation(source(), schema, LocalSupplier.of(blocks)); + } + } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java new file mode 100644 index 0000000000000..ba0f97cdfa30b --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Phased.java @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical; + +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.util.Holder; + +import java.util.List; + +/** + * Marks a {@link LogicalPlan} node as requiring multiple ESQL executions to run. + * All logical plans are now run by: + *
+ * <ol>
+ *     <li>{@link Analyzer analyzing} the entire query</li>
+ *     <li>{@link Phased#extractFirstPhase extracting} the first phase from the
+ *         logical plan</li>
+ *     <li>if there isn't a first phase, run the entire logical plan and return the
+ *         results. you are done.</li>
+ *     <li>if there is first phase, run that</li>
+ *     <li>{@link Phased#applyResultsFromFirstPhase applying} the results from the
+ *         first phase into the logical plan</li>
+ *     <li>start over from step 2 using the new logical plan</li>
+ * </ol>
+ * <p>For example, {@code INLINESTATS} is written like this:</p>
+ * <pre>{@code
+ * FROM foo
+ * | EVAL bar = a * b
+ * | INLINESTATS m = MAX(bar) BY b
+ * | WHERE m = bar
+ * | LIMIT 1
+ * }</pre>
+ * <p>And it's split into:</p>
+ * <pre>{@code
+ * FROM foo
+ * | EVAL bar = a * b
+ * | STATS m = MAX(bar) BY b
+ * }</pre>
+ * <p>and</p>
+ * <pre>{@code
+ * FROM foo
+ * | EVAL bar = a * b
+ * | LOOKUP (results of m = MAX(bar) BY b) ON b
+ * | WHERE m = bar
+ * | LIMIT 1
+ * }</pre>
+ * <p>If there are multiple {@linkplain Phased} nodes in the plan we always
+ * operate on the lowest one first, counting from the data source "upwards".
+ * Generally that'll read left to right in the query. So:</p>
+ * <pre>{@code
+ * FROM foo | INLINESTATS | INLINESTATS
+ * }</pre>
+ * becomes
+ * <pre>{@code
+ * FROM foo | STATS
+ * }</pre>
+ * and
+ * <pre>{@code
+ * FROM foo | HASHJOIN | INLINESTATS
+ * }</pre>
+ * which is further broken into
+ * <pre>{@code
+ * FROM foo | HASHJOIN | STATS
+ * }</pre>
+ * and finally:
+ * <pre>{@code
+ * FROM foo | HASHJOIN | HASHJOIN
+ * }</pre>
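+ * <p>A rough sketch of the driver loop described above. The real loop lives in
+ * {@code EsqlSession#executeAnalyzedPlan} and converts each phase to a physical
+ * plan before running it; {@code runToCompletion} here is a hypothetical helper
+ * that executes one plan and hands back its schema and pages:</p>
+ * <pre>{@code
+ * LogicalPlan plan = analyzedPlan;
+ * LogicalPlan firstPhase = Phased.extractFirstPhase(plan);
+ * while (firstPhase != null) {
+ *     Result result = runToCompletion(firstPhase);
+ *     plan = Phased.applyResultsFromFirstPhase(plan, result.schema(), result.pages());
+ *     firstPhase = Phased.extractFirstPhase(plan);
+ * }
+ * return runToCompletion(plan); // the final phase produces the response
+ * }</pre>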
    + */ +public interface Phased { + /** + * Return a {@link LogicalPlan} for the first "phase" of this operation. + * The result of this phase will be provided to {@link #nextPhase}. + */ + LogicalPlan firstPhase(); + + /** + * Use the results of plan provided from {@link #firstPhase} to produce the + * next phase of the query. + */ + LogicalPlan nextPhase(List schema, List firstPhaseResult); + + /** + * Find the first {@link Phased} operation and return it's {@link #firstPhase}. + * Or {@code null} if there aren't any {@linkplain Phased} operations. + */ + static LogicalPlan extractFirstPhase(LogicalPlan plan) { + if (false == plan.analyzed()) { + throw new IllegalArgumentException("plan must be analyzed"); + } + var firstPhase = new Holder(); + plan.forEachUp(t -> { + if (firstPhase.get() == null && t instanceof Phased phased) { + firstPhase.set(phased.firstPhase()); + } + }); + LogicalPlan firstPhasePlan = firstPhase.get(); + if (firstPhasePlan != null) { + firstPhasePlan.setAnalyzed(); + } + return firstPhasePlan; + } + + /** + * Merge the results of {@link #extractFirstPhase} into a {@link LogicalPlan} + * and produce a new {@linkplain LogicalPlan} that will execute the rest of the + * query. This plan may contain another + * {@link #firstPhase}. If it does then it will also need to be + * {@link #extractFirstPhase extracted} and the results will need to be applied + * again by calling this method. Eventually this will produce a plan which + * does not have a {@link #firstPhase} and that is the "final" + * phase of the plan. + */ + static LogicalPlan applyResultsFromFirstPhase(LogicalPlan plan, List schema, List result) { + if (false == plan.analyzed()) { + throw new IllegalArgumentException("plan must be analyzed"); + } + Holder seen = new Holder<>(false); + LogicalPlan applied = plan.transformUp(logicalPlan -> { + if (seen.get() == false && logicalPlan instanceof Phased phased) { + seen.set(true); + return phased.nextPhase(schema, result); + } + return logicalPlan; + }); + applied.setAnalyzed(); + return applied; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java new file mode 100644 index 0000000000000..1dde8e9e95990 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Stats.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical; + +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; + +import java.util.List; + +/** + * STATS-like operations. Like {@link Aggregate} and {@link InlineStats}. + */ +public interface Stats { + /** + * Rebuild this plan with new groupings and new aggregates. + */ + Stats with(List newGroupings, List newAggregates); + + /** + * Have all the expressions in this plan been resolved? + */ + boolean expressionsResolved(); + + /** + * List containing both the aggregate expressions and grouping expressions. + */ + List aggregates(); + + /** + * List containing just the grouping expressions. 
+ */ + List groupings(); + +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 0d9f3cd460af5..9809c361ea426 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; @@ -79,6 +80,7 @@ import java.util.function.Supplier; public class EsqlPlugin extends Plugin implements ActionPlugin { + public static final FeatureFlag INLINESTATS_FEATURE_FLAG = new FeatureFlag("esql_inlinestats"); public static final String ESQL_WORKER_THREAD_POOL_NAME = "esql_worker"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 8c831cc260e03..ff234c4a91572 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -10,7 +10,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.fieldcaps.FieldCapabilities; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.core.Releasables; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -46,6 +49,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.Keep; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Phased; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.RegexExtract; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; @@ -125,14 +129,51 @@ public void execute( ); } + /** + * Execute an analyzed plan. Most code should prefer calling {@link #execute} but + * this is public for testing. See {@link Phased} for the sequence of operations. + */ public void executeAnalyzedPlan( EsqlQueryRequest request, BiConsumer> runPhase, LogicalPlan analyzedPlan, ActionListener listener ) { - // TODO phased execution lands here. 
- runPhase.accept(logicalPlanToPhysicalPlan(analyzedPlan, request), listener); + LogicalPlan firstPhase = Phased.extractFirstPhase(analyzedPlan); + if (firstPhase == null) { + runPhase.accept(logicalPlanToPhysicalPlan(analyzedPlan, request), listener); + } else { + executePhased(new ArrayList<>(), analyzedPlan, request, firstPhase, runPhase, listener); + } + } + + private void executePhased( + List profileAccumulator, + LogicalPlan mainPlan, + EsqlQueryRequest request, + LogicalPlan firstPhase, + BiConsumer> runPhase, + ActionListener listener + ) { + PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(firstPhase, request); + runPhase.accept(physicalPlan, listener.delegateFailureAndWrap((next, result) -> { + try { + profileAccumulator.addAll(result.profiles()); + LogicalPlan newMainPlan = Phased.applyResultsFromFirstPhase(mainPlan, physicalPlan.output(), result.pages()); + LogicalPlan newFirstPhase = Phased.extractFirstPhase(newMainPlan); + if (newFirstPhase == null) { + PhysicalPlan finalPhysicalPlan = logicalPlanToPhysicalPlan(newMainPlan, request); + runPhase.accept(finalPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> { + profileAccumulator.addAll(finalResult.profiles()); + finalListener.onResponse(new Result(finalResult.schema(), finalResult.pages(), profileAccumulator)); + })); + } else { + executePhased(profileAccumulator, newMainPlan, request, newFirstPhase, runPhase, next); + } + } finally { + Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks))); + } + })); } private LogicalPlan parse(String query, QueryParams params) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java index 8c5a5a4b3ba3b..552aa9443438f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.data.Block; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.ExistsQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; @@ -126,6 +127,7 @@ public static NamedWriteableRegistry writableRegistry() { entries.addAll(Expression.getNamedWriteables()); entries.addAll(EsqlScalarFunction.getNamedWriteables()); entries.addAll(AggregateFunction.getNamedWriteables()); + entries.addAll(Block.getNamedWriteables()); return new NamedWriteableRegistry(entries); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java index 55691526ea428..5cce61484ef87 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.Grok; +import org.elasticsearch.xpack.esql.plan.logical.InlineStats; import 
org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.Lookup; @@ -146,6 +147,7 @@ public void testPhysicalPlanEntries() { Eval.class, Filter.class, Grok.class, + InlineStats.class, Join.class, Limit.class, LocalRelation.class, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java index 2d53fabbfe10d..35688175f372a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java @@ -311,6 +311,7 @@ public void testAggsWithGroupKeyAsAgg() throws Exception { } public void testInlineStatsWithGroups() { + assumeTrue("INLINESTATS requires snapshot builds", Build.current().isSnapshot()); assertEquals( new InlineStats( EMPTY, @@ -327,6 +328,7 @@ public void testInlineStatsWithGroups() { } public void testInlineStatsWithoutGroups() { + assumeTrue("INLINESTATS requires snapshot builds", Build.current().isSnapshot()); assertEquals( new InlineStats( EMPTY, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java new file mode 100644 index 0000000000000..4bd8dd3a0e96f --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/PhasedTests.java @@ -0,0 +1,146 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.plan.logical; + +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; +import org.elasticsearch.xpack.esql.core.index.EsIndex; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class PhasedTests extends ESTestCase { + public void testZeroLayers() { + EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false); + relation.setAnalyzed(); + assertThat(Phased.extractFirstPhase(relation), nullValue()); + } + + public void testOneLayer() { + EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false); + LogicalPlan orig = new Dummy(Source.synthetic("orig"), relation); + orig.setAnalyzed(); + assertThat(Phased.extractFirstPhase(orig), sameInstance(relation)); + LogicalPlan finalPhase = Phased.applyResultsFromFirstPhase( + orig, + List.of(new ReferenceAttribute(Source.EMPTY, "foo", DataType.KEYWORD)), + List.of() + ); + assertThat( + finalPhase, + equalTo(new Row(orig.source(), List.of(new Alias(orig.source(), "foo", new Literal(orig.source(), "foo", DataType.KEYWORD))))) + ); + assertThat(Phased.extractFirstPhase(finalPhase), nullValue()); + } + + public void testTwoLayer() { + EsRelation relation = new EsRelation(Source.synthetic("relation"), new EsIndex("foo", Map.of()), IndexMode.STANDARD, false); + LogicalPlan inner = new Dummy(Source.synthetic("inner"), relation); + LogicalPlan orig = new Dummy(Source.synthetic("outer"), inner); + orig.setAnalyzed(); + assertThat( + "extractFirstPhase should call #firstPhase on the earliest child in the plan", + Phased.extractFirstPhase(orig), + sameInstance(relation) + ); + LogicalPlan secondPhase = Phased.applyResultsFromFirstPhase( + orig, + List.of(new ReferenceAttribute(Source.EMPTY, "foo", DataType.KEYWORD)), + List.of() + ); + assertThat( + "applyResultsFromFirstPhase should call #nextPhase one th earliest child in the plan", + secondPhase, + equalTo( + new Dummy( + Source.synthetic("outer"), + new Row(orig.source(), List.of(new Alias(orig.source(), "foo", new Literal(orig.source(), "foo", DataType.KEYWORD)))) + ) + ) + ); + + assertThat(Phased.extractFirstPhase(secondPhase), sameInstance(secondPhase.children().get(0))); + LogicalPlan finalPhase = Phased.applyResultsFromFirstPhase( + secondPhase, + List.of(new ReferenceAttribute(Source.EMPTY, "foo", DataType.KEYWORD)), + List.of() + ); + assertThat( + finalPhase, + equalTo(new Row(orig.source(), List.of(new Alias(orig.source(), "foo", new Literal(orig.source(), "foo", DataType.KEYWORD))))) + ); + + assertThat(Phased.extractFirstPhase(finalPhase), nullValue()); + } + + public class Dummy extends UnaryPlan implements Phased { + Dummy(Source source, LogicalPlan child) { + super(source, child); + } + + @Override + public boolean expressionsResolved() { + throw new 
UnsupportedOperationException(); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, Dummy::new, child()); + } + + @Override + public int hashCode() { + return child().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Dummy == false) { + return false; + } + Dummy other = (Dummy) obj; + return child().equals(other.child()); + } + + @Override + public UnaryPlan replaceChild(LogicalPlan newChild) { + return new Dummy(source(), newChild); + } + + @Override + public List output() { + return child().output(); + } + + @Override + public LogicalPlan firstPhase() { + return child(); + } + + @Override + public LogicalPlan nextPhase(List schema, List firstPhaseResult) { + // Replace myself with a dummy "row" command + return new Row( + source(), + schema.stream().map(a -> new Alias(source(), a.name(), new Literal(source(), a.name(), DataType.KEYWORD))).toList() + ); + } + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java index 3036a729aa298..6c6c8ccb665a5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Dissect; import org.elasticsearch.xpack.esql.plan.logical.Grok; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.PhasedTests; import org.elasticsearch.xpack.esql.plan.logical.join.JoinType; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.Stat; @@ -115,7 +116,7 @@ public class EsqlNodeSubclassTests> extends NodeS private static final Predicate CLASSNAME_FILTER = className -> { boolean esqlCore = className.startsWith("org.elasticsearch.xpack.esql.core") != false; boolean esqlProper = className.startsWith("org.elasticsearch.xpack.esql") != false; - return esqlCore || esqlProper; + return (esqlCore || esqlProper) && className.equals(PhasedTests.Dummy.class.getName()) == false; }; /** @@ -126,7 +127,7 @@ public class EsqlNodeSubclassTests> extends NodeS @SuppressWarnings("rawtypes") public static List nodeSubclasses() throws IOException { return subclassesOf(Node.class, CLASSNAME_FILTER).stream() - .filter(c -> testClassFor(c) == null) + .filter(c -> testClassFor(c) == null || c != PhasedTests.Dummy.class) .map(c -> new Object[] { c }) .toList(); }