Skip to content

Commit

Permalink
more fixes and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisHegarty committed Sep 29, 2023
1 parent 4896ab7 commit b27bafa
Show file tree
Hide file tree
Showing 17 changed files with 504 additions and 397 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.data.IntArrayVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
Expand Down Expand Up @@ -41,22 +40,28 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
BooleanBlock block = page.getBlock(channel);
BooleanVector vector = block.asVector();
if (vector == null) {
addInput.add(0, add(block));
try (IntBlock groupIds = add(block)) {
addInput.add(0, groupIds);
}
} else {
addInput.add(0, add(vector));
try (IntVector groupIds = add(vector)) {
addInput.add(0, groupIds);
}
}
}

private IntVector add(BooleanVector vector) {
int[] groups = new int[vector.getPositionCount()];
for (int i = 0; i < vector.getPositionCount(); i++) {
groups[i] = MultivalueDedupeBoolean.hashOrd(everSeen, vector.getBoolean(i));
int positions = vector.getPositionCount();
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(MultivalueDedupeBoolean.hashOrd(everSeen, vector.getBoolean(i)));
}
return builder.build();
}
return new IntArrayVector(groups, groups.length);
}

private IntBlock add(BooleanBlock block) {
return new MultivalueDedupeBoolean(block).hash(everSeen);
return new MultivalueDedupeBoolean(block).hash(everSeen); // TODO: block builder
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.BytesRefArrayVector;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.IntArrayVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
Expand Down Expand Up @@ -58,22 +56,28 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
BytesRefBlock block = page.getBlock(channel);
BytesRefVector vector = block.asVector();
if (vector == null) {
addInput.add(0, add(block));
try (IntBlock groupIds = add(block)) {
addInput.add(0, groupIds);
}
} else {
addInput.add(0, add(vector));
try (IntVector groupIds = add(vector)) {
addInput.add(0, groupIds);
}
}
}

private IntVector add(BytesRefVector vector) {
int[] groups = new int[vector.getPositionCount()];
for (int i = 0; i < vector.getPositionCount(); i++) {
groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(bytesRefHash.add(vector.getBytesRef(i, bytes))));
int positions = vector.getPositionCount();
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(bytesRefHash.add(vector.getBytesRef(i, bytes)))));
}
return builder.build();
}
return new IntArrayVector(groups, vector.getPositionCount());
}

private IntBlock add(BytesRefBlock block) {
MultivalueDedupe.HashResult result = new MultivalueDedupeBytesRef(block).hash(bytesRefHash);
MultivalueDedupe.HashResult result = new MultivalueDedupeBytesRef(block).hash(bytesRefHash); // TODO: block builder
seenNull |= result.sawNull();
return result.ords();
}
Expand All @@ -87,21 +91,22 @@ public BytesRefBlock[] getKeys() {
// TODO replace with takeBytesRefsOwnership ?!

if (seenNull) {
BytesRefBlock.Builder builder = BytesRefBlock.newBlockBuilder(Math.toIntExact(bytesRefHash.size() + 1));
builder.appendNull();
BytesRef spare = new BytesRef();
for (long i = 0; i < bytesRefHash.size(); i++) {
builder.appendBytesRef(bytesRefHash.get(i, spare));
try (var builder = blockFactory.newBytesRefBlockBuilder(Math.toIntExact(bytesRefHash.size() + 1))) {
builder.appendNull();
BytesRef spare = new BytesRef();
for (long i = 0; i < bytesRefHash.size(); i++) {
builder.appendBytesRef(bytesRefHash.get(i, spare));
}
return new BytesRefBlock[] { builder.build() };
}
return new BytesRefBlock[] { builder.build() };
}

final int size = Math.toIntExact(bytesRefHash.size());
try (BytesStreamOutput out = new BytesStreamOutput()) {
bytesRefHash.getBytesRefs().writeTo(out);
try (StreamInput in = out.bytes().streamInput()) {
return new BytesRefBlock[] {
new BytesRefArrayVector(new BytesRefArray(in, BigArrays.NON_RECYCLING_INSTANCE), size).asBlock() };
blockFactory.newBytesRefArrayVector(new BytesRefArray(in, BigArrays.NON_RECYCLING_INSTANCE), size).asBlock() };
}
} catch (IOException e) {
throw new IllegalStateException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.DoubleArrayBlock;
import org.elasticsearch.compute.data.DoubleArrayVector;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.DoubleVector;
import org.elasticsearch.compute.data.IntArrayVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
Expand Down Expand Up @@ -54,22 +51,28 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
DoubleBlock block = page.getBlock(channel);
DoubleVector vector = block.asVector();
if (vector == null) {
addInput.add(0, add(block));
try (IntBlock groupIds = add(block)) {
addInput.add(0, groupIds);
}
} else {
addInput.add(0, add(vector));
try (IntVector groupIds = add(vector)) {
addInput.add(0, groupIds);
}
}
}

private IntVector add(DoubleVector vector) {
int[] groups = new int[vector.getPositionCount()];
for (int i = 0; i < vector.getPositionCount(); i++) {
groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(Double.doubleToLongBits(vector.getDouble(i)))));
int positions = vector.getPositionCount();
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(Double.doubleToLongBits(vector.getDouble(i))))));
}
return builder.build();
}
return new IntArrayVector(groups, groups.length);
}

private IntBlock add(DoubleBlock block) {
MultivalueDedupe.HashResult result = new MultivalueDedupeDouble(block).hash(longHash);
MultivalueDedupe.HashResult result = new MultivalueDedupeDouble(block).hash(longHash); // TODO: block builder
seenNull |= result.sawNull();
return result.ords();
}
Expand All @@ -85,7 +88,7 @@ public DoubleBlock[] getKeys() {
BitSet nulls = new BitSet(1);
nulls.set(0);
return new DoubleBlock[] {
new DoubleArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) };
blockFactory.newDoubleArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) };
}

final int size = Math.toIntExact(longHash.size());
Expand All @@ -95,7 +98,7 @@ public DoubleBlock[] getKeys() {
}

// TODO claim the array and wrap?
return new DoubleBlock[] { new DoubleArrayVector(keys, keys.length).asBlock() };
return new DoubleBlock[] { blockFactory.newDoubleArrayVector(keys, keys.length).asBlock() };
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,23 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
}
} else {
try (IntVector groupIds = add(vector)) {
addInput.add(0, add(groupIds));
addInput.add(0, groupIds);
}
}
}

private IntVector add(IntVector vector) {
long preAdjustBytes = blockFactory.preAdjustBreakerForInt(vector.getPositionCount());
int[] groups = new int[vector.getPositionCount()];
for (int i = 0; i < vector.getPositionCount(); i++) {
groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getInt(i))));
int positions = vector.getPositionCount();
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getInt(i)))));
}
return builder.build();
}
return blockFactory.newIntArrayVector(groups, groups.length, preAdjustBytes);
}

private IntBlock add(IntBlock block) {
MultivalueDedupe.HashResult result = new MultivalueDedupeInt(block).hash(longHash);
MultivalueDedupe.HashResult result = new MultivalueDedupeInt(block).hash(longHash); // TODO: block builder
seenNull |= result.sawNull();
return result.ords();
}
Expand All @@ -83,7 +84,8 @@ public IntBlock[] getKeys() {
}
BitSet nulls = new BitSet(1);
nulls.set(0);
return new IntBlock[] { blockFactory.newIntArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) };
return new IntBlock[] {
blockFactory.newIntArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) };
}
final int size = Math.toIntExact(longHash.size());
final int[] keys = new int[size];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.IntArrayVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongArrayBlock;
import org.elasticsearch.compute.data.LongArrayVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
Expand Down Expand Up @@ -54,22 +51,28 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
LongBlock block = page.getBlock(channel);
LongVector vector = block.asVector();
if (vector == null) {
addInput.add(0, add(block));
try (IntBlock groupIds = add(block)) {
addInput.add(0, groupIds);
}
} else {
addInput.add(0, add(vector));
try (IntVector groupIds = add(vector)) {
addInput.add(0, groupIds);
}
}
}

private IntVector add(LongVector vector) {
int[] groups = new int[vector.getPositionCount()];
for (int i = 0; i < vector.getPositionCount(); i++) {
groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getLong(i))));
int positions = vector.getPositionCount();
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getLong(i)))));
}
return builder.build();
}
return new IntArrayVector(groups, groups.length);
}

private IntBlock add(LongBlock block) {
MultivalueDedupe.HashResult result = new MultivalueDedupeLong(block).hash(longHash);
MultivalueDedupe.HashResult result = new MultivalueDedupeLong(block).hash(longHash); // TODO: block builder
seenNull |= result.sawNull();
return result.ords();
}
Expand All @@ -85,7 +88,7 @@ public LongBlock[] getKeys() {
BitSet nulls = new BitSet(1);
nulls.set(0);
return new LongBlock[] {
new LongArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) };
blockFactory.newLongArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING) };
}

final int size = Math.toIntExact(longHash.size());
Expand All @@ -95,7 +98,7 @@ public LongBlock[] getKeys() {
}

// TODO call something like takeKeyOwnership to claim the keys array directly
return new LongBlock[] { new LongArrayVector(keys, keys.length).asBlock() };
return new LongBlock[] { blockFactory.newLongArrayVector(keys, keys.length).asBlock() };
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.IntArrayVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

/**
Expand Down Expand Up @@ -52,24 +52,30 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
LongVector vector1 = block1.asVector();
LongVector vector2 = block2.asVector();
if (vector1 != null && vector2 != null) {
addInput.add(0, add(vector1, vector2));
try (IntVector groupIds = add(vector1, vector2)) {
addInput.add(0, groupIds);
}
} else {
new AddBlock(block1, block2, addInput).add();
try (var addBlock = new AddBlock(block1, block2, addInput)) {
addBlock.add();
}
}
}

private IntVector add(LongVector vector1, LongVector vector2) {
int positions = vector1.getPositionCount();
final int[] ords = new int[positions];
for (int i = 0; i < positions; i++) {
ords[i] = Math.toIntExact(hashOrdToGroup(hash.add(vector1.getLong(i), vector2.getLong(i))));
try (var builder = IntVector.newVectorFixedBuilder(positions, blockFactory)) {
for (int i = 0; i < positions; i++) {
builder.appendInt(Math.toIntExact(hashOrdToGroup(hash.add(vector1.getLong(i), vector2.getLong(i)))));
}
return builder.build();
}
return new IntArrayVector(ords, positions);
}

private static final long[] EMPTY = new long[0];

private class AddBlock extends AbstractAddBlock {
// TODO: this uses the non-breaking block factory - update to use this blockFactory
private class AddBlock extends AbstractAddBlock implements Releasable {
private final LongBlock block1;
private final LongBlock block2;

Expand Down Expand Up @@ -131,6 +137,11 @@ void add() {
}
emitOrds();
}

@Override
public void close() {
Releasables.closeExpectNoException(block1, block2);
}
}

static class AbstractAddBlock {
Expand Down Expand Up @@ -186,8 +197,8 @@ static int add(long[] seen, int nextSeen, long v) {
@Override
public Block[] getKeys() {
int positions = (int) hash.size();
LongVector.Builder keys1 = LongVector.newVectorBuilder(positions);
LongVector.Builder keys2 = LongVector.newVectorBuilder(positions);
LongVector.Builder keys1 = blockFactory.newLongVectorBuilder(positions);
LongVector.Builder keys2 = blockFactory.newLongVectorBuilder(positions);
for (long i = 0; i < positions; i++) {
keys1.appendLong(hash.getKey1(i));
keys2.appendLong(hash.getKey2(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.HashAggregationOperator;
import org.elasticsearch.compute.operator.MultivalueDedupe;
import org.elasticsearch.core.Releasables;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -202,7 +203,7 @@ public Block[] getKeys() {
for (int g = 0; g < builders.length; g++) {
ElementType elementType = groups[g].spec.elementType();
decoders[g] = BatchEncoder.decoder(elementType);
builders[g] = elementType.newBlockBuilder(size);
builders[g] = elementType.newBlockBuilder(size, blockFactory);
}

BytesRef[] values = new BytesRef[(int) Math.min(100, bytesRefHash.size())];
Expand Down Expand Up @@ -236,6 +237,7 @@ public Block[] getKeys() {
for (int g = 0; g < keyBlocks.length; g++) {
keyBlocks[g] = builders[g].build();
}
Releasables.closeExpectNoException(builders);
return keyBlocks;
}

Expand Down
Loading

0 comments on commit b27bafa

Please sign in to comment.