Skip to content

Commit

Permalink
ESQL: Compact topn (elastic#99316)
Browse files Browse the repository at this point in the history
This lowers topn's memory usage somewhat and makes it easier to track
the memory usage. That looks like:

```
"status" : {
  "occupied_rows" : 10000,
  "ram_bytes_used" : 255392224,
  "ram_used" : "243.5mb"
}
```

In some cases the memory savings are significant. In an example
with many, many keys the memory usage of each row drops from `58kb` to
`25kb`. This is a little degenerate though and I expect the savings to
normally be on the order of 10%.

The real advantage is memory tracking. It's *easy* to track used memory.
And, in a followup, it should be fairly easy to circuit break on the
used memory.

Mostly this is done by adding new abstractions and moving existing
abstractions to top level classes with tests and stuff.

* `TopNEncoder` is now a top level class. It has grown the ability to *decode* values as well as encode them. And it has grown "unsortable" versions which don't write their values such that sorting the bytes sorts the values. We use the "unsortable" versions when writing values.
* `KeyExtractor` extracts keys from the blocks and writes them to the row's `BytesRefBuilder`. This is basically objects replacing one of the switch statements in `RowFactory`. They are more scattered but easier to test, and hopefully `TopNOperator` is more readable with this behavior factored out. Also! Most implementations are automatically generated.
* `ValueExtractor` extracts values from the blocks and writes them to the row's `BytesRefBuilder`. This replaces the other switch statement in `RowFactory` for the same reasons, except instead of writing to many arrays it writes to a `BytesRefBuilder` just like the key as compactly as it can manage.

The memory savings come from three changes: (1) Lower overhead for
storing values by encoding them rather than using many primitive arrays.
(2) Encode the value count as a vint rather than a whole int. Usually
there are very few values and vint encodes that quite nicely. (3) Don't
write values that are in the key for single-valued fields. Instead we
read them from the key. That's going to be very very common.

This is unlikely to be faster than the old code. I haven't really tried
for speed. Just memory usage and accountability. Once we get good
accounting we can try and make this faster. I expect we'll have to
figure out the megamorphic invocations I've added. But, for now, they
help more than they hurt.
  • Loading branch information
nik9000 authored and piergm committed Sep 14, 2023
1 parent 7ba5754 commit 78a530c
Show file tree
Hide file tree
Showing 57 changed files with 4,339 additions and 1,214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.TopNOperator;
import org.elasticsearch.compute.operator.topn.TopNEncoder;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -77,8 +79,27 @@ private static Operator operator(String data, int topCount) {
case TWO_LONGS, LONGS_AND_BYTES_REFS -> 2;
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
};
// Element type of every column in the benchmark page, in channel order.
// TWO_LONGS and LONGS_AND_BYTES_REFS are named for long columns, so they are declared as
// LONG here rather than INT so the declared types agree with the data set names.
// NOTE(review): the code that builds the page is outside this view - confirm it really
// produces long blocks for these two data sets.
List<ElementType> elementTypes = switch (data) {
    case LONGS -> List.of(ElementType.LONG);
    case INTS -> List.of(ElementType.INT);
    case DOUBLES -> List.of(ElementType.DOUBLE);
    case BOOLEANS -> List.of(ElementType.BOOLEAN);
    case BYTES_REFS -> List.of(ElementType.BYTES_REF);
    case TWO_LONGS -> List.of(ElementType.LONG, ElementType.LONG);
    case LONGS_AND_BYTES_REFS -> List.of(ElementType.LONG, ElementType.BYTES_REF);
    default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
};
List<TopNEncoder> encoders = switch (data) {
case LONGS, INTS, DOUBLES, BOOLEANS -> List.of(TopNEncoder.DEFAULT_SORTABLE);
case BYTES_REFS -> List.of(TopNEncoder.UTF8);
case TWO_LONGS -> List.of(TopNEncoder.DEFAULT_SORTABLE, TopNEncoder.DEFAULT_SORTABLE);
case LONGS_AND_BYTES_REFS -> List.of(TopNEncoder.DEFAULT_SORTABLE, TopNEncoder.UTF8);
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
};
return new TopNOperator(
topCount,
elementTypes,
encoders,
IntStream.range(0, count).mapToObj(c -> new TopNOperator.SortOrder(c, false, false)).toList(),
16 * 1024
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.ValueSourceInfo;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.TopNOperator;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/99316.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 99316
summary: "ESQL: Compact topn"
area: ES|QL
type: enhancement
issues: []
78 changes: 78 additions & 0 deletions x-pack/plugin/esql/compute/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,82 @@ tasks.named('stringTemplates').configure {
it.inputFile = multivalueDedupeInputFile
it.outputFile = "org/elasticsearch/compute/operator/MultivalueDedupeBytesRef.java"
}
File keyExtractorInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/operator/topn/X-KeyExtractor.java.st")
template {
it.properties = bytesRefProperties
it.inputFile = keyExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/KeyExtractorForBytesRef.java"
}
template {
it.properties = booleanProperties
it.inputFile = keyExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/KeyExtractorForBoolean.java"
}
template {
it.properties = intProperties
it.inputFile = keyExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/KeyExtractorForInt.java"
}
template {
it.properties = longProperties
it.inputFile = keyExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/KeyExtractorForLong.java"
}
template {
it.properties = doubleProperties
it.inputFile = keyExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/KeyExtractorForDouble.java"
}
File valueExtractorInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/operator/topn/X-ValueExtractor.java.st")
template {
it.properties = bytesRefProperties
it.inputFile = valueExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ValueExtractorForBytesRef.java"
}
template {
it.properties = booleanProperties
it.inputFile = valueExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ValueExtractorForBoolean.java"
}
template {
it.properties = intProperties
it.inputFile = valueExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ValueExtractorForInt.java"
}
template {
it.properties = longProperties
it.inputFile = valueExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ValueExtractorForLong.java"
}
template {
it.properties = doubleProperties
it.inputFile = valueExtractorInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ValueExtractorForDouble.java"
}
File resultBuilderInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/operator/topn/X-ResultBuilder.java.st")
template {
it.properties = bytesRefProperties
it.inputFile = resultBuilderInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ResultBuilderForBytesRef.java"
}
template {
it.properties = booleanProperties
it.inputFile = resultBuilderInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ResultBuilderForBoolean.java"
}
template {
it.properties = intProperties
it.inputFile = resultBuilderInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ResultBuilderForInt.java"
}
template {
it.properties = longProperties
it.inputFile = resultBuilderInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ResultBuilderForLong.java"
}
template {
it.properties = doubleProperties
it.inputFile = resultBuilderInputFile
it.outputFile = "org/elasticsearch/compute/operator/topn/ResultBuilderForDouble.java"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;

/**
 * Appends the sort key for one position of a boolean column to a {@link BytesRefBuilder}.
 * <p>
 * Every key starts with a marker byte: {@code nul} when the position has no value and
 * {@code nonNul} otherwise. A non-null key is followed by the boolean encoded with
 * {@link TopNEncoder#DEFAULT_SORTABLE}. Positions holding several values contribute a
 * single representative value, chosen by the concrete subclass.
 */
abstract class KeyExtractorForBoolean implements KeyExtractor {
    /**
     * Picks the implementation that fits {@code block}.
     *
     * @param encoder   must be {@link TopNEncoder#DEFAULT_SORTABLE}; this is only checked by an assertion
     * @param ascending {@code true} selects a "Min" extractor, {@code false} a "Max" extractor
     * @param nul       marker byte written for positions without a value
     * @param nonNul    marker byte written in front of an encoded value
     * @param block     the column to read keys from
     * @return {@link ForVector} when the block can be viewed as a vector; otherwise one of the
     *         Min/Max extractors, using the "ForAscending" flavour when the block reports
     *         {@link Block.MvOrdering#ASCENDING} and the "ForUnordered" flavour when it does not
     */
    static KeyExtractorForBoolean extractorFor(TopNEncoder encoder, boolean ascending, byte nul, byte nonNul, BooleanBlock block) {
        BooleanVector vector = block.asVector();
        if (vector != null) {
            return new KeyExtractorForBoolean.ForVector(encoder, nul, nonNul, vector);
        }
        boolean valuesAreSorted = block.mvOrdering() == Block.MvOrdering.ASCENDING;
        if (valuesAreSorted) {
            return ascending
                ? new KeyExtractorForBoolean.MinForAscending(encoder, nul, nonNul, block)
                : new KeyExtractorForBoolean.MaxForAscending(encoder, nul, nonNul, block);
        }
        return ascending
            ? new KeyExtractorForBoolean.MinForUnordered(encoder, nul, nonNul, block)
            : new KeyExtractorForBoolean.MaxForUnordered(encoder, nul, nonNul, block);
    }

    /** Marker byte for a position without a value. */
    private final byte nul;
    /** Marker byte that precedes an encoded value. */
    private final byte nonNul;

    KeyExtractorForBoolean(TopNEncoder encoder, byte nul, byte nonNul) {
        // The encoder is not stored: nonNul(...) always encodes with DEFAULT_SORTABLE.
        assert encoder == TopNEncoder.DEFAULT_SORTABLE;
        this.nul = nul;
        this.nonNul = nonNul;
    }

    /**
     * Appends the non-null marker and then the encoded {@code value}.
     *
     * @return {@code 2}; presumably the marker byte plus one byte for the encoded boolean
     */
    protected final int nonNul(BytesRefBuilder key, boolean value) {
        key.append(nonNul);
        TopNEncoder.DEFAULT_SORTABLE.encodeBoolean(value, key);
        return 1 + Byte.BYTES;
    }

    /**
     * Appends only the null marker.
     *
     * @return {@code 1}, the single marker byte appended
     */
    protected final int nul(BytesRefBuilder key) {
        key.append(nul);
        return 1;
    }

    /** Reads keys straight from a {@link BooleanVector}; it never writes the null marker. */
    static class ForVector extends KeyExtractorForBoolean {
        private final BooleanVector vector;

        ForVector(TopNEncoder encoder, byte nul, byte nonNul, BooleanVector vector) {
            super(encoder, nul, nonNul);
            this.vector = vector;
        }

        @Override
        public int writeKey(BytesRefBuilder key, int position) {
            boolean value = vector.getBoolean(position);
            return nonNul(key, value);
        }
    }

    /** Uses the first value of a position as the key; chosen for ascending sorts over ascending-ordered values. */
    static class MinForAscending extends KeyExtractorForBoolean {
        private final BooleanBlock block;

        MinForAscending(TopNEncoder encoder, byte nul, byte nonNul, BooleanBlock block) {
            super(encoder, nul, nonNul);
            this.block = block;
        }

        @Override
        public int writeKey(BytesRefBuilder key, int position) {
            return block.isNull(position) ? nul(key) : nonNul(key, block.getBoolean(block.getFirstValueIndex(position)));
        }
    }

    /** Uses the last value of a position as the key; chosen for descending sorts over ascending-ordered values. */
    static class MaxForAscending extends KeyExtractorForBoolean {
        private final BooleanBlock block;

        MaxForAscending(TopNEncoder encoder, byte nul, byte nonNul, BooleanBlock block) {
            super(encoder, nul, nonNul);
            this.block = block;
        }

        @Override
        public int writeKey(BytesRefBuilder key, int position) {
            if (block.isNull(position)) {
                return nul(key);
            }
            int lastIndex = block.getFirstValueIndex(position) + block.getValueCount(position) - 1;
            return nonNul(key, block.getBoolean(lastIndex));
        }
    }

    /** Scans a position's values for the smallest one: {@code false} if any value is {@code false}. */
    static class MinForUnordered extends KeyExtractorForBoolean {
        private final BooleanBlock block;

        MinForUnordered(TopNEncoder encoder, byte nul, byte nonNul, BooleanBlock block) {
            super(encoder, nul, nonNul);
            this.block = block;
        }

        @Override
        public int writeKey(BytesRefBuilder key, int position) {
            int count = block.getValueCount(position);
            if (count == 0) {
                return nul(key);
            }
            int first = block.getFirstValueIndex(position);
            // Stays true until a false value is read, at which point the scan stops.
            boolean min = true;
            for (int offset = 0; min && offset < count; offset++) {
                min = block.getBoolean(first + offset);
            }
            return nonNul(key, min);
        }
    }

    /** Scans a position's values for the largest one: {@code true} if any value is {@code true}. */
    static class MaxForUnordered extends KeyExtractorForBoolean {
        private final BooleanBlock block;

        MaxForUnordered(TopNEncoder encoder, byte nul, byte nonNul, BooleanBlock block) {
            super(encoder, nul, nonNul);
            this.block = block;
        }

        @Override
        public int writeKey(BytesRefBuilder key, int position) {
            int count = block.getValueCount(position);
            if (count == 0) {
                return nul(key);
            }
            int first = block.getFirstValueIndex(position);
            // Stays false until a true value is read, at which point the scan stops.
            boolean max = false;
            for (int offset = 0; max == false && offset < count; offset++) {
                max = block.getBoolean(first + offset);
            }
            return nonNul(key, max);
        }
    }
}
Loading

0 comments on commit 78a530c

Please sign in to comment.