Skip to content

Commit

Permalink
ESQL: Speed up grouping by bytes (elastic#114021) (elastic#114652)
Browse files Browse the repository at this point in the history
This speeds up grouping by bytes valued fields (keyword, text, ip, and
wildcard) when the input is an ordinal block:
```
    bytes_refs 22.213 ± 0.322 -> 19.848 ± 0.205 ns/op (*maybe* real, maybe noise. still good)
       ordinal didn't exist   ->  2.988 ± 0.011 ns/op
```
I see this as 20ns -> 3ns, an 85% speed up. We never hard the ordinals
branch before so I'm expecting the same performance there - about 20ns
per op.

This also speeds up grouping by a pair of byte valued fields:
```
two_bytes_refs 83.112 ± 42.348  -> 46.521 ± 0.386 ns/op
  two_ordinals 83.531 ± 23.473  ->  8.617 ± 0.105 ns/op
```
The speed up is much better when the fields are ordinals because hashing
bytes is comparatively slow.

I believe the ordinals case is quite common. I've run into it in quite a
few profiles.
  • Loading branch information
nik9000 authored Oct 11, 2024
1 parent 0e2f832 commit 1212dee
Show file tree
Hide file tree
Showing 13 changed files with 632 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.AggregationOperator;
import org.elasticsearch.compute.operator.DriverContext;
Expand Down Expand Up @@ -78,7 +81,10 @@ public class AggregatorBenchmark {
private static final String DOUBLES = "doubles";
private static final String BOOLEANS = "booleans";
private static final String BYTES_REFS = "bytes_refs";
private static final String ORDINALS = "ordinals";
private static final String TWO_LONGS = "two_" + LONGS;
private static final String TWO_BYTES_REFS = "two_" + BYTES_REFS;
private static final String TWO_ORDINALS = "two_" + ORDINALS;
private static final String LONGS_AND_BYTES_REFS = LONGS + "_and_" + BYTES_REFS;
private static final String TWO_LONGS_AND_BYTES_REFS = "two_" + LONGS + "_and_" + BYTES_REFS;

Expand Down Expand Up @@ -119,7 +125,21 @@ public class AggregatorBenchmark {
}
}

@Param({ NONE, LONGS, INTS, DOUBLES, BOOLEANS, BYTES_REFS, TWO_LONGS, LONGS_AND_BYTES_REFS, TWO_LONGS_AND_BYTES_REFS })
@Param(
{
NONE,
LONGS,
INTS,
DOUBLES,
BOOLEANS,
BYTES_REFS,
ORDINALS,
TWO_LONGS,
TWO_BYTES_REFS,
TWO_ORDINALS,
LONGS_AND_BYTES_REFS,
TWO_LONGS_AND_BYTES_REFS }
)
public String grouping;

@Param({ COUNT, COUNT_DISTINCT, MIN, MAX, SUM })
Expand All @@ -144,8 +164,12 @@ private static Operator operator(DriverContext driverContext, String grouping, S
case INTS -> List.of(new BlockHash.GroupSpec(0, ElementType.INT));
case DOUBLES -> List.of(new BlockHash.GroupSpec(0, ElementType.DOUBLE));
case BOOLEANS -> List.of(new BlockHash.GroupSpec(0, ElementType.BOOLEAN));
case BYTES_REFS -> List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF));
case BYTES_REFS, ORDINALS -> List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF));
case TWO_LONGS -> List.of(new BlockHash.GroupSpec(0, ElementType.LONG), new BlockHash.GroupSpec(1, ElementType.LONG));
case TWO_BYTES_REFS, TWO_ORDINALS -> List.of(
new BlockHash.GroupSpec(0, ElementType.BYTES_REF),
new BlockHash.GroupSpec(1, ElementType.BYTES_REF)
);
case LONGS_AND_BYTES_REFS -> List.of(
new BlockHash.GroupSpec(0, ElementType.LONG),
new BlockHash.GroupSpec(1, ElementType.BYTES_REF)
Expand Down Expand Up @@ -218,6 +242,10 @@ private static void checkGrouped(String prefix, String grouping, String op, Stri
checkGroupingBlock(prefix, LONGS, page.getBlock(0));
checkGroupingBlock(prefix, LONGS, page.getBlock(1));
}
case TWO_BYTES_REFS, TWO_ORDINALS -> {
checkGroupingBlock(prefix, BYTES_REFS, page.getBlock(0));
checkGroupingBlock(prefix, BYTES_REFS, page.getBlock(1));
}
case LONGS_AND_BYTES_REFS -> {
checkGroupingBlock(prefix, LONGS, page.getBlock(0));
checkGroupingBlock(prefix, BYTES_REFS, page.getBlock(1));
Expand Down Expand Up @@ -379,7 +407,7 @@ private static void checkGroupingBlock(String prefix, String grouping, Block blo
throw new AssertionError(prefix + "bad group expected [true] but was [" + groups.getBoolean(1) + "]");
}
}
case BYTES_REFS -> {
case BYTES_REFS, ORDINALS -> {
BytesRefBlock groups = (BytesRefBlock) block;
for (int g = 0; g < GROUPS; g++) {
if (false == groups.getBytesRef(g, new BytesRef()).equals(bytesGroup(g))) {
Expand Down Expand Up @@ -508,6 +536,8 @@ private static Block dataBlock(BlockFactory blockFactory, String blockType) {
private static List<Block> groupingBlocks(String grouping, String blockType) {
return switch (grouping) {
case TWO_LONGS -> List.of(groupingBlock(LONGS, blockType), groupingBlock(LONGS, blockType));
case TWO_BYTES_REFS -> List.of(groupingBlock(BYTES_REFS, blockType), groupingBlock(BYTES_REFS, blockType));
case TWO_ORDINALS -> List.of(groupingBlock(ORDINALS, blockType), groupingBlock(ORDINALS, blockType));
case LONGS_AND_BYTES_REFS -> List.of(groupingBlock(LONGS, blockType), groupingBlock(BYTES_REFS, blockType));
case TWO_LONGS_AND_BYTES_REFS -> List.of(
groupingBlock(LONGS, blockType),
Expand Down Expand Up @@ -570,6 +600,19 @@ private static Block groupingBlock(String grouping, String blockType) {
}
yield builder.build();
}
case ORDINALS -> {
IntVector.Builder ordinals = blockFactory.newIntVectorBuilder(BLOCK_LENGTH * valuesPerGroup);
for (int i = 0; i < BLOCK_LENGTH; i++) {
for (int v = 0; v < valuesPerGroup; v++) {
ordinals.appendInt(i % GROUPS);
}
}
BytesRefVector.Builder bytes = blockFactory.newBytesRefVectorBuilder(BLOCK_LENGTH * valuesPerGroup);
for (int i = 0; i < GROUPS; i++) {
bytes.appendBytesRef(bytesGroup(i));
}
yield new OrdinalBytesRefVector(ordinals.build(), bytes.build()).asBlock();
}
default -> throw new UnsupportedOperationException("unsupported grouping [" + grouping + "]");
};
}
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/114021.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114021
summary: "ESQL: Speed up grouping by bytes"
area: ES|QL
type: enhancement
issues: []

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.Int3Hash;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.common.util.LongLongHash;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
Expand All @@ -28,14 +29,37 @@
import java.util.List;

/**
* A specialized hash table implementation maps values of a {@link Block} to ids (in longs).
* This class delegates to {@link LongHash} or {@link BytesRefHash}.
*
* @see LongHash
* @see BytesRefHash
* Specialized hash table implementations that map rows to a <strong>set</strong>
* of bucket IDs to which they belong to implement {@code GROUP BY} expressions.
* <p>
* A row is always in at least one bucket so the results are never {@code null}.
* {@code null} valued key columns will map to some integer bucket id.
* If none of key columns are multivalued then the output is always an
* {@link IntVector}. If any of the key are multivalued then a row is
* in a bucket for each value. If more than one key is multivalued then
* the row is in the combinatorial explosion of all value combinations.
* Luckily for the number of values rows can only be in each bucket once.
* Unluckily, it's the responsibility of {@link BlockHash} to remove those
* duplicates.
* </p>
* <p>
* These classes typically delegate to some combination of {@link BytesRefHash},
* {@link LongHash}, {@link LongLongHash}, {@link Int3Hash}. They don't
* <strong>technically</strong> have to be hash tables, so long as they
* implement the deduplication semantics above and vend integer ids.
* </p>
* <p>
* The integer ids are assigned to offsets into arrays of aggregation states
* so its permissible to have gaps in the ints. But large gaps are a bad
* idea because they'll waste space in the aggregations that use these
* positions. For example, {@link BooleanBlockHash} assigns {@code 0} to
* {@code null}, {@code 1} to {@code false}, and {@code 1} to {@code true}
* and that's <strong>fine</strong> and simple and good because it'll never
* leave a big gap, even if we never see {@code null}.
* </p>
*/
public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
permits BooleanBlockHash, BytesRefBlockHash, DoubleBlockHash, IntBlockHash, LongBlockHash, BytesRef3BlockHash, //
permits BooleanBlockHash, BytesRefBlockHash, DoubleBlockHash, IntBlockHash, LongBlockHash, BytesRef2BlockHash, BytesRef3BlockHash, //
NullBlockHash, PackedValuesBlockHash, BytesRefLongBlockHash, LongLongBlockHash, TimeSeriesBlockHash {

protected final BlockFactory blockFactory;
Expand Down Expand Up @@ -98,8 +122,19 @@ public static BlockHash build(List<GroupSpec> groups, BlockFactory blockFactory,
if (groups.size() == 1) {
return newForElementType(groups.get(0).channel(), groups.get(0).elementType(), blockFactory);
}
if (groups.size() == 3 && groups.stream().allMatch(g -> g.elementType == ElementType.BYTES_REF)) {
return new BytesRef3BlockHash(blockFactory, groups.get(0).channel, groups.get(1).channel, groups.get(2).channel, emitBatchSize);
if (groups.stream().allMatch(g -> g.elementType == ElementType.BYTES_REF)) {
switch (groups.size()) {
case 2:
return new BytesRef2BlockHash(blockFactory, groups.get(0).channel, groups.get(1).channel, emitBatchSize);
case 3:
return new BytesRef3BlockHash(
blockFactory,
groups.get(0).channel,
groups.get(1).channel,
groups.get(2).channel,
emitBatchSize
);
}
}
if (allowBrokenOptimizations && groups.size() == 2) {
var g1 = groups.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import static org.elasticsearch.compute.operator.mvdedupe.MultivalueDedupeBoolean.TRUE_ORD;

/**
* Maps a {@link BooleanBlock} column to group ids. Assigns group
* {@code 0} to {@code false} and group {@code 1} to {@code true}.
* Maps a {@link BooleanBlock} column to group ids. Assigns
* {@code 0} to {@code null}, {@code 1} to {@code false}, and
* {@code 2} to {@code true}.
*/
final class BooleanBlockHash extends BlockHash {
private final int channel;
Expand Down
Loading

0 comments on commit 1212dee

Please sign in to comment.