Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Handle allocation errors inside topn #99931

Merged
merged 6 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private static void close(boolean success, Releasable... releasables) {
* // the resources will be released when reaching here
* </pre>
*/
public static Releasable wrap(final Iterable<Releasable> releasables) {
public static Releasable wrap(final Iterable<? extends Releasable> releasables) {
return new Releasable() {
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,14 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public long ramBytesUsed() {
return BASE_RAM_BYTES_USED + startOffsets.ramBytesUsed() + bytes.ramBytesUsed();
return BASE_RAM_BYTES_USED + bigArraysRamBytesUsed();
}

/**
* Memory used by the {@link BigArrays} portion of this {@link BytesRefArray}.
*/
public long bigArraysRamBytesUsed() {
return startOffsets.ramBytesUsed() + bytes.ramBytesUsed();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,9 @@ public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws Circu
while (true) {
long old = used.get();
long total = old + bytes;
if (total < 0) {
throw new AssertionError("total must be >= 0 but was [" + total + "]");
}
if (total > max.getBytes()) {
throw new CircuitBreakingException(ERROR_MESSAGE, bytes, max.getBytes(), Durability.TRANSIENT);
}
Expand All @@ -689,7 +692,10 @@ public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws Circu

@Override
public void addWithoutBreaking(long bytes) {
used.addAndGet(bytes);
long total = used.addAndGet(bytes);
if (total < 0) {
throw new AssertionError("total must be >= 0 but was [" + total + "]");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.atomic.AtomicLong;

/**
* {@link CircuitBreakerService} that fails one twentieth of the time when you
* add bytes. This is useful to make sure code responds sensibly to circuit
Expand All @@ -27,31 +29,32 @@ public class CrankyCircuitBreakerService extends CircuitBreakerService {
public static final String ERROR_MESSAGE = "cranky breaker";

private final CircuitBreaker breaker = new CircuitBreaker() {
@Override
public void circuitBreak(String fieldName, long bytesNeeded) {
private final AtomicLong used = new AtomicLong();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm modifying this so I can assert that we release all memory after we break.


}
@Override
public void circuitBreak(String fieldName, long bytesNeeded) {}

@Override
public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
if (ESTestCase.random().nextInt(20) == 0) {
throw new CircuitBreakingException(ERROR_MESSAGE, Durability.PERMANENT);
}
used.addAndGet(bytes);
}

@Override
public void addWithoutBreaking(long bytes) {

used.addAndGet(bytes);
}

@Override
public long getUsed() {
return 0;
return used.get();
}

@Override
public long getLimit() {
return 0;
return Long.MAX_VALUE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public BooleanBlock expand() {
public static long ramBytesEstimated(boolean[] values, int[] firstValueIndexes, BitSet nullsMask) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, of course. 👍

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;

import java.util.Arrays;

/**
Expand All @@ -20,7 +22,7 @@ final class BooleanBlockBuilder extends AbstractBlockBuilder implements BooleanB
BooleanBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
int initialSize = Math.max(estimatedSize, 2);
adjustBreaker(initialSize);
adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were pretty far off so I took the liberty of making them a bit more accurate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Thanks.

values = new boolean[initialSize];
}

Expand Down Expand Up @@ -192,8 +194,16 @@ public BooleanBlock build() {
block = new BooleanArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
built();
return block;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) {

@Override
public BooleanVector build() {
finish();
BooleanVector vector;
if (valueCount == 1) {
vector = new ConstantBooleanVector(values[0], 1, blockFactory);
Expand All @@ -58,8 +59,16 @@ public BooleanVector build() {
}
vector = new BooleanArrayVector(values, valueCount, blockFactory);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
built();
return vector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public BytesRefBlock expand() {
public static long ramBytesEstimated(BytesRefArray values, int[] firstValueIndexes, BitSet nullsMask) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
}

@Override
Expand Down Expand Up @@ -111,7 +112,7 @@ public String toString() {

@Override
public void close() {
blockFactory.adjustBreaker(-(ramBytesUsed() - values.ramBytesUsed()), true);
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public String toString() {

@Override
public void close() {
blockFactory.adjustBreaker(-BASE_RAM_BYTES_USED, true);
blockFactory.adjustBreaker(-ramBytesUsed() + values.bigArraysRamBytesUsed(), true);
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,19 +193,42 @@ public BytesRefBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
public BytesRefBlock build() {
finish();
BytesRefBlock block;
assert estimatedBytes == 0 || firstValueIndexes != null;
if (hasNonNullValue && positionCount == 1 && valueCount == 1) {
block = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory).asBlock();
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
Releasables.closeExpectNoException(values);
} else {
estimatedBytes += values.ramBytesUsed();
if (isDense() && singleValued()) {
block = new BytesRefArrayVector(values, positionCount, blockFactory).asBlock();
} else {
block = new BytesRefArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes - values.bigArraysRamBytesUsed(), false);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
values = null;
built();
return block;
}

@Override
public void extraClose() {
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,40 @@ protected void growValuesArray(int newSize) {

@Override
public BytesRefVector build() {
finish();
BytesRefVector vector;
assert estimatedBytes == 0;
if (valueCount == 1) {
vector = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed(), false);
Releasables.closeExpectNoException(values);
} else {
estimatedBytes = values.ramBytesUsed();
vector = new BytesRefArrayVector(values, valueCount, blockFactory);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - values.bigArraysRamBytesUsed(), false);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
values = null;
built();
return vector;
}

@Override
public void extraClose() {
Releasables.closeExpectNoException(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public DoubleBlock expand() {
public static long ramBytesEstimated(double[] values, int[] firstValueIndexes, BitSet nullsMask) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;

import java.util.Arrays;

/**
Expand All @@ -20,7 +22,7 @@ final class DoubleBlockBuilder extends AbstractBlockBuilder implements DoubleBlo
DoubleBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
int initialSize = Math.max(estimatedSize, 2);
adjustBreaker(initialSize);
adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize());
values = new double[initialSize];
}

Expand Down Expand Up @@ -192,8 +194,16 @@ public DoubleBlock build() {
block = new DoubleArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
built();
return block;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected void growValuesArray(int newSize) {

@Override
public DoubleVector build() {
finish();
DoubleVector vector;
if (valueCount == 1) {
vector = new ConstantDoubleVector(values[0], 1, blockFactory);
Expand All @@ -58,8 +59,16 @@ public DoubleVector build() {
}
vector = new DoubleArrayVector(values, valueCount, blockFactory);
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(vector.ramBytesUsed() - estimatedBytes, false);
built();
return vector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public IntBlock expand() {
public static long ramBytesEstimated(int[] values, int[] firstValueIndexes, BitSet nullsMask) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values) + BlockRamUsageEstimator.sizeOf(firstValueIndexes)
+ BlockRamUsageEstimator.sizeOfBitSet(nullsMask) + RamUsageEstimator.shallowSizeOfInstance(MvOrdering.class);
// TODO mvordering is shared
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;

import java.util.Arrays;

/**
Expand All @@ -20,7 +22,7 @@ final class IntBlockBuilder extends AbstractBlockBuilder implements IntBlock.Bui
IntBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
int initialSize = Math.max(estimatedSize, 2);
adjustBreaker(initialSize);
adjustBreaker(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + initialSize * elementSize());
values = new int[initialSize];
}

Expand Down Expand Up @@ -192,8 +194,16 @@ public IntBlock build() {
block = new IntArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
}
// update the breaker with the actual bytes used.
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, true);
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(block.ramBytesUsed() - estimatedBytes, false);
built();
return block;
}
}
Loading