diff --git a/docs/changelog/112707.yaml b/docs/changelog/112707.yaml new file mode 100644 index 0000000000000..9f16cfcd2b6f2 --- /dev/null +++ b/docs/changelog/112707.yaml @@ -0,0 +1,5 @@ +pr: 112707 +summary: Deduplicate `BucketOrder` when deserializing +area: Aggregations +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java index 4b3683edf7307..8140649c9fb4c 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java @@ -15,6 +15,8 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashMap; +import java.util.Map; /** * A holder for {@link Writeable}s that delays reading the underlying object @@ -230,11 +232,72 @@ private static T deserialize( ) throws IOException { try ( StreamInput in = registry == null - ? serialized.streamInput() - : new NamedWriteableAwareStreamInput(serialized.streamInput(), registry) + ? new DeduplicateStreamInput(serialized.streamInput(), new DeduplicatorCache()) + : new DeduplicateNamedWriteableAwareStreamInput(serialized.streamInput(), registry, new DeduplicatorCache()) ) { in.setTransportVersion(serializedAtVersion); return reader.read(in); } } + + /** An object implementing this interface can deduplicate instance of the provided objects.*/ + public interface Deduplicator { + T deduplicate(T object); + } + + private static class DeduplicateStreamInput extends FilterStreamInput implements Deduplicator { + + private final Deduplicator deduplicator; + + private DeduplicateStreamInput(StreamInput delegate, Deduplicator deduplicator) { + super(delegate); + this.deduplicator = deduplicator; + } + + @Override + public T deduplicate(T object) { + return deduplicator.deduplicate(object); + } + } + + private static class DeduplicateNamedWriteableAwareStreamInput extends NamedWriteableAwareStreamInput implements Deduplicator { + + private final Deduplicator deduplicator; + + private DeduplicateNamedWriteableAwareStreamInput( + StreamInput delegate, + NamedWriteableRegistry registry, + Deduplicator deduplicator + ) { + super(delegate, registry); + this.deduplicator = deduplicator; + } + + @Override + public T deduplicate(T object) { + return deduplicator.deduplicate(object); + } + } + + /** + * Implementation of a {@link Deduplicator} cache. It can hold up to 1024 instances. + */ + private static class DeduplicatorCache implements Deduplicator { + + private static final int MAX_SIZE = 1024; + // lazily init + private Map cache = null; + + @SuppressWarnings("unchecked") + @Override + public T deduplicate(T object) { + if (cache == null) { + cache = new HashMap<>(); + cache.put(object, object); + } else if (cache.size() < MAX_SIZE) { + object = (T) cache.computeIfAbsent(object, o -> o); + } + return object; + } + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java index 482d915560d04..9dcaa691219ef 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java @@ -8,6 +8,7 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.DeprecationLogger; @@ -36,6 +37,7 @@ */ public abstract class InternalOrder extends BucketOrder { // TODO merge the contents of this file into BucketOrder. The way it is now is relic. + /** * {@link Bucket} ordering strategy to sort by a sub-aggregation. */ @@ -476,6 +478,10 @@ public static class Streams { * @throws IOException on error reading from the stream. */ public static BucketOrder readOrder(StreamInput in) throws IOException { + return readOrder(in, true); + } + + private static BucketOrder readOrder(StreamInput in, boolean dedupe) throws IOException { byte id = in.readByte(); switch (id) { case COUNT_DESC_ID: @@ -489,12 +495,18 @@ public static BucketOrder readOrder(StreamInput in) throws IOException { case Aggregation.ID: boolean asc = in.readBoolean(); String key = in.readString(); + if (dedupe && in instanceof DelayableWriteable.Deduplicator bo) { + return bo.deduplicate(new Aggregation(key, asc)); + } return new Aggregation(key, asc); case CompoundOrder.ID: int size = in.readVInt(); List compoundOrder = new ArrayList<>(size); for (int i = 0; i < size; i++) { - compoundOrder.add(Streams.readOrder(in)); + compoundOrder.add(Streams.readOrder(in, false)); + } + if (dedupe && in instanceof DelayableWriteable.Deduplicator bo) { + return bo.deduplicate(new CompoundOrder(compoundOrder, false)); } return new CompoundOrder(compoundOrder, false); default: diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java index 7e42653952a94..52541f1366236 100644 --- a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.test.TransportVersionUtils; import java.io.IOException; +import java.util.Objects; import static java.util.Collections.singletonList; import static org.hamcrest.Matchers.equalTo; @@ -57,19 +58,23 @@ public int hashCode() { } private static class NamedHolder implements Writeable { - private final Example e; + private final Example e1; + private final Example e2; NamedHolder(Example e) { - this.e = e; + this.e1 = e; + this.e2 = e; } NamedHolder(StreamInput in) throws IOException { - e = in.readNamedWriteable(Example.class); + e1 = ((DelayableWriteable.Deduplicator) in).deduplicate(in.readNamedWriteable(Example.class)); + e2 = ((DelayableWriteable.Deduplicator) in).deduplicate(in.readNamedWriteable(Example.class)); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(e); + out.writeNamedWriteable(e1); + out.writeNamedWriteable(e2); } @Override @@ -78,12 +83,12 @@ public boolean equals(Object obj) { return false; } NamedHolder other = (NamedHolder) obj; - return e.equals(other.e); + return e1.equals(other.e1) && e2.equals(other.e2); } @Override public int hashCode() { - return e.hashCode(); + return Objects.hash(e1, e2); } } @@ -130,6 +135,9 @@ public void testRoundTripFromDelayedWithNamedWriteable() throws IOException { DelayableWriteable original = DelayableWriteable.referencing(n).asSerialized(NamedHolder::new, writableRegistry()); assertTrue(original.isSerialized()); roundTripTestCase(original, NamedHolder::new); + NamedHolder copy = original.expand(); + // objects have been deduplicated + assertSame(copy.e1, copy.e2); } public void testRoundTripFromDelayedFromOldVersion() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalOrderTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalOrderTests.java index b6cdf1a2825d8..1d21a1fe173df 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalOrderTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalOrderTests.java @@ -7,6 +7,10 @@ */ package org.elasticsearch.search.aggregations; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.DelayableWriteable; +import org.elasticsearch.common.io.stream.FilterStreamInput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder; import org.elasticsearch.test.AbstractXContentSerializingTestCase; @@ -116,4 +120,32 @@ protected BucketOrder mutateInstance(BucketOrder instance) { } } + public void testInternalOrderDeduplicated() throws IOException { + BucketOrder testInstance = createTestInstance(); + try (BytesStreamOutput output = new BytesStreamOutput()) { + instanceWriter().write(output, testInstance); + if (testInstance instanceof CompoundOrder || testInstance instanceof InternalOrder.Aggregation) { + assertNotSame(testInstance, instanceReader().read(output.bytes().streamInput())); + } + StreamInput dedupe = new DeduplicatorStreamInput(output.bytes().streamInput(), testInstance); + assertSame(testInstance, instanceReader().read(dedupe)); + } + } + + private static class DeduplicatorStreamInput extends FilterStreamInput implements DelayableWriteable.Deduplicator { + + private final BucketOrder order; + + protected DeduplicatorStreamInput(StreamInput delegate, BucketOrder order) { + super(delegate); + this.order = order; + } + + @SuppressWarnings("unchecked") + @Override + public T deduplicate(T object) { + return (T) order; + } + } + }