Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: shallow copy in EXPAND via shared vecs #103681

Merged
merged 9 commits into from
Jan 3, 2024
6 changes: 6 additions & 0 deletions docs/changelog/103681.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 103681
summary: "ESQL: Expand shallow copy with vecs"
area: ES|QL
type: enhancement
issues:
- 100528
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,27 @@ final class BooleanArrayBlock extends AbstractArrayBlock implements BooleanBlock
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
this(
new BooleanArrayVector(values, firstValueIndexes == null ? positionCount : firstValueIndexes[positionCount], blockFactory),
positionCount,
firstValueIndexes,
nulls,
mvOrdering,
blockFactory
);
}

/**
 * Creates a block around an already-constructed {@link BooleanArrayVector} and stores it as the backing vector.
 * The {@code expand()} hunk in this diff calls this constructor with the block's existing vector, so the
 * expanded block reuses that vector rather than building a new one.
 *
 * @param vector            vector assigned to this block's {@code vector} field
 * @param positionCount     forwarded to the superclass constructor
 * @param firstValueIndexes forwarded to the superclass constructor; {@code expand()} passes {@code null}
 * @param nulls             forwarded to the superclass constructor
 * @param mvOrdering        forwarded to the superclass constructor
 * @param blockFactory      forwarded to the superclass constructor
 */
private BooleanArrayBlock(
BooleanArrayVector vector,
int positionCount,
int[] firstValueIndexes,
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
// NOTE(review): the next line references `values`, which is not a parameter of this constructor; it looks
// like the removed (pre-change) side of the diff that the page rendering kept - confirm against the PR.
this.vector = new BooleanArrayVector(values, values.length, blockFactory);
// Keep the caller's vector instance as-is instead of wrapping raw values in a fresh vector.
this.vector = vector;
}

@Override
Expand All @@ -46,6 +64,7 @@ public boolean getBoolean(int valueIndex) {

@Override
public BooleanBlock filter(int... positions) {
// TODO use reference counting to share the vector
try (var builder = blockFactory().newBooleanBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
Expand Down Expand Up @@ -79,21 +98,28 @@ public BooleanBlock expand() {
incRef();
return this;
}
// TODO use reference counting to share the vector
try (var builder = blockFactory().newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBoolean(getBoolean(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
if (nullsMask == null) {
vector.incRef();
return vector.asBlock();
}

// The following line is correct because positions with multi-values are never null.
int expandedPositionCount = vector.getPositionCount();
long bitSetRamUsedEstimate = BlockRamUsageEstimator.sizeOfBitSet(expandedPositionCount);
blockFactory().adjustBreaker(bitSetRamUsedEstimate, false);

BooleanArrayBlock expanded = new BooleanArrayBlock(
vector,
expandedPositionCount,
null,
shiftNullsToExpandedPositions(),
MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING,
blockFactory()
);
blockFactory().adjustBreaker(expanded.ramBytesUsedOnlyBlock() - bitSetRamUsedEstimate, true);
// We need to incRef after adjusting any breakers, otherwise we might leak the vector if the breaker trips.
vector.incRef();
return expanded;
}

private long ramBytesUsedOnlyBlock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,27 @@ public BooleanBigArrayBlock(
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
this(
new BooleanBigArrayVector(values, firstValueIndexes == null ? positionCount : firstValueIndexes[positionCount], blockFactory),
positionCount,
firstValueIndexes,
nulls,
mvOrdering,
blockFactory
);
}

/**
 * Creates a block around an already-constructed {@link BooleanBigArrayVector} and stores it as the backing vector.
 * The {@code expand()} hunk in this diff calls this constructor with the block's existing vector, so the
 * expanded block reuses that vector rather than building a new one.
 *
 * @param vector            vector assigned to this block's {@code vector} field
 * @param positionCount     forwarded to the superclass constructor
 * @param firstValueIndexes forwarded to the superclass constructor; {@code expand()} passes {@code null}
 * @param nulls             forwarded to the superclass constructor
 * @param mvOrdering        forwarded to the superclass constructor
 * @param blockFactory      forwarded to the superclass constructor
 */
private BooleanBigArrayBlock(
BooleanBigArrayVector vector,
int positionCount,
int[] firstValueIndexes,
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
// NOTE(review): the next line references `values`, which is not a parameter of this constructor; it looks
// like the removed (pre-change) side of the diff that the page rendering kept - confirm against the PR.
this.vector = new BooleanBigArrayVector(values, (int) values.size(), blockFactory);
// Keep the caller's vector instance as-is instead of wrapping raw values in a fresh vector.
this.vector = vector;
}

@Override
Expand All @@ -47,6 +65,7 @@ public boolean getBoolean(int valueIndex) {

@Override
public BooleanBlock filter(int... positions) {
// TODO use reference counting to share the vector
try (var builder = blockFactory().newBooleanBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
Expand Down Expand Up @@ -80,21 +99,28 @@ public BooleanBlock expand() {
incRef();
return this;
}
// TODO use reference counting to share the vector
try (var builder = blockFactory().newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBoolean(getBoolean(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
if (nullsMask == null) {
vector.incRef();
return vector.asBlock();
}

// The following line is correct because positions with multi-values are never null.
int expandedPositionCount = vector.getPositionCount();
long bitSetRamUsedEstimate = BlockRamUsageEstimator.sizeOfBitSet(expandedPositionCount);
blockFactory().adjustBreaker(bitSetRamUsedEstimate, false);

BooleanBigArrayBlock expanded = new BooleanBigArrayBlock(
vector,
expandedPositionCount,
null,
shiftNullsToExpandedPositions(),
MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING,
blockFactory()
);
blockFactory().adjustBreaker(expanded.ramBytesUsedOnlyBlock() - bitSetRamUsedEstimate, true);
// We need to incRef after adjusting any breakers, otherwise we might leak the vector if the breaker trips.
vector.incRef();
return expanded;
}

private long ramBytesUsedOnlyBlock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,27 @@ final class BytesRefArrayBlock extends AbstractArrayBlock implements BytesRefBlo
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
this(
new BytesRefArrayVector(values, firstValueIndexes == null ? positionCount : firstValueIndexes[positionCount], blockFactory),
positionCount,
firstValueIndexes,
nulls,
mvOrdering,
blockFactory
);
}

/**
 * Creates a block around an already-constructed {@link BytesRefArrayVector} and stores it as the backing vector.
 * The {@code expand()} hunk in this diff calls this constructor with the block's existing vector, so the
 * expanded block reuses that vector rather than building a new one.
 *
 * @param vector            vector assigned to this block's {@code vector} field
 * @param positionCount     forwarded to the superclass constructor
 * @param firstValueIndexes forwarded to the superclass constructor; {@code expand()} passes {@code null}
 * @param nulls             forwarded to the superclass constructor
 * @param mvOrdering        forwarded to the superclass constructor
 * @param blockFactory      forwarded to the superclass constructor
 */
private BytesRefArrayBlock(
BytesRefArrayVector vector,
int positionCount,
int[] firstValueIndexes,
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
// NOTE(review): the next line references `values`, which is not a parameter of this constructor; it looks
// like the removed (pre-change) side of the diff that the page rendering kept - confirm against the PR.
this.vector = new BytesRefArrayVector(values, (int) values.size(), blockFactory);
// Keep the caller's vector instance as-is instead of wrapping raw values in a fresh vector.
this.vector = vector;
}

@Override
Expand All @@ -49,6 +67,7 @@ public BytesRef getBytesRef(int valueIndex, BytesRef dest) {

@Override
public BytesRefBlock filter(int... positions) {
// TODO use reference counting to share the vector
final BytesRef scratch = new BytesRef();
try (var builder = blockFactory().newBytesRefBlockBuilder(positions.length)) {
for (int pos : positions) {
Expand Down Expand Up @@ -83,22 +102,28 @@ public BytesRefBlock expand() {
incRef();
return this;
}
// TODO use reference counting to share the vector
final BytesRef scratch = new BytesRef();
try (var builder = blockFactory().newBytesRefBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBytesRef(getBytesRef(i, scratch));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
if (nullsMask == null) {
vector.incRef();
return vector.asBlock();
}

// The following line is correct because positions with multi-values are never null.
int expandedPositionCount = vector.getPositionCount();
long bitSetRamUsedEstimate = BlockRamUsageEstimator.sizeOfBitSet(expandedPositionCount);
blockFactory().adjustBreaker(bitSetRamUsedEstimate, false);

BytesRefArrayBlock expanded = new BytesRefArrayBlock(
vector,
expandedPositionCount,
null,
shiftNullsToExpandedPositions(),
MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING,
blockFactory()
);
blockFactory().adjustBreaker(expanded.ramBytesUsedOnlyBlock() - bitSetRamUsedEstimate, true);
// We need to incRef after adjusting any breakers, otherwise we might leak the vector if the breaker trips.
vector.incRef();
return expanded;
}

private long ramBytesUsedOnlyBlock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,27 @@ final class DoubleArrayBlock extends AbstractArrayBlock implements DoubleBlock {
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
this(
new DoubleArrayVector(values, firstValueIndexes == null ? positionCount : firstValueIndexes[positionCount], blockFactory),
positionCount,
firstValueIndexes,
nulls,
mvOrdering,
blockFactory
);
}

/**
 * Creates a block around an already-constructed {@link DoubleArrayVector} and stores it as the backing vector.
 * The {@code expand()} hunk in this diff calls this constructor with the block's existing vector, so the
 * expanded block reuses that vector rather than building a new one.
 *
 * @param vector            vector assigned to this block's {@code vector} field
 * @param positionCount     forwarded to the superclass constructor
 * @param firstValueIndexes forwarded to the superclass constructor; {@code expand()} passes {@code null}
 * @param nulls             forwarded to the superclass constructor
 * @param mvOrdering        forwarded to the superclass constructor
 * @param blockFactory      forwarded to the superclass constructor
 */
private DoubleArrayBlock(
DoubleArrayVector vector,
int positionCount,
int[] firstValueIndexes,
BitSet nulls,
MvOrdering mvOrdering,
BlockFactory blockFactory
) {
super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
// NOTE(review): the next line references `values`, which is not a parameter of this constructor; it looks
// like the removed (pre-change) side of the diff that the page rendering kept - confirm against the PR.
this.vector = new DoubleArrayVector(values, values.length, blockFactory);
// Keep the caller's vector instance as-is instead of wrapping raw values in a fresh vector.
this.vector = vector;
}

@Override
Expand All @@ -46,6 +64,7 @@ public double getDouble(int valueIndex) {

@Override
public DoubleBlock filter(int... positions) {
// TODO use reference counting to share the vector
try (var builder = blockFactory().newDoubleBlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
Expand Down Expand Up @@ -79,21 +98,28 @@ public DoubleBlock expand() {
incRef();
return this;
}
// TODO use reference counting to share the vector
try (var builder = blockFactory().newDoubleBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendDouble(getDouble(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
if (nullsMask == null) {
vector.incRef();
return vector.asBlock();
}

// The following line is correct because positions with multi-values are never null.
int expandedPositionCount = vector.getPositionCount();
long bitSetRamUsedEstimate = BlockRamUsageEstimator.sizeOfBitSet(expandedPositionCount);
blockFactory().adjustBreaker(bitSetRamUsedEstimate, false);

DoubleArrayBlock expanded = new DoubleArrayBlock(
vector,
expandedPositionCount,
null,
shiftNullsToExpandedPositions(),
MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING,
blockFactory()
);
blockFactory().adjustBreaker(expanded.ramBytesUsedOnlyBlock() - bitSetRamUsedEstimate, true);
// We need to incRef after adjusting any breakers, otherwise we might leak the vector if the breaker trips.
vector.incRef();
return expanded;
}

private long ramBytesUsedOnlyBlock() {
Expand Down
Loading