-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ESQL: Compute support for filtering ungrouped aggs #112717
Conversation
Adds support to the compute engine for filtering which positions are processed by ungrouping aggs. This should allow syntax like: ``` | STATS success = COUNT(*) WHERE 200 <= response_code AND response_code < 300, redirect = COUNT(*) WHERE 300 <= response_code AND response_code < 400, client_err = COUNT(*) WHERE 400 <= response_code AND response_code < 500, server_err = COUNT(*) WHERE 500 <= response_code AND response_code < 600, total_count = COUNT(*) ``` We could translate the WHERE expression into an `ExpressionEvaluator` and run it, then plug it into the filtering support added in this PR. The actual filtering is done by creating a `FilteredAggregatorFunction` which wraps a regular `AggregatorFunction` first executing the filter against the incoming `Page` and then passing the resulting mask to the `AggregatorFunction`. We've then added a `mask` to `AggregatorFunction#process` which each aggregation function must use for filtering. We keep the unfiltered behavior by sending a constant block with `true` in it. Each agg detects this and takes an "unfiltered" path, preserving the original performance. Importantly, when you don't turn this on it doesn't effect performance: ``` (blockType) (grouping) (op) Score Error -> Score Error Units vector_longs none count 0.007 ± 0.001 -> 0.007 ± 0.001 ns/op vector_longs none min 0.123 ± 0.004 -> 0.128 ± 0.005 ns/op vector_longs longs count 4.311 ± 0.192 -> 4.218 ± 0.053 ns/op vector_longs longs min 5.476 ± 0.077 -> 5.451 ± 0.074 ns/op ```
Pinging @elastic/es-analytical-engine (Team:Analytics) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
try { | ||
aggregator.processPage(inputPage); | ||
try ( | ||
BooleanVector noMasking = driverContext().blockFactory().newConstantBooleanVector(true, inputPage.getPositionCount()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the point of this PR, we should be able to test masking here. Maybe making another test for it.
Should we do it now? Or in other PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah! I was going to do it in a follow-up. But, yeah. Soon!
builder.beginControlFlow("if (vector != null)").addStatement("addRawVector(vector)"); | ||
builder.nextControlFlow("else").addStatement("addRawBlock(block)").endControlFlow(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Maybe it's me, but I think this is easier to read if every statement is in a new line. So you can "read" the code within quotes from top to bottom.
builder.beginControlFlow("if (vector != null)").addStatement("addRawVector(vector)");
builder.nextControlFlow("else").addStatement("addRawBlock(block)").endControlFlow();
VS
builder.beginControlFlow("if (vector != null)");
builder.addStatement("addRawVector(vector)");
builder.nextControlFlow("else");
builder.addStatement("addRawBlock(block)");
builder.endControlFlow();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can do that!
Adds support to the compute engine for filtering which positions are processed by ungrouping aggs. This should allow syntax like: ``` | STATS success = COUNT(*) WHERE 200 <= response_code AND response_code < 300, redirect = COUNT(*) WHERE 300 <= response_code AND response_code < 400, client_err = COUNT(*) WHERE 400 <= response_code AND response_code < 500, server_err = COUNT(*) WHERE 500 <= response_code AND response_code < 600, total_count = COUNT(*) ``` We could translate the WHERE expression into an `ExpressionEvaluator` and run it, then plug it into the filtering support added in this PR. The actual filtering is done by creating a `FilteredAggregatorFunction` which wraps a regular `AggregatorFunction` first executing the filter against the incoming `Page` and then passing the resulting mask to the `AggregatorFunction`. We've then added a `mask` to `AggregatorFunction#process` which each aggregation function must use for filtering. We keep the unfiltered behavior by sending a constant block with `true` in it. Each agg detects this and takes an "unfiltered" path, preserving the original performance. Importantly, when you don't turn this on it doesn't effect performance: ``` (blockType) (grouping) (op) Score Error -> Score Error Units vector_longs none count 0.007 ± 0.001 -> 0.007 ± 0.001 ns/op vector_longs none min 0.123 ± 0.004 -> 0.128 ± 0.005 ns/op vector_longs longs count 4.311 ± 0.192 -> 4.218 ± 0.053 ns/op vector_longs longs min 5.476 ± 0.077 -> 5.451 ± 0.074 ns/op ```
💚 Backport successful
|
Adds support to the compute engine for filtering which positions are processed by ungrouping aggs. This should allow syntax like: ``` | STATS success = COUNT(*) WHERE 200 <= response_code AND response_code < 300, redirect = COUNT(*) WHERE 300 <= response_code AND response_code < 400, client_err = COUNT(*) WHERE 400 <= response_code AND response_code < 500, server_err = COUNT(*) WHERE 500 <= response_code AND response_code < 600, total_count = COUNT(*) ``` We could translate the WHERE expression into an `ExpressionEvaluator` and run it, then plug it into the filtering support added in this PR. The actual filtering is done by creating a `FilteredAggregatorFunction` which wraps a regular `AggregatorFunction` first executing the filter against the incoming `Page` and then passing the resulting mask to the `AggregatorFunction`. We've then added a `mask` to `AggregatorFunction#process` which each aggregation function must use for filtering. We keep the unfiltered behavior by sending a constant block with `true` in it. Each agg detects this and takes an "unfiltered" path, preserving the original performance. Importantly, when you don't turn this on it doesn't effect performance: ``` (blockType) (grouping) (op) Score Error -> Score Error Units vector_longs none count 0.007 ± 0.001 -> 0.007 ± 0.001 ns/op vector_longs none min 0.123 ± 0.004 -> 0.128 ± 0.005 ns/op vector_longs longs count 4.311 ± 0.192 -> 4.218 ± 0.053 ns/op vector_longs longs min 5.476 ± 0.077 -> 5.451 ± 0.074 ns/op ```
…tion-ironbank-ubi * upstream/main: (302 commits) Deduplicate BucketOrder when deserializing (elastic#112707) Introduce test utils for ingest pipelines (elastic#112733) [Test] Account for auto-repairing for shard gen file (elastic#112778) Do not throw in task enqueued by CancellableRunner (elastic#112780) Mute org.elasticsearch.script.StatsSummaryTests testEqualsAndHashCode elastic#112439 Mute org.elasticsearch.repositories.blobstore.testkit.integrity.RepositoryVerifyIntegrityIT testTransportException elastic#112779 Use a dedicated test executor in MockTransportService (elastic#112748) Estimate segment field usages (elastic#112760) (Doc+) Inference Pipeline ignores Mapping Analyzers (elastic#112522) Fix verifyVersions task (elastic#112765) (Doc+) Terminating Exit Codes (elastic#112530) (Doc+) CAT Nodes default columns (elastic#112715) [DOCS] Augment installation warnings (elastic#112756) Mute org.elasticsearch.repositories.blobstore.testkit.integrity.RepositoryVerifyIntegrityIT testCorruption elastic#112769 Bump Elasticsearch to a minimum of JDK 21 (elastic#112252) ESQL: Compute support for filtering ungrouped aggs (elastic#112717) Bump Elasticsearch version to 9.0.0 (elastic#112570) add CDR related data streams to kibana_system priviliges (elastic#112655) Support widening of numeric types in union-types (elastic#112610) Introduce data stream options and failure store configuration classes (elastic#109515) ...
Adds support to the compute engine for filtering which positions are processed by ungrouping aggs. This should allow syntax like: ``` | STATS success = COUNT(*) WHERE 200 <= response_code AND response_code < 300, redirect = COUNT(*) WHERE 300 <= response_code AND response_code < 400, client_err = COUNT(*) WHERE 400 <= response_code AND response_code < 500, server_err = COUNT(*) WHERE 500 <= response_code AND response_code < 600, total_count = COUNT(*) ``` We could translate the WHERE expression into an `ExpressionEvaluator` and run it, then plug it into the filtering support added in this PR. The actual filtering is done by creating a `FilteredAggregatorFunction` which wraps a regular `AggregatorFunction` first executing the filter against the incoming `Page` and then passing the resulting mask to the `AggregatorFunction`. We've then added a `mask` to `AggregatorFunction#process` which each aggregation function must use for filtering. We keep the unfiltered behavior by sending a constant block with `true` in it. Each agg detects this and takes an "unfiltered" path, preserving the original performance. Importantly, when you don't turn this on it doesn't effect performance: ``` (blockType) (grouping) (op) Score Error -> Score Error Units vector_longs none count 0.007 ± 0.001 -> 0.007 ± 0.001 ns/op vector_longs none min 0.123 ± 0.004 -> 0.128 ± 0.005 ns/op vector_longs longs count 4.311 ± 0.192 -> 4.218 ± 0.053 ns/op vector_longs longs min 5.476 ± 0.077 -> 5.451 ± 0.074 ns/op ```
Adds support to the compute engine for filtering which positions are processed by ungrouping aggs. This should allow syntax like:
We could translate the WHERE expression into an
ExpressionEvaluator
and run it, then plug it into the filtering support added in this PR.The actual filtering is done by creating a
FilteredAggregatorFunction
which wraps a regularAggregatorFunction
first executing the filter against the incomingPage
and then passing the resulting mask to theAggregatorFunction
. We've then added amask
toAggregatorFunction#process
which each aggregation function must use for filtering.We keep the unfiltered behavior by sending a constant block with
true
in it. Each agg detects this and takes an "unfiltered" path, preserving the original performance.Importantly, when you don't turn this on it doesn't effect performance: