Skip to content

Commit

Permalink
ESQL: Handle allocation errors inside topn
Browse files Browse the repository at this point in the history
This properly handles allocation errors inside of topn by making
`Block.Builder` and `Vector.Builder` `Releasable`. The "new way" to
deal with block factories is like this:
```
try (var b = IntBlock.builder(3, blockFactory) {
  b.append(1);
  b.append(2);
  b.append(3);
  return b.build();
}
```

If anything goes wrong the block factory's `close` method will be called
by the `try` block and all of the circuit breaking that it reserves will
be released.

For this all to work well `Block.Builder`s have to be one-shot. In other
words, you can only call `.build` on them one time. That shifts the
accounting from the builder into the block. It is an error to call
`build` twice.
  • Loading branch information
nik9000 committed Sep 26, 2023
1 parent 01e3649 commit 15735e0
Show file tree
Hide file tree
Showing 44 changed files with 835 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private static void close(boolean success, Releasable... releasables) {
* // the resources will be released when reaching here
* </pre>
*/
public static Releasable wrap(final Iterable<Releasable> releasables) {
public static Releasable wrap(final Iterable<? extends Releasable> releasables) {
return new Releasable() {
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.atomic.AtomicLong;

/**
* {@link CircuitBreakerService} that fails one twentieth of the time when you
* add bytes. This is useful to make sure code responds sensibly to circuit
Expand All @@ -27,31 +29,32 @@ public class CrankyCircuitBreakerService extends CircuitBreakerService {
public static final String ERROR_MESSAGE = "cranky breaker";

private final CircuitBreaker breaker = new CircuitBreaker() {
@Override
public void circuitBreak(String fieldName, long bytesNeeded) {
private final AtomicLong used = new AtomicLong();

}
@Override
public void circuitBreak(String fieldName, long bytesNeeded) {}

@Override
public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
if (ESTestCase.random().nextInt(20) == 0) {
throw new CircuitBreakingException(ERROR_MESSAGE, Durability.PERMANENT);
}
used.addAndGet(bytes);
}

@Override
public void addWithoutBreaking(long bytes) {

used.addAndGet(bytes);
}

@Override
public long getUsed() {
return 0;
return used.get();
}

@Override
public long getLimit() {
return 0;
return Long.MAX_VALUE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;

import java.util.Arrays;

/**
Expand All @@ -20,7 +22,7 @@ final class BooleanBlockBuilder extends AbstractBlockBuilder implements BooleanB
BooleanBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
int initialSize = Math.max(estimatedSize, 2);
adjustBreaker(initialSize);
adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize());
values = new boolean[initialSize];
}

Expand Down Expand Up @@ -192,8 +194,16 @@ public BooleanBlock build() {
block = new BooleanArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
built();
return block;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) {

@Override
public BooleanVector build() {
finish();
BooleanVector vector;
if (valueCount == 1) {
vector = new ConstantBooleanVector(values[0], 1, blockFactory);
Expand All @@ -58,8 +59,16 @@ public BooleanVector build() {
}
vector = new BooleanArrayVector(values, valueCount, blockFactory);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
built();
return vector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,21 @@ public BytesRefBlock build() {
block = new BytesRefArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
built();
return block;
}

@Override
public void extraClose() {
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ protected void growValuesArray(int newSize) {

@Override
public BytesRefVector build() {
finish();
BytesRefVector vector;
if (valueCount == 1) {
vector = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory);
Expand All @@ -62,8 +63,21 @@ public BytesRefVector build() {
estimatedBytes = values.ramBytesUsed();
vector = new BytesRefArrayVector(values, valueCount, blockFactory);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
built();
return vector;
}

@Override
public void extraClose() {
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;

import java.util.Arrays;

/**
Expand All @@ -20,7 +22,7 @@ final class DoubleBlockBuilder extends AbstractBlockBuilder implements DoubleBlo
DoubleBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
int initialSize = Math.max(estimatedSize, 2);
adjustBreaker(initialSize);
adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize());
values = new double[initialSize];
}

Expand Down Expand Up @@ -192,8 +194,16 @@ public DoubleBlock build() {
block = new DoubleArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
built();
return block;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) {

@Override
public DoubleVector build() {
finish();
DoubleVector vector;
if (valueCount == 1) {
vector = new ConstantDoubleVector(values[0], 1, blockFactory);
Expand All @@ -58,8 +59,16 @@ public DoubleVector build() {
}
vector = new DoubleArrayVector(values, valueCount, blockFactory);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
built();
return vector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;

import java.util.Arrays;

/**
Expand All @@ -20,7 +22,7 @@ final class IntBlockBuilder extends AbstractBlockBuilder implements IntBlock.Bui
IntBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
int initialSize = Math.max(estimatedSize, 2);
adjustBreaker(initialSize);
adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize());
values = new int[initialSize];
}

Expand Down Expand Up @@ -192,8 +194,16 @@ public IntBlock build() {
block = new IntArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
built();
return block;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) {

@Override
public IntVector build() {
finish();
IntVector vector;
if (valueCount == 1) {
vector = new ConstantIntVector(values[0], 1, blockFactory);
Expand All @@ -58,8 +59,16 @@ public IntVector build() {
}
vector = new IntArrayVector(values, valueCount, blockFactory);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
built();
return vector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;

import java.util.Arrays;

/**
Expand All @@ -20,7 +22,7 @@ final class LongBlockBuilder extends AbstractBlockBuilder implements LongBlock.B
LongBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
int initialSize = Math.max(estimatedSize, 2);
adjustBreaker(initialSize);
adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize());
values = new long[initialSize];
}

Expand Down Expand Up @@ -192,8 +194,16 @@ public LongBlock build() {
block = new LongArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
built();
return block;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) {

@Override
public LongVector build() {
finish();
LongVector vector;
if (valueCount == 1) {
vector = new ConstantLongVector(values[0], 1, blockFactory);
Expand All @@ -58,8 +59,16 @@ public LongVector build() {
}
vector = new LongArrayVector(values, valueCount, blockFactory);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
built();
return vector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ public BooleanBlock build() {
public String toString() {
return "ResultBuilderForBoolean[inKey=" + inKey + "]";
}

@Override
public void close() {
builder.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public BytesRefBlock build() {
public String toString() {
return "ResultBuilderForBytesRef[inKey=" + inKey + "]";
}

@Override
public void close() {
builder.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ public DoubleBlock build() {
public String toString() {
return "ResultBuilderForDouble[inKey=" + inKey + "]";
}

@Override
public void close() {
builder.close();
}
}
Loading

0 comments on commit 15735e0

Please sign in to comment.