Skip to content

Commit

Permalink
revert changes in block hash, for now
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisHegarty committed Sep 19, 2023
1 parent b938dd6 commit c198d53
Show file tree
Hide file tree
Showing 16 changed files with 64 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private static Operator operator(String grouping, String op, String dataType) {
DriverContext driverContext = driverContext();
return new HashAggregationOperator(
List.of(supplier(op, dataType, groups.size()).groupingAggregatorFactory(AggregatorMode.SINGLE)),
() -> BlockHash.build(groups, driverContext, 16 * 1024, false),
() -> BlockHash.build(groups, BIG_ARRAYS, 16 * 1024, false),
driverContext
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.HashAggregationOperator;
import org.elasticsearch.core.Releasable;

Expand Down Expand Up @@ -70,39 +69,39 @@ public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
*/
public static BlockHash build(
List<HashAggregationOperator.GroupSpec> groups,
DriverContext driverContext,
BigArrays bigArrays,
int emitBatchSize,
boolean allowBrokenOptimizations
) {
if (groups.size() == 1) {
return newForElementType(groups.get(0).channel(), groups.get(0).elementType(), driverContext);
return newForElementType(groups.get(0).channel(), groups.get(0).elementType(), bigArrays);
}
if (allowBrokenOptimizations && groups.size() == 2) {
var g1 = groups.get(0);
var g2 = groups.get(1);
if (g1.elementType() == ElementType.LONG && g2.elementType() == ElementType.LONG) {
return new LongLongBlockHash(driverContext, g1.channel(), g2.channel(), emitBatchSize);
return new LongLongBlockHash(bigArrays, g1.channel(), g2.channel(), emitBatchSize);
}
if (g1.elementType() == ElementType.BYTES_REF && g2.elementType() == ElementType.LONG) {
return new BytesRefLongBlockHash(driverContext, g1.channel(), g2.channel(), false, emitBatchSize);
return new BytesRefLongBlockHash(bigArrays, g1.channel(), g2.channel(), false, emitBatchSize);
}
if (g1.elementType() == ElementType.LONG && g2.elementType() == ElementType.BYTES_REF) {
return new BytesRefLongBlockHash(driverContext, g2.channel(), g1.channel(), true, emitBatchSize);
return new BytesRefLongBlockHash(bigArrays, g2.channel(), g1.channel(), true, emitBatchSize);
}
}
return new PackedValuesBlockHash(groups, driverContext, emitBatchSize);
return new PackedValuesBlockHash(groups, bigArrays, emitBatchSize);
}

/**
* Creates a specialized hash table that maps a {@link Block} of the given input element type to ids.
*/
private static BlockHash newForElementType(int channel, ElementType type, DriverContext driverContext) {
private static BlockHash newForElementType(int channel, ElementType type, BigArrays bigArrays) {
return switch (type) {
case BOOLEAN -> new BooleanBlockHash(channel, driverContext);
case INT -> new IntBlockHash(channel, driverContext);
case LONG -> new LongBlockHash(channel, driverContext);
case DOUBLE -> new DoubleBlockHash(channel, driverContext);
case BYTES_REF -> new BytesRefBlockHash(channel, driverContext);
case BOOLEAN -> new BooleanBlockHash(channel);
case INT -> new IntBlockHash(channel, bigArrays);
case LONG -> new LongBlockHash(channel, bigArrays);
case DOUBLE -> new DoubleBlockHash(channel, bigArrays);
case BYTES_REF -> new BytesRefBlockHash(channel, bigArrays);
default -> throw new IllegalArgumentException("unsupported grouping element type [" + type + "]");
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.data.IntArrayVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.MultivalueDedupeBoolean;

import static org.elasticsearch.compute.operator.MultivalueDedupeBoolean.FALSE_ORD;
Expand All @@ -30,11 +29,9 @@
final class BooleanBlockHash extends BlockHash {
private final int channel;
private final boolean[] everSeen = new boolean[TRUE_ORD + 1];
private final BlockFactory blockFactory;

BooleanBlockHash(int channel, DriverContext driverContext) {
BooleanBlockHash(int channel) {
this.channel = channel;
this.blockFactory = driverContext.blockFactory();
}

@Override
Expand All @@ -53,7 +50,7 @@ private IntVector add(BooleanVector vector) {
for (int i = 0; i < vector.getPositionCount(); i++) {
groups[i] = MultivalueDedupeBoolean.hashOrd(everSeen, vector.getBoolean(i));
}
return blockFactory.newIntArrayVector(groups, groups.length);
return new IntArrayVector(groups, groups.length);
}

private IntBlock add(BooleanBlock block) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefArrayVector;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.IntArrayVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.MultivalueDedupe;
import org.elasticsearch.compute.operator.MultivalueDedupeBytesRef;

Expand All @@ -45,12 +45,10 @@ final class BytesRefBlockHash extends BlockHash {
* </p>
*/
private boolean seenNull;
private final BlockFactory blockFactory;

BytesRefBlockHash(int channel, DriverContext driverContext) {
BytesRefBlockHash(int channel, BigArrays bigArrays) {
this.channel = channel;
this.blockFactory = driverContext.blockFactory();
this.bytesRefHash = new BytesRefHash(1, driverContext.bigArrays());
this.bytesRefHash = new BytesRefHash(1, bigArrays);
}

@Override
Expand All @@ -69,7 +67,7 @@ private IntVector add(BytesRefVector vector) {
for (int i = 0; i < vector.getPositionCount(); i++) {
groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(bytesRefHash.add(vector.getBytesRef(i, bytes))));
}
return blockFactory.newIntArrayVector(groups, vector.getPositionCount());
return new IntArrayVector(groups, vector.getPositionCount());
}

private IntBlock add(BytesRefBlock block) {
Expand Down Expand Up @@ -101,7 +99,7 @@ public BytesRefBlock[] getKeys() {
bytesRefHash.getBytesRefs().writeTo(out);
try (StreamInput in = out.bytes().streamInput()) {
return new BytesRefBlock[] {
blockFactory.newBytesRefArrayVector(new BytesRefArray(in, BigArrays.NON_RECYCLING_INSTANCE), size).asBlock() };
new BytesRefArrayVector(new BytesRefArray(in, BigArrays.NON_RECYCLING_INSTANCE), size).asBlock() };
}
} catch (IOException e) {
throw new IllegalStateException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.IntArrayVector;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasables;

/**
Expand All @@ -36,21 +35,19 @@ final class BytesRefLongBlockHash extends BlockHash {
private final int emitBatchSize;
private final BytesRefHash bytesHash;
private final LongLongHash finalHash;
private final BlockFactory blockFactory;

BytesRefLongBlockHash(DriverContext driverContext, int channel1, int channel2, boolean reverseOutput, int emitBatchSize) {
BytesRefLongBlockHash(BigArrays bigArrays, int channel1, int channel2, boolean reverseOutput, int emitBatchSize) {
this.channel1 = channel1;
this.channel2 = channel2;
this.reverseOutput = reverseOutput;
this.emitBatchSize = emitBatchSize;
this.blockFactory = driverContext.blockFactory();

boolean success = false;
BytesRefHash bytesHash = null;
LongLongHash longHash = null;
try {
bytesHash = new BytesRefHash(1, driverContext.bigArrays());
longHash = new LongLongHash(1, driverContext.bigArrays());
bytesHash = new BytesRefHash(1, bigArrays);
longHash = new LongLongHash(1, bigArrays);
this.bytesHash = bytesHash;
this.finalHash = longHash;
success = true;
Expand Down Expand Up @@ -88,7 +85,7 @@ public IntVector add(BytesRefVector vector1, LongVector vector2) {
long hash1 = hashOrdToGroup(bytesHash.add(vector1.getBytesRef(i, scratch)));
ords[i] = Math.toIntExact(hashOrdToGroup(finalHash.add(hash1, vector2.getLong(i))));
}
return blockFactory.newIntArrayVector(ords, positions);
return new IntArrayVector(ords, positions);
}

private static final long[] EMPTY = new long[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.DoubleArrayBlock;
import org.elasticsearch.compute.data.DoubleArrayVector;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.DoubleVector;
import org.elasticsearch.compute.data.IntArrayVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.MultivalueDedupe;
import org.elasticsearch.compute.operator.MultivalueDedupeDouble;

Expand All @@ -40,12 +41,10 @@ final class DoubleBlockHash extends BlockHash {
* </p>
*/
private boolean seenNull;
private final BlockFactory blockFactory;

DoubleBlockHash(int channel, DriverContext driverContext) {
DoubleBlockHash(int channel, BigArrays bigArrays) {
this.channel = channel;
this.blockFactory = driverContext.blockFactory();
this.longHash = new LongHash(1, driverContext.bigArrays());
this.longHash = new LongHash(1, bigArrays);
}

@Override
Expand All @@ -64,7 +63,7 @@ private IntVector add(DoubleVector vector) {
for (int i = 0; i < vector.getPositionCount(); i++) {
groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(Double.doubleToLongBits(vector.getDouble(i)))));
}
return blockFactory.newIntArrayVector(groups, groups.length);
return new IntArrayVector(groups, groups.length);
}

private IntBlock add(DoubleBlock block) {
Expand All @@ -83,7 +82,7 @@ public DoubleBlock[] getKeys() {
}
BitSet nulls = new BitSet(1);
nulls.set(0);
return new DoubleBlock[] { blockFactory.newDoubleArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.ASCENDING) };
return new DoubleBlock[] { new DoubleArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.ASCENDING) };
}

final int size = Math.toIntExact(longHash.size());
Expand All @@ -93,7 +92,7 @@ public DoubleBlock[] getKeys() {
}

// TODO claim the array and wrap?
return new DoubleBlock[] { blockFactory.newDoubleArrayVector(keys, keys.length).asBlock() };
return new DoubleBlock[] { new DoubleArrayVector(keys, keys.length).asBlock() };
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntArrayBlock;
import org.elasticsearch.compute.data.IntArrayVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.MultivalueDedupe;
import org.elasticsearch.compute.operator.MultivalueDedupeInt;
import org.elasticsearch.core.Releasables;
Expand All @@ -38,12 +38,10 @@ final class IntBlockHash extends BlockHash {
* </p>
*/
private boolean seenNull;
private final BlockFactory blockFactory;

IntBlockHash(int channel, DriverContext driverContext) {
IntBlockHash(int channel, BigArrays bigArrays) {
this.channel = channel;
this.blockFactory = driverContext.blockFactory();
this.longHash = new LongHash(1, driverContext.bigArrays());
this.longHash = new LongHash(1, bigArrays);
}

@Override
Expand All @@ -63,7 +61,7 @@ private IntVector add(IntVector vector) {
for (int i = 0; i < vector.getPositionCount(); i++) {
groups[i] = Math.toIntExact(hashOrdToGroupNullReserved(longHash.add(vector.getInt(i))));
}
return blockFactory.newIntArrayVector(groups, groups.length);
return new IntArrayVector(groups, groups.length);
}

private IntBlock add(IntBlock block) {
Expand All @@ -82,14 +80,14 @@ public IntBlock[] getKeys() {
}
BitSet nulls = new BitSet(1);
nulls.set(0);
return new IntBlock[] { blockFactory.newIntArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.ASCENDING) };
return new IntBlock[] { new IntArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.ASCENDING) };
}
final int size = Math.toIntExact(longHash.size());
final int[] keys = new int[size];
for (int i = 0; i < size; i++) {
keys[i] = (int) longHash.get(i);
}
return new IntBlock[] { blockFactory.newIntArrayVector(keys, keys.length).asBlock() };
return new IntBlock[] { new IntArrayVector(keys, keys.length).asBlock() };
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongArrayBlock;
import org.elasticsearch.compute.data.LongArrayVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.MultivalueDedupe;
import org.elasticsearch.compute.operator.MultivalueDedupeLong;

Expand All @@ -31,7 +31,6 @@
final class LongBlockHash extends BlockHash {
private final int channel;
private final LongHash longHash;
private final BlockFactory blockFactory;

/**
* Have we seen any {@code null} values?
Expand All @@ -42,10 +41,9 @@ final class LongBlockHash extends BlockHash {
*/
private boolean seenNull;

LongBlockHash(int channel, DriverContext driverContext) {
LongBlockHash(int channel, BigArrays bigArrays) {
this.channel = channel;
this.blockFactory = driverContext.blockFactory();
this.longHash = new LongHash(1, driverContext.bigArrays());
this.longHash = new LongHash(1, bigArrays);
}

@Override
Expand Down Expand Up @@ -83,7 +81,7 @@ public LongBlock[] getKeys() {
}
BitSet nulls = new BitSet(1);
nulls.set(0);
return new LongBlock[] { blockFactory.newLongArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.ASCENDING) };
return new LongBlock[] { new LongArrayBlock(keys, keys.length, null, nulls, Block.MvOrdering.ASCENDING) };
}

final int size = Math.toIntExact(longHash.size());
Expand All @@ -93,7 +91,7 @@ public LongBlock[] getKeys() {
}

// TODO call something like takeKeyOwnership to claim the keys array directly
return new LongBlock[] { blockFactory.newLongArrayVector(keys, keys.length).asBlock() };
return new LongBlock[] { new LongArrayVector(keys, keys.length).asBlock() };
}

@Override
Expand Down
Loading

0 comments on commit c198d53

Please sign in to comment.