ESQL: Compute support for filtering grouping aggs #112476

Merged (4 commits) on Sep 9, 2024

nik9000 (Member) commented Sep 3, 2024

Adds support to the compute engine for filtering which positions are processed by grouping aggs. This should allow syntax like

| STATS
       success = COUNT(*) WHERE 200 <= response_code AND response_code < 300,
      redirect = COUNT(*) WHERE 300 <= response_code AND response_code < 400,
    client_err = COUNT(*) WHERE 400 <= response_code AND response_code < 500,
    server_err = COUNT(*) WHERE 500 <= response_code AND response_code < 600,
   total_count = COUNT(*)
  BY hostname

We could translate the WHERE expression into an ExpressionEvaluator and run it, then plug it into the filtering support added in this PR.

The actual filtering is done by a FilteredGroupingAggregatorFunction, which wraps a regular GroupingAggregatorFunction. It first executes the filter against the incoming Page, nulls any positions in the group that don't match, and then passes the resulting groups into the real aggregator. When the real grouping aggregator implementation sees a null value for a group, it skips collecting that position.
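
A minimal sketch of that wrap-and-null idea, not the actual compute engine code. It assumes plain arrays in place of `Page` and `Block`, and the names `FilteredGroupingSketch`, `countByGroup`, and `filteredCountByGroup` are hypothetical:

```java
import java.util.Arrays;

public class FilteredGroupingSketch {
    /** Stand-in for the real grouping aggregator: counts positions per group, skipping null groups. */
    static long[] countByGroup(Integer[] groupIds, int groupCount) {
        long[] counts = new long[groupCount];
        for (Integer group : groupIds) {
            if (group == null) {
                continue; // a null group means "do not collect this position"
            }
            counts[group]++;
        }
        return counts;
    }

    /** Stand-in for the filtering wrapper: null out non-matching positions, then delegate. */
    static long[] filteredCountByGroup(Integer[] groupIds, boolean[] mask, int groupCount) {
        Integer[] masked = new Integer[groupIds.length];
        for (int p = 0; p < groupIds.length; p++) {
            masked[p] = mask[p] ? groupIds[p] : null;
        }
        return countByGroup(masked, groupCount);
    }

    public static void main(String[] args) {
        Integer[] groups = { 0, 1, 0, 1, 2 };              // e.g. one group id per hostname
        int[] responseCode = { 200, 404, 503, 201, 302 };
        boolean[] success = new boolean[responseCode.length];
        for (int p = 0; p < responseCode.length; p++) {
            success[p] = 200 <= responseCode[p] && responseCode[p] < 300;
        }
        System.out.println(Arrays.toString(filteredCountByGroup(groups, success, 3))); // [1, 1, 0]
    }
}
```

The wrapped aggregator needs no knowledge of the filter; it only has to honor the existing "skip null groups" contract.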

We had to make two changes to every agg for this to work:

  1. Add a method to force local group tracking mode on any aggregator. Previously this was only required if the agg encountered null values, but when we're filtering aggs we can no longer trust the seen parameter we get when building the result. This local group tracking mode lets us track what we've actually seen locally.
  2. Add Releasable to the AddInput thing we use to handle chunked pages in grouping aggs. This is required because the results of the filter must be closed on completion.

Both of these are fairly trivial changes, but require touching every aggregation.
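
To illustrate the second change, here is a rough sketch of why the chunk handler has to be closeable. It uses `AutoCloseable` in place of `Releasable`, applies the mask directly instead of nulling groups, and every type in it (`Mask`, the nested `AddInput`, `filtered`, `run`) is a hypothetical stand-in rather than the real interface:

```java
public class ReleasableAddInputSketch {
    /** Hypothetical stand-in for the filter result, which holds memory and must be closed. */
    static final class Mask implements AutoCloseable {
        final boolean[] keep;
        boolean closed = false;

        Mask(boolean[] keep) {
            this.keep = keep;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical stand-in for AddInput: receives a page's group ids in chunks. */
    interface AddInput extends AutoCloseable {
        void add(int positionOffset, int[] groupIds);

        @Override
        void close();
    }

    /** Builds an AddInput that owns the mask and releases it when the page is done. */
    static AddInput filtered(Mask mask, long[] counts) {
        return new AddInput() {
            @Override
            public void add(int positionOffset, int[] groupIds) {
                for (int i = 0; i < groupIds.length; i++) {
                    if (mask.keep[positionOffset + i]) {
                        counts[groupIds[i]]++;
                    }
                }
            }

            @Override
            public void close() {
                mask.close();
            }
        };
    }

    /** Feeds one four-position page in two chunks and closes the input afterwards. */
    static long[] run(Mask mask, int groupCount) {
        long[] counts = new long[groupCount];
        try (AddInput input = filtered(mask, counts)) {
            input.add(0, new int[] { 0, 1 });
            input.add(2, new int[] { 1, 1 });
        }
        return counts;
    }

    public static void main(String[] args) {
        Mask mask = new Mask(new boolean[] { true, false, true, true });
        long[] counts = run(mask, 2);
        System.out.println(counts[0] + " " + counts[1] + " closed=" + mask.closed);
    }
}
```

Because the mask outlives any single chunk, it can only be released once the caller is finished with the whole page, which is why the close hook sits on the input handler.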

@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Sep 3, 2024
elasticsearchmachine (Collaborator)

Pinging @elastic/es-analytical-engine (Team:Analytics)


    @Override
    public AggregatorFunction aggregator(DriverContext driverContext) {
        throw new UnsupportedOperationException("TODO");
Member Author

Tracked #111439

    public AddInput prepareProcessPage(SeenGroupIds seenGroupIds, Page page) {
        try (BooleanBlock filterResult = ((BooleanBlock) filter.eval(page))) {
            ToMask mask = filterResult.toMask();
            // TODO warn on mv fields
Member Author

Tracked #111439

@@ -150,18 +150,23 @@ private void end() {
hashStart = System.nanoTime();
aggregationNanos += hashStart - aggStart;
}

Member Author

It's just implementing close and calling it properly.

public class FilteredGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase {
    private final List<Exception> unclosed = Collections.synchronizedList(new ArrayList<>());

    // TODO some version of this test that applies across all aggs
Member Author

Tracked #111439

costin (Member) commented Sep 3, 2024

Not sure if this is the right time, but I'm wondering what the performance impact of these changes is on the aggs, i.e. what do the microbenchmarks show?

Separately, to confirm: each filter will create another filtering mask while reusing the underlying grouping data. That is, the number of filters is not going to increase the amount of transient data, correct?
Keeping the filters separate is useful when dealing with different aggregates that use the same filters. This way the planner can detect the duplicate filter evaluation and do it only once, regardless of how many aggs want to use it.

ivancea (Contributor) left a comment

Intimidating PR at first; quite simple in the end. I guess we have to rethink removing the autogenerated files :hehe:

LGTM, looking forward to the next steps!

     * track which group ids have been seen, even if that increases the
     * overhead.
     */
    void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds);
Contributor

What's the idea behind the SeenGroupIds parameter? Is it just an "In case you weren't tracking them, here you have all the seen groups until now"?

Member Author

Yeah, it's precisely that. Lots of aggs, like MAX, don't track what they've seen unless they have to. This is, sadly, another time when they have to track it.
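
A rough sketch of why a MAX-style agg has to start tracking locally once a filter is involved. It uses `java.util.BitSet` as a stand-in for `SeenGroupIds`; the `MaxAgg` class and its `add` and `result` methods are hypothetical, and only the method name `selectedMayContainUnseenGroups` comes from this PR:

```java
import java.util.Arrays;
import java.util.BitSet;

public class SeenTrackingSketch {
    /** Hypothetical MAX-like grouping agg that only tracks seen groups when forced to. */
    static final class MaxAgg {
        private final long[] max;
        private BitSet seenLocally; // null until local tracking is switched on

        MaxAgg(int groupCount) {
            max = new long[groupCount];
            Arrays.fill(max, Long.MIN_VALUE);
        }

        /** Switch on local tracking, seeded with whatever has been seen until now. */
        void selectedMayContainUnseenGroups(BitSet seenSoFar) {
            if (seenLocally == null) {
                seenLocally = (BitSet) seenSoFar.clone();
            }
        }

        void add(int group, long value) {
            max[group] = Math.max(max[group], value);
            if (seenLocally != null) {
                seenLocally.set(group);
            }
        }

        /** Returns null for groups this agg never collected a value for. */
        Long result(int group, BitSet seenByHash) {
            BitSet seen = seenLocally != null ? seenLocally : seenByHash;
            return seen.get(group) ? Long.valueOf(max[group]) : null;
        }
    }

    public static void main(String[] args) {
        // The grouping hash saw groups 0 and 1, but the filter removed every row of group 1.
        BitSet seenByHash = new BitSet();
        seenByHash.set(0);
        seenByHash.set(1);

        MaxAgg tracking = new MaxAgg(2);
        tracking.selectedMayContainUnseenGroups(new BitSet());
        tracking.add(0, 5);
        System.out.println(tracking.result(0, seenByHash)); // 5
        System.out.println(tracking.result(1, seenByHash)); // null

        MaxAgg untracked = new MaxAgg(2);
        untracked.add(0, 5);
        // Trusting the hash's view leaks the initial value for the filtered-out group.
        System.out.println(untracked.result(1, seenByHash) == Long.MIN_VALUE); // true
    }
}
```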


    @Override
    public GroupingAggregatorFunction groupingAggregator(DriverContext driverContext) {
        GroupingAggregatorFunction next = this.next.groupingAggregator(driverContext);
Contributor

nit: Should this assignment be moved inside the try, like with the filter?
I guess this shouldn't fail, but it looks odd being the only piece not in the try. And it's very similar to filter.get(...)

Member Author

I could assign it to null and overwrite it. I don't believe it's needed, but it doesn't hurt. If this throws then the next will just be null and it's the responsibility of the method call itself to clean anything.
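
For reference, the "assign to null and overwrite" variant could look roughly like the sketch below. It is a generic success-flag pattern under assumed names (`Resource`, `Filtered`, `build`, and the `closed` log are all hypothetical), not the code in this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class CleanupOnFailureSketch {
    /** Records close calls so the behavior can be observed. */
    static final List<String> closed = new ArrayList<>();

    static final class Resource implements AutoCloseable {
        final String name;

        Resource(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            closed.add(name);
        }
    }

    /** Hypothetical wrapper that owns both the wrapped aggregator and the filter. */
    static final class Filtered implements AutoCloseable {
        final Resource next;
        final Resource filter;

        Filtered(Resource next, Resource filter) {
            this.next = next;
            this.filter = filter;
        }

        @Override
        public void close() {
            next.close();
            filter.close();
        }
    }

    static Filtered build(Supplier<Resource> nextFactory, Supplier<Resource> filterFactory) {
        Resource next = null;
        Resource filter = null;
        boolean success = false;
        try {
            // If this throws, next stays null and the factory cleans up after itself.
            next = nextFactory.get();
            // If this throws, the finally block releases the already-built next.
            filter = filterFactory.get();
            success = true;
            return new Filtered(next, filter);
        } finally {
            if (success == false) {
                if (next != null) {
                    next.close();
                }
                if (filter != null) {
                    filter.close();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            build(() -> new Resource("next"), () -> {
                throw new IllegalStateException("filter failed");
            });
        } catch (IllegalStateException e) {
            System.out.println("closed after failure: " + closed);
        }
    }
}
```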

Comment on lines 43 to 83
new EvalOperator.ExpressionEvaluator.Factory() {
    @Override
    public EvalOperator.ExpressionEvaluator get(DriverContext context) {
        Exception tracker = new Exception(Integer.toString(unclosed.size()));
        unclosed.add(tracker);
        return new EvalOperator.ExpressionEvaluator() {
            @Override
            public Block eval(Page page) {
                IntBlock ints = page.getBlock(inputChannels.get(0));
                try (
                    BooleanVector.FixedBuilder result = context.blockFactory()
                        .newBooleanVectorFixedBuilder(ints.getPositionCount())
                ) {
                    position: for (int p = 0; p < ints.getPositionCount(); p++) {
                        int start = ints.getFirstValueIndex(p);
                        int end = start + ints.getValueCount(p);
                        for (int i = start; i < end; i++) {
                            if (ints.getInt(i) > 0) {
                                result.appendBoolean(p, true);
                                continue position;
                            }
                        }
                        result.appendBoolean(p, false);
                    }
                    return result.build().asBlock();
                }
            }

            @Override
            public void close() {
                if (unclosed.remove(tracker) == false) {
                    throw new IllegalStateException("close failure!");
                }
            }

            @Override
            public String toString() {
                return "any > 0";
            }
        };
    }
Contributor

I need to confirm this: We have this big chunk of code instead of an evaluator(new GreaterThan(...)) just because of the "unclosed" tracking?

If that's it, I wonder if it would be worth it to move this to a named, nested non-static class at the end. To simplify reading this, and document what this class is about

Member Author

This can't share code with the ESQL GreaterThan because:

  1. It's an any greater than. That makes the test more interesting.
  2. It can't see that code.
  3. The "unclosed" thing

But I can totally move this to a static class at the end of the file. Or a top level class. But a static class in the file feels a little better.
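
Pulled out into a named static class, the test filter might look something like this sketch. It swaps the block types for plain arrays (one `int[]` per position to model multivalued fields) and uses a hypothetical class name `AnyGreaterThanZero`; only the "any > 0" semantics and the unclosed tracking mirror the snippet under review:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class AnyGreaterThanZeroSketch {
    /** Matches a position if any of its values is greater than zero, and tracks whether it was closed. */
    static final class AnyGreaterThanZero implements AutoCloseable {
        private final List<Object> unclosed;
        private final Object tracker = new Object();

        AnyGreaterThanZero(List<Object> unclosed) {
            this.unclosed = unclosed;
            unclosed.add(tracker);
        }

        boolean[] eval(int[][] positions) {
            boolean[] result = new boolean[positions.length];
            for (int p = 0; p < positions.length; p++) {
                for (int value : positions[p]) {
                    if (value > 0) {
                        result[p] = true;
                        break;
                    }
                }
            }
            return result;
        }

        @Override
        public void close() {
            // Closing twice, or closing something never registered, is a test failure.
            if (unclosed.remove(tracker) == false) {
                throw new IllegalStateException("close failure!");
            }
        }

        @Override
        public String toString() {
            return "any > 0";
        }
    }

    public static void main(String[] args) {
        List<Object> unclosed = Collections.synchronizedList(new ArrayList<>());
        try (AnyGreaterThanZero filter = new AnyGreaterThanZero(unclosed)) {
            boolean[] mask = filter.eval(new int[][] { { -1, 3 }, { 0 }, {}, { 2 } });
            System.out.println(java.util.Arrays.toString(mask)); // [true, false, false, true]
        }
        System.out.println("unclosed filters: " + unclosed.size()); // 0
    }
}
```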

public abstract class GroupingAggregatorFunctionTestCase extends ForkingOperatorTestCase {
    protected abstract AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels);

    protected final int aggregatorIntermediateBlockCount() {
-        try (var agg = aggregatorFunction(List.of()).aggregator(driverContext())) {
+        try (var agg = aggregatorFunction(List.of()).groupingAggregator(driverContext())) {
Contributor

I suppose this worked because all our aggregators have the same number of intermediate blocks (?)

Member Author

And because they actually implemented aggregator. I've left that as a TODO for this PR. I didn't mind flipping this. And, yeah, they do have the same intermediate block layout. It'd be funky for them not to.

@nik9000 nik9000 requested a review from a team as a code owner September 9, 2024 16:46
@nik9000 nik9000 added the auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) label Sep 9, 2024
@elasticsearchmachine elasticsearchmachine merged commit 72248e3 into elastic:main Sep 9, 2024
15 checks passed
@nik9000 nik9000 deleted the filtered_aggs_1 branch September 9, 2024 18:00
Labels
:Analytics/ES|QL AKA ESQL auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) >non-issue Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v8.16.0
4 participants