Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate BucketOrder when deserializing #112707

Merged
merged 5 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/112707.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 112707
summary: Deduplicate `BucketOrder` when deserializing
area: Aggregations
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/**
* A holder for {@link Writeable}s that delays reading the underlying object
Expand Down Expand Up @@ -230,11 +232,72 @@ private static <T> T deserialize(
) throws IOException {
try (
StreamInput in = registry == null
? serialized.streamInput()
: new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)
? new DeduplicateStreamInput(serialized.streamInput(), new DeduplicatorCache())
: new DeduplicateNamedWriteableAwareStreamInput(serialized.streamInput(), registry, new DeduplicatorCache())
) {
in.setTransportVersion(serializedAtVersion);
return reader.read(in);
}
}

/**
 * An object implementing this interface can return a canonical (previously seen, equal)
 * instance in place of the one passed in, allowing equal objects read from a stream to
 * share a single instance.
 */
public interface Deduplicator {
<T> T deduplicate(T object);
}

/**
 * A {@link FilterStreamInput} that additionally implements {@link Deduplicator} by
 * forwarding every {@link #deduplicate} call to a shared cache.
 */
private static class DeduplicateStreamInput extends FilterStreamInput implements Deduplicator {

    private final Deduplicator dedupCache;

    private DeduplicateStreamInput(StreamInput delegate, Deduplicator dedupCache) {
        super(delegate);
        this.dedupCache = dedupCache;
    }

    @Override
    public <T> T deduplicate(T object) {
        return dedupCache.deduplicate(object);
    }
}

/**
 * A {@link NamedWriteableAwareStreamInput} that additionally implements
 * {@link Deduplicator} by forwarding every {@link #deduplicate} call to a shared cache.
 */
private static class DeduplicateNamedWriteableAwareStreamInput extends NamedWriteableAwareStreamInput implements Deduplicator {

    private final Deduplicator dedupCache;

    private DeduplicateNamedWriteableAwareStreamInput(
        StreamInput delegate,
        NamedWriteableRegistry registry,
        Deduplicator dedupCache
    ) {
        super(delegate, registry);
        this.dedupCache = dedupCache;
    }

    @Override
    public <T> T deduplicate(T object) {
        return dedupCache.deduplicate(object);
    }
}

/**
* Implementation of a {@link Deduplicator} cache. It can hold up to 1024 instances.
*/
/**
 * {@link Deduplicator} backed by a lazily created {@link HashMap}. Once the cache
 * holds {@value #MAX_SIZE} entries, objects that are not already cached are returned
 * as-is without being stored.
 */
private static class DeduplicatorCache implements Deduplicator {

    private static final int MAX_SIZE = 1024;
    // Created on first use; many streams never deduplicate anything.
    private Map<Object, Object> instances;

    @SuppressWarnings("unchecked")
    @Override
    public <T> T deduplicate(T object) {
        if (instances == null) {
            instances = new HashMap<>();
        }
        if (instances.size() < MAX_SIZE) {
            // Returns the previously stored equal instance, or stores and returns this one.
            return (T) instances.computeIfAbsent(object, o -> o);
        }
        return object;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
Expand Down Expand Up @@ -36,6 +37,7 @@
*/
public abstract class InternalOrder extends BucketOrder {
// TODO merge the contents of this file into BucketOrder. The way it is now is relic.

/**
* {@link Bucket} ordering strategy to sort by a sub-aggregation.
*/
Expand Down Expand Up @@ -476,6 +478,10 @@ public static class Streams {
* @throws IOException on error reading from the stream.
*/
public static BucketOrder readOrder(StreamInput in) throws IOException {
// Top-level reads may be deduplicated; nested reads (inside a CompoundOrder) pass false.
return readOrder(in, true);
}

private static BucketOrder readOrder(StreamInput in, boolean dedupe) throws IOException {
byte id = in.readByte();
switch (id) {
case COUNT_DESC_ID:
Expand All @@ -489,12 +495,18 @@ public static BucketOrder readOrder(StreamInput in) throws IOException {
case Aggregation.ID:
boolean asc = in.readBoolean();
String key = in.readString();
if (dedupe && in instanceof DelayableWriteable.Deduplicator bo) {
return bo.deduplicate(new Aggregation(key, asc));
}
return new Aggregation(key, asc);
case CompoundOrder.ID:
int size = in.readVInt();
List<BucketOrder> compoundOrder = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
compoundOrder.add(Streams.readOrder(in));
compoundOrder.add(Streams.readOrder(in, false));
}
if (dedupe && in instanceof DelayableWriteable.Deduplicator bo) {
return bo.deduplicate(new CompoundOrder(compoundOrder, false));
}
return new CompoundOrder(compoundOrder, false);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.test.TransportVersionUtils;

import java.io.IOException;
import java.util.Objects;

import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -57,19 +58,23 @@ public int hashCode() {
}

private static class NamedHolder implements Writeable {
private final Example e;
private final Example e1;
private final Example e2;

NamedHolder(Example e) {
    // Both fields deliberately reference the same instance so a deduplicating
    // round trip can be detected with assertSame on the copy.
    e1 = e;
    e2 = e;
}

NamedHolder(StreamInput in) throws IOException {
    // The stream is expected to implement Deduplicator, so reading two equal
    // values yields a single shared instance.
    DelayableWriteable.Deduplicator dedup = (DelayableWriteable.Deduplicator) in;
    e1 = dedup.deduplicate(in.readNamedWriteable(Example.class));
    e2 = dedup.deduplicate(in.readNamedWriteable(Example.class));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
// Writes both (possibly identical) instances; deduplication happens on read.
out.writeNamedWriteable(e1);
out.writeNamedWriteable(e2);
}

@Override
Expand All @@ -78,12 +83,12 @@ public boolean equals(Object obj) {
return false;
}
NamedHolder other = (NamedHolder) obj;
return e.equals(other.e);
return e1.equals(other.e1) && e2.equals(other.e2);
}

@Override
public int hashCode() {
// Consistent with equals, which compares e1 and e2.
return Objects.hash(e1, e2);
}
}

Expand Down Expand Up @@ -130,6 +135,9 @@ public void testRoundTripFromDelayedWithNamedWriteable() throws IOException {
DelayableWriteable<NamedHolder> original = DelayableWriteable.referencing(n).asSerialized(NamedHolder::new, writableRegistry());
assertTrue(original.isSerialized());
roundTripTestCase(original, NamedHolder::new);
NamedHolder copy = original.expand();
// objects have been deduplicated
assertSame(copy.e1, copy.e2);
}

public void testRoundTripFromDelayedFromOldVersion() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.FilterStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
Expand Down Expand Up @@ -116,4 +120,32 @@ protected BucketOrder mutateInstance(BucketOrder instance) {
}
}

public void testInternalOrderDeduplicated() throws IOException {
    // Serialize a random order, then verify that a deduplicating stream hands
    // back the cached instance while a plain stream allocates a fresh one.
    BucketOrder instance = createTestInstance();
    try (BytesStreamOutput out = new BytesStreamOutput()) {
        instanceWriter().write(out, instance);
        boolean deduplicable = instance instanceof CompoundOrder || instance instanceof InternalOrder.Aggregation;
        if (deduplicable) {
            assertNotSame(instance, instanceReader().read(out.bytes().streamInput()));
        }
        StreamInput in = new DeduplicatorStreamInput(out.bytes().streamInput(), instance);
        assertSame(instance, instanceReader().read(in));
    }
}

/**
 * Test-only {@link FilterStreamInput} whose {@link #deduplicate} always returns one
 * fixed canonical order, regardless of the argument.
 */
private static class DeduplicatorStreamInput extends FilterStreamInput implements DelayableWriteable.Deduplicator {

    private final BucketOrder canonical;

    protected DeduplicatorStreamInput(StreamInput delegate, BucketOrder canonical) {
        super(delegate);
        this.canonical = canonical;
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> T deduplicate(T object) {
        return (T) canonical;
    }
}

}