Skip to content

Commit

Permalink
Add a Factory for building blocks and Vectors (elastic#99657)
Browse files Browse the repository at this point in the history
This commit adds a BlockFactory, an extra level of indirection when building blocks. The factory couples circuit breaking with building, so that the breaker can be incremented as Blocks and Vectors are built.

This PR only adds the infrastructure that allows us to move the operators and implementations over to the factory incrementally; it does not move them all over at once.
  • Loading branch information
ChrisHegarty authored Sep 22, 2023
1 parent dd1eb20 commit 21d9de0
Show file tree
Hide file tree
Showing 118 changed files with 2,089 additions and 393 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.benchmark.compute.operator;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.AggregatorMode;
Expand All @@ -23,6 +24,7 @@
import org.elasticsearch.compute.aggregation.SumLongAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleArrayVector;
Expand Down Expand Up @@ -139,10 +141,11 @@ private static Operator operator(String grouping, String op, String dataType) {
);
default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]");
};
DriverContext driverContext = driverContext();
return new HashAggregationOperator(
List.of(supplier(op, dataType, groups.size()).groupingAggregatorFactory(AggregatorMode.SINGLE)),
() -> BlockHash.build(groups, BIG_ARRAYS, 16 * 1024, false),
new DriverContext(BigArrays.NON_RECYCLING_INSTANCE)
driverContext
);
}

Expand Down Expand Up @@ -576,4 +579,11 @@ private static void run(String grouping, String op, String blockType, int opCoun
operator.finish();
checkExpected(grouping, op, blockType, dataType, operator.getOutput(), opCount);
}

/**
 * Builds the {@link DriverContext} handed to the benchmarked operator.
 * The context shares one non-recycling {@link BigArrays} instance with a
 * {@link BlockFactory} that is backed by a {@link NoopCircuitBreaker} named "noop".
 */
static DriverContext driverContext() {
    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
    BlockFactory blockFactory = BlockFactory.getInstance(new NoopCircuitBreaker("noop"), bigArrays);
    return new DriverContext(bigArrays, blockFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@

package org.elasticsearch.benchmark.compute.operator;

import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BooleanVector;
import org.elasticsearch.compute.data.IntBlock;
Expand Down Expand Up @@ -262,6 +264,9 @@ private static void run(String operation) {
}

static DriverContext driverContext() {
return new DriverContext(BigArrays.NON_RECYCLING_INSTANCE);
return new DriverContext(
BigArrays.NON_RECYCLING_INSTANCE,
BlockFactory.getInstance(new NoopCircuitBreaker("noop"), BigArrays.NON_RECYCLING_INSTANCE)
);
}
}
2 changes: 1 addition & 1 deletion x-pack/plugin/esql/compute/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ tasks.named('stringTemplates').configure {
var longProperties = prop("Long", "long", "LONG", "Long.BYTES")
var doubleProperties = prop("Double", "double", "DOUBLE", "Double.BYTES")
var bytesRefProperties = prop("BytesRef", "BytesRef", "BYTES_REF", "org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF")
var booleanProperties = prop("Boolean", "boolean", "BOOLEAN", "Boolean.BYTES")
var booleanProperties = prop("Boolean", "boolean", "BOOLEAN", "Byte.BYTES")
// primitive vectors
File vectorInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st")
template {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,18 @@ public final class BooleanArrayBlock extends AbstractArrayBlock implements Boole
private final boolean[] values;

public BooleanArrayBlock(boolean[] values, int positionCount, int[] firstValueIndexes, BitSet nulls, MvOrdering mvOrdering) {
super(positionCount, firstValueIndexes, nulls, mvOrdering);
this(values, positionCount, firstValueIndexes, nulls, mvOrdering, BlockFactory.getNonBreakingInstance());
}

/**
 * Creates a block over {@code values} that is associated with the given {@code blockFactory}.
 * {@code positionCount}, {@code firstValueIndexes}, {@code nulls}, {@code mvOrdering} and
 * {@code blockFactory} are forwarded unchanged to the superclass constructor; the
 * {@code values} array is stored by reference, not copied.
 */
public BooleanArrayBlock(
boolean[] values,
int positionCount,
int[] firstValueIndexes,
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
// Keep a reference to the caller's array; no defensive copy is made.
this.values = values;
}

Expand Down Expand Up @@ -58,7 +69,7 @@ public BooleanBlock expand() {
return new BooleanArrayVector(values, end).asBlock();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new BooleanArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED);
return new BooleanArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(boolean[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down Expand Up @@ -98,6 +109,6 @@ public String toString() {

@Override
public void close() {
// no-op
blockFactory.adjustBreaker(-ramBytesUsed(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ public final class BooleanArrayVector extends AbstractVector implements BooleanV
private final boolean[] values;

public BooleanArrayVector(boolean[] values, int positionCount) {
super(positionCount);
this(values, positionCount, BlockFactory.getNonBreakingInstance());
}

/**
 * Creates a vector over {@code values} that is associated with the given {@code blockFactory}.
 * {@code positionCount} and {@code blockFactory} are forwarded to the superclass constructor;
 * the {@code values} array is stored by reference, not copied.
 */
public BooleanArrayVector(boolean[] values, int positionCount, BlockFactory blockFactory) {
super(positionCount, blockFactory);
// Keep a reference to the caller's array; no defensive copy is made.
this.values = values;
}

Expand Down Expand Up @@ -78,8 +82,4 @@ public String toString() {
return getClass().getSimpleName() + "[positions=" + getPositionCount() + ", values=" + Arrays.toString(values) + ']';
}

@Override
public void close() {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ public final class BooleanBigArrayVector extends AbstractVector implements Boole
private final BitArray values;

public BooleanBigArrayVector(BitArray values, int positionCount) {
super(positionCount);
this(values, positionCount, BlockFactory.getNonBreakingInstance());
}

/**
 * Creates a vector backed by the given {@link BitArray} and associated with {@code blockFactory}.
 * {@code positionCount} and {@code blockFactory} are forwarded to the superclass constructor;
 * the {@code values} instance is stored by reference.
 */
public BooleanBigArrayVector(BitArray values, int positionCount, BlockFactory blockFactory) {
super(positionCount, blockFactory);
this.values = values;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,24 @@ static int hash(BooleanBlock block) {
return result;
}

/** Returns a builder using the {@link BlockFactory#getNonBreakingInstance block factory}. */
// Eventually, we want to remove this entirely, always passing an explicit BlockFactory
static Builder newBlockBuilder(int estimatedSize) {
return new BooleanBlockBuilder(estimatedSize);
return newBlockBuilder(estimatedSize, BlockFactory.getNonBreakingInstance());
}

/** Returns a builder obtained from the supplied {@code blockFactory} for the given estimated size. */
static Builder newBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
    final Builder builder = blockFactory.newBooleanBlockBuilder(estimatedSize);
    return builder;
}

/** Returns a block using the {@link BlockFactory#getNonBreakingInstance block factory}. */
// Eventually, we want to remove this entirely, always passing an explicit BlockFactory
static BooleanBlock newConstantBlockWith(boolean value, int positions) {
return new ConstantBooleanVector(value, positions).asBlock();
return newConstantBlockWith(value, positions, BlockFactory.getNonBreakingInstance());
}

/** Returns a constant block of {@code value} with {@code positions} positions, obtained from the supplied {@code blockFactory}. */
static BooleanBlock newConstantBlockWith(boolean value, int positions, BlockFactory blockFactory) {
    final BooleanBlock constantBlock = blockFactory.newConstantBooleanBlockWith(value, positions);
    return constantBlock;
}

sealed interface Builder extends Block.Builder permits BooleanBlockBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ final class BooleanBlockBuilder extends AbstractBlockBuilder implements BooleanB

private boolean[] values;

BooleanBlockBuilder(int estimatedSize) {
values = new boolean[Math.max(estimatedSize, 2)];
/**
 * Creates a builder whose initial values array holds {@code max(estimatedSize, 2)} elements.
 * The {@code blockFactory} is handed to the superclass constructor, and {@code adjustBreaker}
 * is called with the initial size before the array is allocated.
 */
BooleanBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
// Never start with fewer than two slots, whatever the caller's estimate.
int initialSize = Math.max(estimatedSize, 2);
// Account for the array before allocating it.
// NOTE(review): presumably adjustBreaker can reject the reservation before memory is used - confirm in the superclass.
adjustBreaker(initialSize);
values = new boolean[initialSize];
}

@Override
Expand All @@ -31,6 +34,11 @@ public BooleanBlockBuilder appendBoolean(boolean value) {
return this;
}

/**
 * Returns {@link Byte#BYTES} as the per-element size for this builder's {@code boolean[]} values.
 * NOTE(review): presumably consumed by the superclass when accounting for the values array - confirm.
 */
@Override
protected int elementSize() {
return Byte.BYTES;
}

@Override
protected int valuesLength() {
return values.length;
Expand Down Expand Up @@ -171,17 +179,21 @@ public BooleanBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
@Override
public BooleanBlock build() {
finish();
BooleanBlock block;
if (hasNonNullValue && positionCount == 1 && valueCount == 1) {
return new ConstantBooleanVector(values[0], 1).asBlock();
block = new ConstantBooleanVector(values[0], 1, blockFactory).asBlock();
} else {
if (values.length - valueCount > 1024 || valueCount < (values.length / 2)) {
values = Arrays.copyOf(values, valueCount);
}
if (isDense() && singleValued()) {
return new BooleanArrayVector(values, positionCount).asBlock();
block = new BooleanArrayVector(values, positionCount, blockFactory).asBlock();
} else {
return new BooleanArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering);
block = new BooleanArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
return block;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,14 @@ default void writeTo(StreamOutput out) throws IOException {
}
}

/** Returns a builder using the {@link BlockFactory#getNonBreakingInstance block factory}. */
// Eventually, we want to remove this entirely, always passing an explicit BlockFactory
static Builder newVectorBuilder(int estimatedSize) {
return new BooleanVectorBuilder(estimatedSize);
return newVectorBuilder(estimatedSize, BlockFactory.getNonBreakingInstance());
}

/** Returns a vector builder obtained from the supplied {@code blockFactory} for the given estimated size. */
static Builder newVectorBuilder(int estimatedSize, BlockFactory blockFactory) {
    final Builder builder = blockFactory.newBooleanVectorBuilder(estimatedSize);
    return builder;
}

sealed interface Builder extends Vector.Builder permits BooleanVectorBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.core.Releasables;

/**
Expand All @@ -16,12 +15,10 @@
*/
public final class BooleanVectorBlock extends AbstractVectorBlock implements BooleanBlock {

private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(BooleanVectorBlock.class);

private final BooleanVector vector;

BooleanVectorBlock(BooleanVector vector) {
super(vector.getPositionCount());
super(vector.getPositionCount(), vector.blockFactory());
this.vector = vector;
}

Expand Down Expand Up @@ -52,7 +49,7 @@ public BooleanBlock filter(int... positions) {

@Override
public long ramBytesUsed() {
return RAM_BYTES_USED + RamUsageEstimator.sizeOf(vector);
return vector.ramBytesUsed();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ final class BooleanVectorBuilder extends AbstractVectorBuilder implements Boolea

private boolean[] values;

BooleanVectorBuilder(int estimatedSize) {
/**
 * Creates a builder whose initial values array holds {@code max(estimatedSize, 2)} elements.
 * The {@code blockFactory} is handed to the superclass constructor, and {@code adjustBreaker}
 * is called with the initial size before the array is allocated.
 */
BooleanVectorBuilder(int estimatedSize, BlockFactory blockFactory) {
    super(blockFactory);
    // Never start with fewer than two slots, whatever the caller's estimate.
    int initialSize = Math.max(estimatedSize, 2);
    // Account for the array before allocating it.
    // NOTE(review): presumably adjustBreaker can reject the reservation before memory is used - confirm in the superclass.
    adjustBreaker(initialSize);
    // Allocate exactly the size that was accounted for, so the two values cannot drift apart.
    values = new boolean[initialSize];
}

Expand All @@ -29,6 +32,11 @@ public BooleanVectorBuilder appendBoolean(boolean value) {
return this;
}

/**
 * Returns {@link Byte#BYTES} as the per-element size for this builder's {@code boolean[]} values.
 * NOTE(review): presumably consumed by the superclass when accounting for the values array - confirm.
 */
@Override
protected int elementSize() {
return Byte.BYTES;
}

@Override
protected int valuesLength() {
return values.length;
Expand All @@ -41,12 +49,17 @@ protected void growValuesArray(int newSize) {

@Override
public BooleanVector build() {
BooleanVector vector;
if (valueCount == 1) {
return new ConstantBooleanVector(values[0], 1);
}
if (values.length - valueCount > 1024 || valueCount < (values.length / 2)) {
values = Arrays.copyOf(values, valueCount);
vector = new ConstantBooleanVector(values[0], 1, blockFactory);
} else {
if (values.length - valueCount > 1024 || valueCount < (values.length / 2)) {
values = Arrays.copyOf(values, valueCount);
}
vector = new BooleanArrayVector(values, valueCount, blockFactory);
}
return new BooleanArrayVector(values, valueCount);
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
return vector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.Releasables;

import java.util.BitSet;
import java.util.stream.IntStream;
Expand All @@ -25,7 +26,18 @@ public final class BytesRefArrayBlock extends AbstractArrayBlock implements Byte
private final BytesRefArray values;

public BytesRefArrayBlock(BytesRefArray values, int positionCount, int[] firstValueIndexes, BitSet nulls, MvOrdering mvOrdering) {
super(positionCount, firstValueIndexes, nulls, mvOrdering);
this(values, positionCount, firstValueIndexes, nulls, mvOrdering, BlockFactory.getNonBreakingInstance());
}

/**
 * Creates a block over {@code values} that is associated with the given {@code blockFactory}.
 * {@code positionCount}, {@code firstValueIndexes}, {@code nulls}, {@code mvOrdering} and
 * {@code blockFactory} are forwarded unchanged to the superclass constructor; the
 * {@link BytesRefArray} is stored by reference, not copied.
 */
public BytesRefArrayBlock(
BytesRefArray values,
int positionCount,
int[] firstValueIndexes,
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
this.values = values;
}

Expand Down Expand Up @@ -59,7 +71,7 @@ public BytesRefBlock expand() {
return new BytesRefArrayVector(values, end).asBlock();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new BytesRefArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED);
return new BytesRefArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(BytesRefArray values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down Expand Up @@ -99,6 +111,7 @@ public String toString() {

@Override
public void close() {
// no-op
blockFactory.adjustBreaker(-(ramBytesUsed() - values.ramBytesUsed()), true);
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ public final class BytesRefArrayVector extends AbstractVector implements BytesRe
private final BytesRefArray values;

public BytesRefArrayVector(BytesRefArray values, int positionCount) {
super(positionCount);
this(values, positionCount, BlockFactory.getNonBreakingInstance());
}

/**
 * Creates a vector over {@code values} that is associated with the given {@code blockFactory}.
 * {@code positionCount} and {@code blockFactory} are forwarded to the superclass constructor;
 * the {@link BytesRefArray} is stored by reference, not copied.
 */
public BytesRefArrayVector(BytesRefArray values, int positionCount, BlockFactory blockFactory) {
super(positionCount, blockFactory);
this.values = values;
}

Expand Down Expand Up @@ -81,6 +85,7 @@ public String toString() {

/**
 * Releases this vector: subtracts {@code BASE_RAM_BYTES_USED} from the block factory's breaker,
 * then closes the backing {@code values} array.
 */
@Override
public void close() {
// Only the fixed base size is handed back here.
// NOTE(review): presumably the BytesRefArray accounts for its own memory and gives it back when closed below - confirm.
blockFactory.adjustBreaker(-BASE_RAM_BYTES_USED, true);
Releasables.closeExpectNoException(values);
}
}
Loading

0 comments on commit 21d9de0

Please sign in to comment.