Skip to content

Commit

Permalink
Release memory held by aggs on failure (#72966)
Browse files Browse the repository at this point in the history
If consuming a query result were disrupted by circuit breaker we would
leak memory for aggs in buffered query results, fixed.

Relates #62439 and #72309

Closes #72923
  • Loading branch information
henningandersen authored May 12, 2021
1 parent 823abb5 commit 9ab8fc5
Showing 1 changed file with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ public void consume(QuerySearchResult result, Runnable next) {
addEstimateAndMaybeBreak(aggsSize);
} catch (Exception exc) {
result.releaseAggs();
buffer.forEach(QuerySearchResult::releaseAggs);
buffer.clear();
onMergeFailure(exc);
next.run();
return;
Expand Down Expand Up @@ -402,17 +404,20 @@ protected void doRun() {
final MergeResult thisMergeResult = mergeResult;
long estimatedTotalSize = (thisMergeResult != null ? thisMergeResult.estimatedSize : 0) + task.aggsBufferSize;
final MergeResult newMerge;
final QuerySearchResult[] toConsume = task.consumeBuffer();
if (toConsume == null) {
return;
}
try {
final QuerySearchResult[] toConsume = task.consumeBuffer();
if (toConsume == null) {
return;
}
long estimatedMergeSize = estimateRamBytesUsedForReduce(estimatedTotalSize);
addEstimateAndMaybeBreak(estimatedMergeSize);
estimatedTotalSize += estimatedMergeSize;
++ numReducePhases;
newMerge = partialReduce(toConsume, task.emptyResults, topDocsStats, thisMergeResult, numReducePhases);
} catch (Exception t) {
for (QuerySearchResult result : toConsume) {
result.releaseAggs();
}
onMergeFailure(t);
return;
}
Expand Down Expand Up @@ -507,7 +512,12 @@ public void consumeListener() {
}

public synchronized void cancel() {
consumeBuffer();
QuerySearchResult[] buffer = consumeBuffer();
if (buffer != null) {
for (QuerySearchResult result : buffer) {
result.releaseAggs();
}
}
consumeListener();
}
}
Expand Down

0 comments on commit 9ab8fc5

Please sign in to comment.