diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index f01baa6cb07da..c8da213e7f530 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -182,7 +182,7 @@ private MergeResult partialReduce(QuerySearchResult[] toConsume, aggsList.add(lastMerge.reducedAggs); } for (QuerySearchResult result : toConsume) { - aggsList.add(result.consumeAggs().expand()); + aggsList.add(result.consumeAggs()); } newAggs = InternalAggregations.topLevelReduce(aggsList, aggReduceContextBuilder.forPartialReduction()); } else { @@ -310,6 +310,7 @@ public void consume(QuerySearchResult result, Runnable next) { try { addEstimateAndMaybeBreak(aggsSize); } catch (Exception exc) { + result.releaseAggs(); onMergeFailure(exc); next.run(); return; @@ -458,7 +459,7 @@ public synchronized List consumeAggs() { aggsList.add(mergeResult.reducedAggs); } for (QuerySearchResult result : buffer) { - aggsList.add(result.consumeAggs().expand()); + aggsList.add(result.consumeAggs()); } return aggsList; } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java index 628b65daeb117..e39133c83a018 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java @@ -9,7 +9,8 @@ package org.elasticsearch.common.io.stream; import org.elasticsearch.Version; -import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.lease.Releasable; import java.io.IOException; import java.io.UncheckedIOException; @@ -32,7 +33,7 @@ * to force their buffering in serialized format by calling * {@link #asSerialized(Reader, NamedWriteableRegistry)}. */ -public abstract class DelayableWriteable implements Writeable { +public abstract class DelayableWriteable implements Writeable, Releasable { /** * Build a {@linkplain DelayableWriteable} that wraps an existing object * but is serialized so that deserializing it can be delayed. @@ -46,7 +47,7 @@ public static DelayableWriteable referencing(T referenc * when {@link #expand()} is called. */ public static DelayableWriteable delayed(Writeable.Reader reader, StreamInput in) throws IOException { - return new Serialized<>(reader, in.getVersion(), in.namedWriteableRegistry(), in.readBytesReference()); + return new Serialized<>(reader, in.getVersion(), in.namedWriteableRegistry(), in.readReleasableBytesReference()); } private DelayableWriteable() {} @@ -98,7 +99,8 @@ public Serialized asSerialized(Reader reader, NamedWriteableRegistry regis } catch (IOException e) { throw new RuntimeException("unexpected error writing writeable to buffer", e); } - return new Serialized<>(reader, Version.CURRENT, registry, buffer.bytes()); + // TODO: this path is currently not used in production code, if it ever is this should start using pooled buffers + return new Serialized<>(reader, Version.CURRENT, registry, ReleasableBytesReference.wrap(buffer.bytes())); } @Override @@ -118,19 +120,25 @@ private BytesStreamOutput writeToBuffer(Version version) throws IOException { return buffer; } } + + @Override + public void close() { + //noop + } } /** - * A {@link Writeable} stored in serialized form. + * A {@link Writeable} stored in serialized form backed by a {@link ReleasableBytesReference}. Once an instance is no longer used its + * backing memory must be manually released by invoking {@link #close()} on it. */ public static class Serialized extends DelayableWriteable { private final Writeable.Reader reader; private final Version serializedAtVersion; private final NamedWriteableRegistry registry; - private final BytesReference serialized; + private final ReleasableBytesReference serialized; - private Serialized(Writeable.Reader reader, Version serializedAtVersion, - NamedWriteableRegistry registry, BytesReference serialized) { + private Serialized(Writeable.Reader reader, Version serializedAtVersion, NamedWriteableRegistry registry, + ReleasableBytesReference serialized) { this.reader = reader; this.serializedAtVersion = serializedAtVersion; this.registry = registry; @@ -186,6 +194,11 @@ public long getSerializedSize() { // We're already serialized return serialized.length(); } + + @Override + public void close() { + serialized.close(); + } } /** diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 8786a0a7c1e8e..912149a0dcef3 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -190,13 +190,23 @@ public boolean hasAggs() { * Returns and nulls out the aggregation for this search results. This allows to free up memory once the aggregation is consumed. * @throws IllegalStateException if the aggregations have already been consumed. */ - public DelayableWriteable consumeAggs() { + public InternalAggregations consumeAggs() { if (aggregations == null) { throw new IllegalStateException("aggs already consumed"); } - DelayableWriteable aggs = aggregations; - aggregations = null; - return aggs; + try { + return aggregations.expand(); + } finally { + aggregations.close(); + aggregations = null; + } + } + + public void releaseAggs() { + if (aggregations != null) { + aggregations.close(); + aggregations = null; + } } public void aggregations(InternalAggregations aggregations) { @@ -233,8 +243,9 @@ public void consumeAll() { if (hasConsumedTopDocs() == false) { consumeTopDocs(); } - if (hasAggs()) { - consumeAggs(); + if (aggregations != null) { + aggregations.close(); + aggregations = null; } } diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index 9c569bdf37af3..bf645d137cda9 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -78,8 +78,8 @@ public void testSerialization() throws Exception { assertEquals(querySearchResult.size(), deserialized.size()); assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs()); if (deserialized.hasAggs()) { - Aggregations aggs = querySearchResult.consumeAggs().expand(); - Aggregations deserializedAggs = deserialized.consumeAggs().expand(); + Aggregations aggs = querySearchResult.consumeAggs(); + Aggregations deserializedAggs = deserialized.consumeAggs(); assertEquals(aggs.asList(), deserializedAggs.asList()); } assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());