Skip to content

Commit

Permalink
Deduplicate Empty InternalAggregations (#58386) (#59032)
Browse files Browse the repository at this point in the history
Working through a heap dump for an unrelated issue I found that we can easily rack up
tens of MBs of duplicate empty instances in some cases.
I moved to a static constructor to guard against that in all cases.
  • Loading branch information
original-brownbear authored Jul 4, 2020
1 parent 2fdd8f3 commit 071d8b2
Show file tree
Hide file tree
Showing 51 changed files with 127 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
listener.onResponse(new SearchResponse(new InternalSearchResponse(
new SearchHits(
new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
new InternalAggregations(Collections.emptyList()),
InternalAggregations.EMPTY,
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1),
"", 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
InternalAggregations reduced =
InternalAggregations.topLevelReduce(aggs, aggReduceContextBuilder.forPartialReduction());
reducedAggs = aggsBuffer[0] = DelayableWriteable.referencing(reduced)
.asSerialized(InternalAggregations::new, namedWriteableRegistry);
.asSerialized(InternalAggregations::readFrom, namedWriteableRegistry);
long previousBufferSize = aggsCurrentBufferSize;
aggsMaxBufferSize = Math.max(aggsMaxBufferSize, aggsCurrentBufferSize);
aggsCurrentBufferSize = aggsBuffer[0].ramBytesUsed();
Expand All @@ -729,7 +729,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
}
final int i = index++;
if (hasAggs) {
aggsBuffer[i] = querySearchResult.consumeAggs().asSerialized(InternalAggregations::new, namedWriteableRegistry);
aggsBuffer[i] = querySearchResult.consumeAggs().asSerialized(InternalAggregations::readFrom, namedWriteableRegistry);
aggsCurrentBufferSize += aggsBuffer[i].ramBytesUsed();
}
if (hasTopDocs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ protected final InternalAggregations buildEmptySubAggregations() {
for (Aggregator aggregator : subAggregators) {
aggs.add(aggregator.buildEmptyAggregation());
}
return new InternalAggregations(aggs);
return InternalAggregations.from(aggs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class InternalAggregations extends Aggregations implements Writeabl
/**
* Constructs a new aggregation.
*/
public InternalAggregations(List<InternalAggregation> aggregations) {
private InternalAggregations(List<InternalAggregation> aggregations) {
super(aggregations);
this.pipelineTreeForBwcSerialization = null;
}
Expand All @@ -85,19 +85,26 @@ public InternalAggregations(List<InternalAggregation> aggregations, Supplier<Pip
this.pipelineTreeForBwcSerialization = pipelineTreeSource;
}

public InternalAggregations(StreamInput in) throws IOException {
super(in.readList(stream -> in.readNamedWriteable(InternalAggregation.class)));
public static InternalAggregations from(List<InternalAggregation> aggregations) {
if (aggregations.isEmpty()) {
return EMPTY;
}
return new InternalAggregations(aggregations);
}

public static InternalAggregations readFrom(StreamInput in) throws IOException {
final InternalAggregations res = from(in.readList(stream -> in.readNamedWriteable(InternalAggregation.class)));
if (in.getVersion().before(Version.V_7_8_0) && in.getVersion().onOrAfter(Version.V_6_7_0)) {
in.readNamedWriteableList(PipelineAggregator.class);
/*
* Setting the pipeline tree source to null here is correct but
* only because we don't immediately pass the InternalAggregations
* off to another node. Instead, we always reduce together with
* many aggregations and that always adds the tree read from the
* current request.
*/
in.readNamedWriteableList(PipelineAggregator.class);
}
/*
* Setting the pipeline tree source to null here is correct but
* only because we don't immediately pass the InternalAggregations
* off to another node. Instead, we always reduce together with
* many aggregations and that always adds the tree read from the
* current request.
*/
pipelineTreeForBwcSerialization = null;
return res;
}

@Override
Expand Down Expand Up @@ -206,10 +213,10 @@ public static InternalAggregations topLevelReduce(List<InternalAggregations> agg

for (PipelineAggregator pipelineAggregator : context.pipelineTreeRoot().aggregators()) {
SiblingPipelineAggregator sib = (SiblingPipelineAggregator) pipelineAggregator;
InternalAggregation newAgg = sib.doReduce(new InternalAggregations(reducedInternalAggs), context);
InternalAggregation newAgg = sib.doReduce(from(reducedInternalAggs), context);
reducedInternalAggs.add(newAgg);
}
return new InternalAggregations(reducedInternalAggs);
return from(reducedInternalAggs);
}
return reduced;
}
Expand Down Expand Up @@ -258,7 +265,6 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
* Version of {@link #reduce(List, ReduceContext, Function)} for nodes inside the aggregation tree.
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
return reduce(aggregationsList, context, InternalAggregations::new);
return reduce(aggregationsList, context, InternalAggregations::from);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private List<B> reducePipelineBuckets(ReduceContext reduceContext, PipelineTree
PipelineTree subTree = pipelineTree.subTree(agg.getName());
aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext, subTree));
}
reducedBuckets.add(createBucket(new InternalAggregations(aggs), bucket));
reducedBuckets.add(createBucket(InternalAggregations.from(aggs), bucket));
}
return reducedBuckets;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ protected final InternalAggregations[] buildSubAggsForBuckets(long[] bucketOrdsT
slice[i] = aggregations[i][ord];
}
final int thisOrd = ord;
result[ord] = new InternalAggregations(new AbstractList<InternalAggregation>() {
result[ord] = InternalAggregations.from(new AbstractList<InternalAggregation>() {
@Override
public InternalAggregation get(int index) {
return aggregations[index][thisOrd];
Expand Down Expand Up @@ -353,17 +353,6 @@ protected interface ResultBuilderForVariable<B> {
InternalAggregation build(long owninigBucketOrd, List<B> buckets);
}

/**
* Utility method to build empty aggregations of the sub aggregators.
*/
protected final InternalAggregations bucketEmptyAggregations() {
final InternalAggregation[] aggregations = new InternalAggregation[subAggregators.length];
for (int i = 0; i < subAggregators.length; i++) {
aggregations[i] = subAggregators[i].buildEmptyAggregation();
}
return new InternalAggregations(Arrays.asList(aggregations));
}

@Override
public final void close() {
try (Releasable releasable = docCounts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected InternalSingleBucketAggregation(String name, long docCount, InternalAg
protected InternalSingleBucketAggregation(StreamInput in) throws IOException {
super(in);
docCount = in.readVLong();
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down Expand Up @@ -128,7 +128,7 @@ public final InternalAggregation reducePipelines(
PipelineTree subTree = pipelineTree.subTree(agg.getName());
aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext, subTree));
}
InternalAggregations reducedSubAggs = new InternalAggregations(aggs);
InternalAggregations reducedSubAggs = InternalAggregations.from(aggs);
reduced = create(reducedSubAggs);
}
return super.reducePipelines(reduced, reduceContext, pipelineTree);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public InternalBucket(String key, long docCount, InternalAggregations aggregatio
public InternalBucket(StreamInput in) throws IOException {
key = in.readOptionalString();
docCount = in.readVLong();
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern
InternalBucket(StreamInput in, List<String> sourceNames, List<DocValueFormat> formats, int[] reverseMuls) throws IOException {
this.key = new CompositeKey(in);
this.docCount = in.readVLong();
this.aggregations = new InternalAggregations(in);
this.aggregations = InternalAggregations.readFrom(in);
this.reverseMuls = reverseMuls;
this.sourceNames = sourceNames;
this.formats = formats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public InternalBucket(StreamInput in, boolean keyed) throws IOException {
this.keyed = keyed;
key = in.readOptionalString();
docCount = in.readVLong();
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public InternalGeoGridBucket(long hashAsLong, long docCount, InternalAggregation
public InternalGeoGridBucket(StreamInput in) throws IOException {
hashAsLong = in.readLong();
docCount = in.readVLong();
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Bucket(StreamInput in, DocValueFormat format) throws IOException {
this.format = format;
key = in.readLong();
docCount = in.readVLong();
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down Expand Up @@ -163,7 +163,7 @@ static class BucketInfo {
roundingInfos[i] = new RoundingInfo(in);
}
roundingIdx = in.readVInt();
emptySubAggregations = new InternalAggregations(in);
emptySubAggregations = InternalAggregations.readFrom(in);
}

void writeTo(StreamOutput out) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Bucket(StreamInput in, boolean keyed, DocValueFormat format) throws IOExc
this.keyed = keyed;
key = in.readLong();
docCount = in.readVLong();
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down Expand Up @@ -174,7 +174,7 @@ static class EmptyBucketInfo {

EmptyBucketInfo(StreamInput in) throws IOException {
rounding = Rounding.read(in);
subAggregations = new InternalAggregations(in);
subAggregations = InternalAggregations.readFrom(in);
bounds = in.readOptionalWriteable(ExtendedBounds::new);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Bucket(StreamInput in, boolean keyed, DocValueFormat format) throws IOExc
this.keyed = keyed;
key = in.readDouble();
docCount = in.readVLong();
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down Expand Up @@ -166,7 +166,7 @@ static class EmptyBucketInfo {
}

EmptyBucketInfo(StreamInput in) throws IOException {
this(in.readDouble(), in.readDouble(), in.readDouble(), in.readDouble(), new InternalAggregations(in));
this(in.readDouble(), in.readDouble(), in.readDouble(), in.readDouble(), InternalAggregations.readFrom(in));
}

public void writeTo(StreamOutput out) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public Bucket(StreamInput in, DocValueFormat format) throws IOException {
centroid = in.readDouble();
docCount = in.readVLong();
bounds = new BucketBounds(in);
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down Expand Up @@ -207,7 +207,7 @@ static class EmptyBucketInfo {
}

EmptyBucketInfo(StreamInput in) throws IOException {
this(new InternalAggregations(in));
this(InternalAggregations.readFrom(in));
}

public void writeTo(StreamOutput out) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private static Bucket createFromStream(StreamInput in, DocValueFormat format, bo
BytesRef from = in.readBoolean() ? in.readBytesRef() : null;
BytesRef to = in.readBoolean() ? in.readBytesRef() : null;
long docCount = in.readLong();
InternalAggregations aggregations = new InternalAggregations(in);
InternalAggregations aggregations = InternalAggregations.readFrom(in);

return new Bucket(format, keyed, key, from, to, docCount, aggregations);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static class Bucket extends InternalRange.Bucket {

public Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations, boolean keyed,
DocValueFormat formatter) {
super(key, from, to, docCount, new InternalAggregations(aggregations), keyed, formatter);
super(key, from, to, docCount, InternalAggregations.from(aggregations), keyed, formatter);
}

public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValueType;
Expand All @@ -35,10 +34,6 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke

static class Bucket extends InternalRange.Bucket {

Bucket(String key, double from, double to, long docCount, List<InternalAggregation> aggregations, boolean keyed) {
this(key, from, to, docCount, new InternalAggregations(aggregations), keyed);
}

Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed) {
super(key, from, to, docCount, aggregations, keyed, DocValueFormat.RAW);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public InternalRange(StreamInput in) throws IOException {
? in.readString()
: in.readOptionalString();
ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(),
new InternalAggregations(in), keyed, format));
InternalAggregations.readFrom(in), keyed, format));
}
this.ranges = ranges;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected Bucket(long docCount, InternalAggregations aggregations, DocValueForma
protected Bucket(StreamInput in, DocValueFormat formatter) throws IOException {
this.format = formatter;
docCount = in.readVLong();
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected Bucket(StreamInput in, DocValueFormat formatter, boolean showDocCountE
if (showDocCountError) {
docCountError = in.readLong();
}
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
supersetDf = in.readVLong();
term = in.readLong();
score = in.readDouble();
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Bucket(StreamInput in, long subsetSize, long supersetSize, DocValueFormat
subsetDf = in.readVLong();
supersetDf = in.readVLong();
score = in.readDouble();
aggregations = new InternalAggregations(in);
aggregations = InternalAggregations.readFrom(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext

InternalSimpleValue simpleValue = new InternalSimpleValue(name(), returned.doubleValue(), formatter, metadata());
aggs.add(simpleValue);
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs),
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(InternalAggregations.from(aggs),
bucket);
newBuckets.add(newBucket);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
aggs.add(new InternalSimpleValue(name(), sum, formatter, metadata()));
Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs));
Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
newBuckets.add(newBucket);
}
return factory.createAggregation(newBuckets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
return (InternalAggregation) p;
}).collect(Collectors.toList());
aggs.add(new InternalDerivative(name(), gradient, xDiff, formatter, metadata()));
Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs));
Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
newBuckets.add(newBucket);
} else {
newBuckets.add(bucket);
Expand Down
Loading

0 comments on commit 071d8b2

Please sign in to comment.