diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index c8da213e7f530..7af3fbdf4e146 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -311,6 +311,8 @@ public void consume(QuerySearchResult result, Runnable next) { addEstimateAndMaybeBreak(aggsSize); } catch (Exception exc) { result.releaseAggs(); + buffer.forEach(QuerySearchResult::releaseAggs); + buffer.clear(); onMergeFailure(exc); next.run(); return; @@ -402,17 +404,20 @@ protected void doRun() { final MergeResult thisMergeResult = mergeResult; long estimatedTotalSize = (thisMergeResult != null ? thisMergeResult.estimatedSize : 0) + task.aggsBufferSize; final MergeResult newMerge; + final QuerySearchResult[] toConsume = task.consumeBuffer(); + if (toConsume == null) { + return; + } try { - final QuerySearchResult[] toConsume = task.consumeBuffer(); - if (toConsume == null) { - return; - } long estimatedMergeSize = estimateRamBytesUsedForReduce(estimatedTotalSize); addEstimateAndMaybeBreak(estimatedMergeSize); estimatedTotalSize += estimatedMergeSize; ++ numReducePhases; newMerge = partialReduce(toConsume, task.emptyResults, topDocsStats, thisMergeResult, numReducePhases); } catch (Exception t) { + for (QuerySearchResult result : toConsume) { + result.releaseAggs(); + } onMergeFailure(t); return; } @@ -507,7 +512,12 @@ public void consumeListener() { } public synchronized void cancel() { - consumeBuffer(); + QuerySearchResult[] buffer = consumeBuffer(); + if (buffer != null) { + for (QuerySearchResult result : buffer) { + result.releaseAggs(); + } + } consumeListener(); } }