Skip to content

Commit

Permalink
ESQL: More tracking in BlockHash impls (elastic#101488)
Browse files Browse the repository at this point in the history
This adds memory tracking to more parts of the `BlockHash`
implementations. It also reworks the tests so they assert based on
callbacks which is more like how they are used in production.
  • Loading branch information
nik9000 authored Oct 28, 2023
1 parent 93b620e commit bd22bc9
Show file tree
Hide file tree
Showing 13 changed files with 1,053 additions and 1,006 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/101488.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 101488
summary: "ESQL: More tracking in `BlockHash` impls"
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {

private IntVector add(BooleanVector vector) {
int positions = vector.getPositionCount();
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
try (var builder = blockFactory.newIntVectorFixedBuilder(positions)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(MultivalueDedupeBoolean.hashOrd(everSeen, vector.getBoolean(i)));
}
Expand All @@ -75,28 +75,30 @@ private IntBlock add(BooleanBlock block) {

@Override
public BooleanBlock[] getKeys() {
BooleanBlock.Builder builder = BooleanBlock.newBlockBuilder(everSeen.length);
if (everSeen[NULL_ORD]) {
builder.appendNull();
}
if (everSeen[FALSE_ORD]) {
builder.appendBoolean(false);
}
if (everSeen[TRUE_ORD]) {
builder.appendBoolean(true);
try (BooleanBlock.Builder builder = blockFactory.newBooleanBlockBuilder(everSeen.length)) {
if (everSeen[NULL_ORD]) {
builder.appendNull();
}
if (everSeen[FALSE_ORD]) {
builder.appendBoolean(false);
}
if (everSeen[TRUE_ORD]) {
builder.appendBoolean(true);
}
return new BooleanBlock[] { builder.build() };
}
return new BooleanBlock[] { builder.build() };
}

@Override
public IntVector nonEmpty() {
IntVector.Builder builder = IntVector.newVectorBuilder(everSeen.length);
for (int i = 0; i < everSeen.length; i++) {
if (everSeen[i]) {
builder.appendInt(i);
try (IntVector.Builder builder = blockFactory.newIntVectorBuilder(everSeen.length)) {
for (int i = 0; i < everSeen.length; i++) {
if (everSeen[i]) {
builder.appendInt(i);
}
}
return builder.build();
}
return builder.build();
}

public BitArray seenGroupIds(BigArrays bigArrays) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
addInput.add(0, groupIds);
}
} else {
try (IntBlock groupIds = add(bytesVector).asBlock()) {
addInput.add(0, groupIds.asVector());
try (IntVector groupIds = add(bytesVector)) {
addInput.add(0, groupIds);
}
}
}
}

private IntVector add(BytesRefVector vector) {
int positions = vector.getPositionCount();
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
try (var builder = blockFactory.newIntVectorFixedBuilder(positions)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(bytesRefHash.add(vector.getBytesRef(i, bytes)))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
if (vector1 != null && vector2 != null) {
addInput.add(0, add(vector1, vector2));
} else {
new AddBlock(block1, block2, addInput).add();
try (AddWork work = new AddWork(block1, block2, addInput)) {
work.add();
}
}
}

Expand All @@ -91,12 +93,12 @@ public IntVector add(BytesRefVector vector1, LongVector vector2) {

private static final long[] EMPTY = new long[0];

private class AddBlock extends LongLongBlockHash.AbstractAddBlock {
private class AddWork extends LongLongBlockHash.AbstractAddBlock {
private final BytesRefBlock block1;
private final LongBlock block2;

AddBlock(BytesRefBlock block1, LongBlock block2, GroupingAggregatorFunction.AddInput addInput) {
super(emitBatchSize, addInput);
AddWork(BytesRefBlock block1, LongBlock block2, GroupingAggregatorFunction.AddInput addInput) {
super(blockFactory, emitBatchSize, addInput);
this.block1 = block1;
this.block2 = block2;
}
Expand Down Expand Up @@ -165,18 +167,29 @@ void add() {
@Override
public Block[] getKeys() {
int positions = (int) finalHash.size();
BytesRefVector.Builder keys1 = BytesRefVector.newVectorBuilder(positions);
LongVector.Builder keys2 = LongVector.newVectorBuilder(positions);
BytesRef scratch = new BytesRef();
for (long i = 0; i < positions; i++) {
keys2.appendLong(finalHash.getKey2(i));
long h1 = finalHash.getKey1(i);
keys1.appendBytesRef(bytesHash.get(h1, scratch));
BytesRefVector k1 = null;
LongVector k2 = null;
try (
BytesRefVector.Builder keys1 = blockFactory.newBytesRefVectorBuilder(positions);
LongVector.Builder keys2 = blockFactory.newLongVectorBuilder(positions)
) {
BytesRef scratch = new BytesRef();
for (long i = 0; i < positions; i++) {
keys2.appendLong(finalHash.getKey2(i));
long h1 = finalHash.getKey1(i);
keys1.appendBytesRef(bytesHash.get(h1, scratch));
}
k1 = keys1.build();
k2 = keys2.build();
} finally {
if (k2 == null) {
Releasables.closeExpectNoException(k1);
}
}
if (reverseOutput) {
return new Block[] { keys2.build().asBlock(), keys1.build().asBlock() };
return new Block[] { k2.asBlock(), k1.asBlock() };
} else {
return new Block[] { keys1.build().asBlock(), keys2.build().asBlock() };
return new Block[] { k1.asBlock(), k2.asBlock() };
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
addInput.add(0, groupIds);
}
} else {
try (IntBlock groupIds = add(doubleVector).asBlock()) {
addInput.add(0, groupIds.asVector());
try (IntVector groupIds = add(doubleVector)) {
addInput.add(0, groupIds);
}
}
}
}

private IntVector add(DoubleVector vector) {
int positions = vector.getPositionCount();
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
try (var builder = blockFactory.newIntVectorFixedBuilder(positions)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(Double.doubleToLongBits(vector.getDouble(i))))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
addInput.add(0, groupIds);
}
} else {
try (IntBlock groupIds = add(intVector).asBlock()) {
addInput.add(0, groupIds.asVector());
try (IntVector groupIds = add(intVector)) {
addInput.add(0, groupIds);
}
}
}
}

private IntVector add(IntVector vector) {
int positions = vector.getPositionCount();
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
try (var builder = blockFactory.newIntVectorFixedBuilder(positions)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getInt(i)))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
addInput.add(0, groupIds);
}
} else {
try (IntBlock groupIds = add(longVector).asBlock()) { // Ugh!!
addInput.add(0, groupIds.asVector());
try (IntVector groupIds = add(longVector)) {
addInput.add(0, groupIds);
}
}
}
}

private IntVector add(LongVector vector) {
int positions = vector.getPositionCount();
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
try (var builder = blockFactory.newIntVectorFixedBuilder(positions)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getLong(i)))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
Expand Down Expand Up @@ -74,13 +75,12 @@ private IntVector add(LongVector vector1, LongVector vector2) {

private static final long[] EMPTY = new long[0];

// TODO: this uses the non-breaking block factory - update to use this blockFactory
private class AddBlock extends AbstractAddBlock implements Releasable {
private class AddBlock extends AbstractAddBlock {
private final LongBlock block1;
private final LongBlock block2;

AddBlock(LongBlock block1, LongBlock block2, GroupingAggregatorFunction.AddInput addInput) {
super(emitBatchSize, addInput);
super(blockFactory, emitBatchSize, addInput);
this.block1 = block1;
this.block2 = block2;
}
Expand Down Expand Up @@ -137,26 +137,23 @@ void add() {
}
emitOrds();
}

@Override
public void close() {
Releasables.closeExpectNoException(block1, block2);
}
}

static class AbstractAddBlock {
static class AbstractAddBlock implements Releasable {
private final BlockFactory blockFactory;
private final int emitBatchSize;
private final GroupingAggregatorFunction.AddInput addInput;

private int positionOffset = 0;
private int added = 0;
protected IntBlock.Builder ords;

AbstractAddBlock(int emitBatchSize, GroupingAggregatorFunction.AddInput addInput) {
AbstractAddBlock(BlockFactory blockFactory, int emitBatchSize, GroupingAggregatorFunction.AddInput addInput) {
this.blockFactory = blockFactory;
this.emitBatchSize = emitBatchSize;
this.addInput = addInput;

this.ords = IntBlock.newBlockBuilder(emitBatchSize);
this.ords = blockFactory.newIntBlockBuilder(emitBatchSize);
}

protected final void addedValue(int position) {
Expand All @@ -174,13 +171,20 @@ protected final void addedValueInMultivaluePosition(int position) {
}

protected final void emitOrds() {
addInput.add(positionOffset, ords.build());
try (IntBlock ordsBlock = ords.build()) {
addInput.add(positionOffset, ordsBlock);
}
}

private void rollover(int position) {
emitOrds();
positionOffset = position;
ords = IntBlock.newBlockBuilder(emitBatchSize); // TODO add a clear method to the builder?
ords = blockFactory.newIntBlockBuilder(emitBatchSize); // TODO add a clear method to the builder?
}

@Override
public final void close() {
ords.close();
}
}

Expand All @@ -197,13 +201,24 @@ static int add(long[] seen, int nextSeen, long v) {
@Override
public Block[] getKeys() {
int positions = (int) hash.size();
LongVector.Builder keys1 = blockFactory.newLongVectorBuilder(positions);
LongVector.Builder keys2 = blockFactory.newLongVectorBuilder(positions);
for (long i = 0; i < positions; i++) {
keys1.appendLong(hash.getKey1(i));
keys2.appendLong(hash.getKey2(i));
LongVector k1 = null;
LongVector k2 = null;
try (
LongVector.Builder keys1 = blockFactory.newLongVectorBuilder(positions);
LongVector.Builder keys2 = blockFactory.newLongVectorBuilder(positions)
) {
for (long i = 0; i < positions; i++) {
keys1.appendLong(hash.getKey1(i));
keys2.appendLong(hash.getKey2(i));
}
k1 = keys1.build();
k2 = keys2.build();
} finally {
if (k2 == null) {
Releasables.close(k1);
}
}
return new Block[] { keys1.build().asBlock(), keys2.build().asBlock() };
return new Block[] { k1.asBlock(), k2.asBlock() };
}

@Override
Expand Down
Loading

0 comments on commit bd22bc9

Please sign in to comment.