Skip to content

Commit

Permalink
ESQL: Fix filtered grouping on ords (elastic#115312)
Browse files Browse the repository at this point in the history
This fixes filtered aggs when they are grouped on a field with ordinals.
This looks like:
```
| STATS max = max(salary) WHERE salary > 0 BY job_positions
```
when the `job_positions` field is a keyword field with doc values. In
that case we use a faster group-by-segment-ordinals algorithm that needs
to be able to merge the results of aggregators from multiple segments.
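This previously failed with a `ClassCastException` because the filtered aggregator handed the filter wrapper itself, rather than the function it wraps, to its delegate when merging intermediate rows.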

Also, the group-by-segment-ordinals algorithm wasn't properly releasing
the closure used to add inputs, which leaked circuit-breaker accounting.
No memory was actually leaked; only the *tracking* of memory was.

Closes elastic#114897
  • Loading branch information
nik9000 authored and georgewallace committed Oct 25, 2024
1 parent 835d994 commit c1a6f7c
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 7 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/115312.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 115312
summary: "ESQL: Fix filtered grouping on ords"
area: ES|QL
type: bug
issues:
- 114897
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void addIntermediateInput(int positionOffset, IntVector groupIdVector, Pa

/**
 * Merges a single row of intermediate state from {@code input} into {@code groupId}
 * by delegating to {@code next}.
 * <p>
 * {@code input} is cast to {@link FilteredGroupingAggregatorFunction} and unwrapped with
 * {@code next()} before delegating, so {@code next} receives the function that
 * {@code input} wraps rather than the filtering wrapper itself. Passing the wrapper
 * through unchanged is what previously surfaced as a {@code ClassCastException}.
 * </p>
 *
 * @param groupId  the group in this function that receives the row
 * @param input    the function to read the row from; must be a {@link FilteredGroupingAggregatorFunction}
 * @param position forwarded unchanged to {@code next} (presumably the row's position within {@code input})
 */
@Override
public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) {
    // Delegate exactly once, and only with the unwrapped function.
    next.addIntermediateRowInput(groupId, ((FilteredGroupingAggregatorFunction) input).next(), position);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ static final class OrdinalSegmentAggregator implements Releasable, SeenGroupIds
}

void addInput(IntVector docs, Page page) {
GroupingAggregatorFunction.AddInput[] prepared = new GroupingAggregatorFunction.AddInput[aggregators.size()];
try {
GroupingAggregatorFunction.AddInput[] prepared = new GroupingAggregatorFunction.AddInput[aggregators.size()];
for (int i = 0; i < prepared.length; i++) {
prepared[i] = aggregators.get(i).prepareProcessPage(this, page);
}
Expand All @@ -392,7 +392,7 @@ void addInput(IntVector docs, Page page) {
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
page.releaseBlocks();
Releasables.close(page::releaseBlocks, Releasables.wrap(prepared));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
public class FilteredAggregatorFunctionTests extends AggregatorFunctionTestCase {
private final List<Exception> unclosed = Collections.synchronizedList(new ArrayList<>());

// TODO some version of this test that applies across all aggs
@Override
protected AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels) {
return new FilteredAggregatorFunctionSupplier(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.LongIntBlockSourceOperator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Tuple;
import org.junit.After;

Expand All @@ -31,7 +33,6 @@
public class FilteredGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase {
private final List<Exception> unclosed = Collections.synchronizedList(new ArrayList<>());

// TODO some version of this test that applies across all aggs
@Override
protected AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels) {
return new FilteredAggregatorFunctionSupplier(
Expand Down Expand Up @@ -104,6 +105,42 @@ protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
);
}

/**
 * Exercises {@link GroupingAggregatorFunction#addIntermediateRowInput}. Every input page
 * is added to a {@code leaf} function under group {@code 0} through the regular
 * {@code prepareProcessPage}/{@code add} path, that row is then copied into a fresh
 * {@code main} function, and the final results of both functions are asserted to be equal.
 */
public void testAddIntermediateRowInput() {
    DriverContext driverContext = driverContext();
    AggregatorFunctionSupplier functions = aggregatorFunction(channels(AggregatorMode.SINGLE));
    // Slot 0 holds the merged ("main") result, slot 1 the directly built ("leaf") one.
    Block[] finalBlocks = new Block[2];
    try (
        GroupingAggregatorFunction main = functions.groupingAggregator(driverContext);
        GroupingAggregatorFunction leaf = functions.groupingAggregator(driverContext);
        SourceOperator input = simpleInput(driverContext.blockFactory(), 10)
    ) {
        for (Page page = input.getOutput(); page != null; page = input.getOutput()) {
            try (
                IntVector groupIds = driverContext.blockFactory().newConstantIntVector(0, page.getPositionCount());
                GroupingAggregatorFunction.AddInput adder = leaf.prepareProcessPage(null, page)
            ) {
                // Every position of the page lands in group 0.
                adder.add(0, groupIds);
            } finally {
                page.releaseBlocks();
            }
        }
        // Copy group 0 of the leaf into group 0 of the main function.
        main.addIntermediateRowInput(0, leaf, 0);
        try (IntVector onlyGroup = driverContext.blockFactory().newConstantIntVector(0, 1)) {
            main.evaluateFinal(finalBlocks, 0, onlyGroup, driverContext);
            leaf.evaluateFinal(finalBlocks, 1, onlyGroup, driverContext);
        }
        assertThat(finalBlocks[0], equalTo(finalBlocks[1]));
    } finally {
        Releasables.close(finalBlocks);
    }
}

@After
public void checkUnclosed() {
for (Exception tracker : unclosed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,17 @@ protected final Operator.OperatorFactory simpleWithMode(AggregatorMode mode) {
return simpleWithMode(mode, Function.identity());
}

/**
 * Chooses the input channels handed to the aggregator under test.
 *
 * @param mode the aggregator mode being tested
 * @return {@code [1]} when {@code mode} does not take partial input; otherwise the boxed
 *         values of {@code range(1, 1 + aggregatorIntermediateBlockCount())}
 */
protected List<Integer> channels(AggregatorMode mode) {
    if (mode.isInputPartial() == false) {
        return List.of(1);
    }
    int endExclusive = 1 + aggregatorIntermediateBlockCount();
    return range(1, endExclusive).boxed().toList();
}

private Operator.OperatorFactory simpleWithMode(
AggregatorMode mode,
Function<AggregatorFunctionSupplier, AggregatorFunctionSupplier> wrap
) {
List<Integer> channels = mode.isInputPartial() ? range(1, 1 + aggregatorIntermediateBlockCount()).boxed().toList() : List.of(1);
int emitChunkSize = between(100, 200);

AggregatorFunctionSupplier supplier = wrap.apply(aggregatorFunction(channels));
AggregatorFunctionSupplier supplier = wrap.apply(aggregatorFunction(channels(mode)));
if (randomBoolean()) {
supplier = chunkGroups(emitChunkSize, supplier);
}
Expand Down
126 changes: 126 additions & 0 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -2529,3 +2529,129 @@ FROM employees | eval x = [1,2,3], y = 5 + 6 | stats m = max(y) by y+1
m:integer | y+1:integer
11 | 12
;

filterIsAlwaysTrue
required_capability: per_agg_filtering
FROM employees
| STATS max = max(salary) WHERE salary > 0
;

max:integer
74999
;

filterIsAlwaysFalse
required_capability: per_agg_filtering
FROM employees
| STATS max = max(salary) WHERE first_name == ""
;

max:integer
null
;

filterSometimesMatches
required_capability: per_agg_filtering
FROM employees
| STATS max = max(salary) WHERE first_name IS NULL
;

max:integer
70011
;

groupingFilterIsAlwaysTrue
required_capability: per_agg_filtering
FROM employees
| MV_EXPAND job_positions
| STATS max = max(salary) WHERE salary > 0 BY job_positions = SUBSTRING(job_positions, 1, 1)
| SORT job_positions
| LIMIT 4
;

max:integer | job_positions:keyword
74970 | A
58121 | B
74999 | D
58715 | H
;

groupingFilterIsAlwaysFalse
required_capability: per_agg_filtering
FROM employees
| MV_EXPAND job_positions
| STATS max = max(salary) WHERE first_name == "" BY job_positions = SUBSTRING(job_positions, 1, 1)
| SORT job_positions
| LIMIT 4
;

max:integer | job_positions:keyword
null | A
null | B
null | D
null | H
;

groupingFilterSometimesMatches
required_capability: per_agg_filtering
FROM employees
| MV_EXPAND job_positions
| STATS max = max(salary) WHERE first_name IS NULL BY job_positions = SUBSTRING(job_positions, 1, 1)
| SORT job_positions
| LIMIT 4
;

max:integer | job_positions:keyword
62233 | A
39878 | B
67492 | D
null | H
;

groupingByOrdinalsFilterIsAlwaysTrue
required_capability: per_agg_filtering
required_capability: per_agg_filtering_ords
FROM employees
| STATS max = max(salary) WHERE salary > 0 BY job_positions
| SORT job_positions
| LIMIT 4
;

max:integer | job_positions:keyword
74970 | Accountant
69904 | Architect
58121 | Business Analyst
74999 | Data Scientist
;

groupingByOrdinalsFilterIsAlwaysFalse
required_capability: per_agg_filtering
required_capability: per_agg_filtering_ords
FROM employees
| STATS max = max(salary) WHERE first_name == "" BY job_positions
| SORT job_positions
| LIMIT 4
;

max:integer | job_positions:keyword
null | Accountant
null | Architect
null | Business Analyst
null | Data Scientist
;

groupingByOrdinalsFilterSometimesMatches
required_capability: per_agg_filtering
required_capability: per_agg_filtering_ords
FROM employees
| STATS max = max(salary) WHERE first_name IS NULL BY job_positions
| SORT job_positions
| LIMIT 4
;

max:integer | job_positions:keyword
39878 | Accountant
62233 | Architect
39878 | Business Analyst
67492 | Data Scientist
;
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,11 @@ public enum Cap {
*/
PER_AGG_FILTERING,

/**
* Fix {@link #PER_AGG_FILTERING} grouped by ordinals.
*/
PER_AGG_FILTERING_ORDS,

/**
* Fix for https://github.com/elastic/elasticsearch/issues/114714
*/
Expand Down

0 comments on commit c1a6f7c

Please sign in to comment.