Skip to content

Commit

Permalink
Ensure release vector builder in QuantileStates (elastic#100693)
Browse files Browse the repository at this point in the history
Ensure that we always release the vector builders in case we hit 
the breaker in QuantileStates.
  • Loading branch information
dnhatn authored Oct 11, 2023
1 parent b95149c commit 6eb4c8e
Showing 1 changed file with 40 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,58 +161,61 @@ TDigestState getOrNull(int position) {
@Override
public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
assert blocks.length >= offset + 1;
var builder = BytesRefBlock.newBlockBuilder(selected.getPositionCount(), driverContext.blockFactory());
for (int i = 0; i < selected.getPositionCount(); i++) {
int group = selected.getInt(i);
TDigestState state;
if (group < digests.size()) {
state = getOrNull(group);
if (state == null) {
try (var builder = BytesRefBlock.newBlockBuilder(selected.getPositionCount(), driverContext.blockFactory())) {
for (int i = 0; i < selected.getPositionCount(); i++) {
int group = selected.getInt(i);
TDigestState state;
if (group < digests.size()) {
state = getOrNull(group);
if (state == null) {
state = TDigestState.create(DEFAULT_COMPRESSION);
}
} else {
state = TDigestState.create(DEFAULT_COMPRESSION);
}
} else {
state = TDigestState.create(DEFAULT_COMPRESSION);
builder.appendBytesRef(serializeDigest(state));
}
builder.appendBytesRef(serializeDigest(state));
blocks[offset] = builder.build();
}
blocks[offset] = builder.build();
}

Block evaluateMedianAbsoluteDeviation(IntVector selected, DriverContext driverContext) {
assert percentile == MEDIAN : "Median must be 50th percentile [percentile = " + percentile + "]";
final DoubleBlock.Builder builder = DoubleBlock.newBlockBuilder(selected.getPositionCount(), driverContext.blockFactory());
for (int i = 0; i < selected.getPositionCount(); i++) {
int si = selected.getInt(i);
if (si >= digests.size()) {
builder.appendNull();
continue;
}
final TDigestState digest = digests.get(si);
if (digest != null && digest.size() > 0) {
builder.appendDouble(InternalMedianAbsoluteDeviation.computeMedianAbsoluteDeviation(digest));
} else {
builder.appendNull();
try (DoubleBlock.Builder builder = DoubleBlock.newBlockBuilder(selected.getPositionCount(), driverContext.blockFactory())) {
for (int i = 0; i < selected.getPositionCount(); i++) {
int si = selected.getInt(i);
if (si >= digests.size()) {
builder.appendNull();
continue;
}
final TDigestState digest = digests.get(si);
if (digest != null && digest.size() > 0) {
builder.appendDouble(InternalMedianAbsoluteDeviation.computeMedianAbsoluteDeviation(digest));
} else {
builder.appendNull();
}
}
return builder.build();
}
return builder.build();
}

Block evaluatePercentile(IntVector selected, DriverContext driverContext) {
final DoubleBlock.Builder builder = DoubleBlock.newBlockBuilder(selected.getPositionCount(), driverContext.blockFactory());
for (int i = 0; i < selected.getPositionCount(); i++) {
int si = selected.getInt(i);
if (si >= digests.size()) {
builder.appendNull();
continue;
}
final TDigestState digest = digests.get(si);
if (percentile != null && digest != null && digest.size() > 0) {
builder.appendDouble(digest.quantile(percentile / 100));
} else {
builder.appendNull();
try (DoubleBlock.Builder builder = DoubleBlock.newBlockBuilder(selected.getPositionCount(), driverContext.blockFactory())) {
for (int i = 0; i < selected.getPositionCount(); i++) {
int si = selected.getInt(i);
if (si >= digests.size()) {
builder.appendNull();
continue;
}
final TDigestState digest = digests.get(si);
if (percentile != null && digest != null && digest.size() > 0) {
builder.appendDouble(digest.quantile(percentile / 100));
} else {
builder.appendNull();
}
}
return builder.build();
}
return builder.build();
}

@Override
Expand Down

0 comments on commit 6eb4c8e

Please sign in to comment.