Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TDigestState.read CB leaks #114303

Merged
merged 8 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/114303.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 114303
summary: Fix TDigestState.read CB leaks
area: ES|QL
type: bug
issues:
- 114194
2 changes: 0 additions & 2 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,6 @@ tests:
- class: org.elasticsearch.action.bulk.IncrementalBulkIT
method: testIncrementalBulkLowWatermarkBackOff
issue: https://github.com/elastic/elasticsearch/issues/114182
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
issue: https://github.com/elastic/elasticsearch/issues/114194
- class: org.elasticsearch.xpack.ilm.ExplainLifecycleIT
method: testStepInfoPreservedOnAutoRetry
issue: https://github.com/elastic/elasticsearch/issues/114220
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static TDigestState create(CircuitBreaker breaker, double compression) {
}
}

static TDigestState create(CircuitBreaker breaker, Type type, double compression) {
static TDigestState createOfType(CircuitBreaker breaker, Type type, double compression) {
breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-create-with-type");
try {
return new TDigestState(breaker, type, compression);
Expand Down Expand Up @@ -210,14 +210,19 @@ public static TDigestState read(CircuitBreaker breaker, StreamInput in) throws I
breaker.addWithoutBreaking(-SHALLOW_SIZE);
throw e;
}
int n = in.readVInt();
if (size > 0) {
state.tdigest.reserve(size);
}
for (int i = 0; i < n; i++) {
state.add(in.readDouble(), in.readVLong());
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer a try / finally block and use a boolean variable to control the outcome of the execution as we do in other places. See

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Also, changed the internal catch to a finally too. merging both try-finally in one made the code a bit more complex, so left them separated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks you, that's looks good.

int n = in.readVInt();
if (size > 0) {
state.tdigest.reserve(size);
}
for (int i = 0; i < n; i++) {
state.add(in.readDouble(), in.readVLong());
}
return state;
} catch (Exception e) {
Releasables.close(state);
throw e;
}
return state;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.Arrays;

import static org.hamcrest.Matchers.equalTo;
Expand All @@ -32,14 +36,75 @@ public TDigestStateReleasingTests(TDigestState.Type digestType) {
this.digestType = digestType;
}

public void testCreateOfType() {
testCircuitBreakerTrip(circuitBreaker -> TDigestState.createOfType(circuitBreaker, digestType, 100));
}

public void testCreateUsingParamsFrom() {
testCircuitBreakerTrip(circuitBreaker -> {
try (TDigestState example = TDigestState.createOfType(newLimitedBreaker(ByteSizeValue.ofMb(100)), digestType, 100)) {
return TDigestState.createUsingParamsFrom(example);
}
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use the cranky breaker service it'll blow up randomly and you can assert that you always freed the memory, no matter the random explosion. I rather like it for this sort of thing.

Which one of your tests hits a the breaker exception you are guarding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which one of your tests hits a the breaker exception you are guarding?

The testReadWithData(). The reserve is only called when it has data in it. So I added that one explicitly.

About the cranky breaker, I'm not sure we want it to break randomly. This has multiple potential breaking points I wanted to test (Add memory and release on error, on every constructor).

The cranky breaks 1/20 times. If we're fine with this one (The full suite takes around 1 second on my machine), I would leave it as-is. It tests every potential breaking point (Divided in 16 random seeds).
Is it too much? Not sure

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About the cranky breaker, I'm not sure we want it to break randomly. This has multiple potential breaking points I wanted to test (Add memory and release on error, on every constructor).

That's where the cranky is really good at.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just updated it to use the cranky breaker. It will fail ~1/5 times, so I guess it's ok. I suppose even 1/20 times would be ok for something like this.
I guess I was reinventing the wheel here XD

}

/**
* This test doesn't use the {@code digestType} param.
*/
public void testCreate() {
testCircuitBreakerTrip(circuitBreaker -> TDigestState.create(circuitBreaker, 100));
}

/**
* This test doesn't use the {@code digestType} param.
*/
public void testCreateOptimizedForAccuracy() {
testCircuitBreakerTrip(circuitBreaker -> TDigestState.createOptimizedForAccuracy(circuitBreaker, 100));
}

public void testRead() throws IOException {
try (
TDigestState state = TDigestState.createOfType(newLimitedBreaker(ByteSizeValue.ofMb(100)), digestType, 100);
BytesStreamOutput output = new BytesStreamOutput()
) {
TDigestState.write(state, output);

testCircuitBreakerTrip(circuitBreaker -> {
try (StreamInput input = output.bytes().streamInput()) {
return TDigestState.read(circuitBreaker, input);
}
});
}
}

public void testReadWithData() throws IOException {
try (
TDigestState state = TDigestState.createOfType(newLimitedBreaker(ByteSizeValue.ofMb(100)), digestType, 100);
BytesStreamOutput output = new BytesStreamOutput()
) {
for (int i = 0; i < 1000; i++) {
state.add(randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true));
}

TDigestState.write(state, output);

testCircuitBreakerTrip(circuitBreaker -> {
try (StreamInput input = output.bytes().streamInput()) {
return TDigestState.read(circuitBreaker, input);
}
});
}
}

/**
* Tests that a circuit breaker trip leaves no unreleased memory.
*/
public void testCircuitBreakerTrip() {
public <E extends Exception> void testCircuitBreakerTrip(CheckedFunction<CircuitBreaker, TDigestState, E> tDigestStateFactory)
throws E {
for (int bytes = randomIntBetween(0, 16); bytes < 50_000; bytes += 17) {
CircuitBreaker breaker = newLimitedBreaker(ByteSizeValue.ofBytes(bytes));

try (TDigestState state = TDigestState.create(breaker, digestType, 100)) {
try (TDigestState state = tDigestStateFactory.apply(breaker)) {
// Add some data to make it trip. It won't work in all digest types
for (int i = 0; i < 100; i++) {
state.add(randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true));
Expand Down