Skip to content

Commit

Permalink
Deduplicate BucketOrder when deserializing (#112707) (#112789)
Browse files Browse the repository at this point in the history
Deduplicate BucketOrder object by wrapping the StreamInput generated by DelayableWritable objects.
  • Loading branch information
iverase authored Sep 12, 2024
1 parent 4f17b4a commit 6067464
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 9 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/112707.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 112707
summary: Deduplicate `BucketOrder` when deserializing
area: Aggregations
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/**
* A holder for {@link Writeable}s that delays reading the underlying object
Expand Down Expand Up @@ -230,11 +232,72 @@ private static <T> T deserialize(
) throws IOException {
try (
StreamInput in = registry == null
? serialized.streamInput()
: new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)
? new DeduplicateStreamInput(serialized.streamInput(), new DeduplicatorCache())
: new DeduplicateNamedWriteableAwareStreamInput(serialized.streamInput(), registry, new DeduplicatorCache())
) {
in.setTransportVersion(serializedAtVersion);
return reader.read(in);
}
}

/** An object implementing this interface can deduplicate instance of the provided objects.*/
public interface Deduplicator {
<T> T deduplicate(T object);
}

private static class DeduplicateStreamInput extends FilterStreamInput implements Deduplicator {

private final Deduplicator deduplicator;

private DeduplicateStreamInput(StreamInput delegate, Deduplicator deduplicator) {
super(delegate);
this.deduplicator = deduplicator;
}

@Override
public <T> T deduplicate(T object) {
return deduplicator.deduplicate(object);
}
}

private static class DeduplicateNamedWriteableAwareStreamInput extends NamedWriteableAwareStreamInput implements Deduplicator {

private final Deduplicator deduplicator;

private DeduplicateNamedWriteableAwareStreamInput(
StreamInput delegate,
NamedWriteableRegistry registry,
Deduplicator deduplicator
) {
super(delegate, registry);
this.deduplicator = deduplicator;
}

@Override
public <T> T deduplicate(T object) {
return deduplicator.deduplicate(object);
}
}

/**
* Implementation of a {@link Deduplicator} cache. It can hold up to 1024 instances.
*/
private static class DeduplicatorCache implements Deduplicator {

private static final int MAX_SIZE = 1024;
// lazily init
private Map<Object, Object> cache = null;

@SuppressWarnings("unchecked")
@Override
public <T> T deduplicate(T object) {
if (cache == null) {
cache = new HashMap<>();
cache.put(object, object);
} else if (cache.size() < MAX_SIZE) {
object = (T) cache.computeIfAbsent(object, o -> o);
}
return object;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
Expand Down Expand Up @@ -36,6 +37,7 @@
*/
public abstract class InternalOrder extends BucketOrder {
// TODO merge the contents of this file into BucketOrder. The way it is now is relic.

/**
* {@link Bucket} ordering strategy to sort by a sub-aggregation.
*/
Expand Down Expand Up @@ -476,6 +478,10 @@ public static class Streams {
* @throws IOException on error reading from the stream.
*/
public static BucketOrder readOrder(StreamInput in) throws IOException {
return readOrder(in, true);
}

private static BucketOrder readOrder(StreamInput in, boolean dedupe) throws IOException {
byte id = in.readByte();
switch (id) {
case COUNT_DESC_ID:
Expand All @@ -489,12 +495,18 @@ public static BucketOrder readOrder(StreamInput in) throws IOException {
case Aggregation.ID:
boolean asc = in.readBoolean();
String key = in.readString();
if (dedupe && in instanceof DelayableWriteable.Deduplicator bo) {
return bo.deduplicate(new Aggregation(key, asc));
}
return new Aggregation(key, asc);
case CompoundOrder.ID:
int size = in.readVInt();
List<BucketOrder> compoundOrder = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
compoundOrder.add(Streams.readOrder(in));
compoundOrder.add(Streams.readOrder(in, false));
}
if (dedupe && in instanceof DelayableWriteable.Deduplicator bo) {
return bo.deduplicate(new CompoundOrder(compoundOrder, false));
}
return new CompoundOrder(compoundOrder, false);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.test.TransportVersionUtils;

import java.io.IOException;
import java.util.Objects;

import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -57,19 +58,23 @@ public int hashCode() {
}

private static class NamedHolder implements Writeable {
private final Example e;
private final Example e1;
private final Example e2;

NamedHolder(Example e) {
this.e = e;
this.e1 = e;
this.e2 = e;
}

NamedHolder(StreamInput in) throws IOException {
e = in.readNamedWriteable(Example.class);
e1 = ((DelayableWriteable.Deduplicator) in).deduplicate(in.readNamedWriteable(Example.class));
e2 = ((DelayableWriteable.Deduplicator) in).deduplicate(in.readNamedWriteable(Example.class));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(e);
out.writeNamedWriteable(e1);
out.writeNamedWriteable(e2);
}

@Override
Expand All @@ -78,12 +83,12 @@ public boolean equals(Object obj) {
return false;
}
NamedHolder other = (NamedHolder) obj;
return e.equals(other.e);
return e1.equals(other.e1) && e2.equals(other.e2);
}

@Override
public int hashCode() {
return e.hashCode();
return Objects.hash(e1, e2);
}
}

Expand Down Expand Up @@ -130,6 +135,9 @@ public void testRoundTripFromDelayedWithNamedWriteable() throws IOException {
DelayableWriteable<NamedHolder> original = DelayableWriteable.referencing(n).asSerialized(NamedHolder::new, writableRegistry());
assertTrue(original.isSerialized());
roundTripTestCase(original, NamedHolder::new);
NamedHolder copy = original.expand();
// objects have been deduplicated
assertSame(copy.e1, copy.e2);
}

public void testRoundTripFromDelayedFromOldVersion() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.FilterStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
Expand Down Expand Up @@ -116,4 +120,32 @@ protected BucketOrder mutateInstance(BucketOrder instance) {
}
}

public void testInternalOrderDeduplicated() throws IOException {
BucketOrder testInstance = createTestInstance();
try (BytesStreamOutput output = new BytesStreamOutput()) {
instanceWriter().write(output, testInstance);
if (testInstance instanceof CompoundOrder || testInstance instanceof InternalOrder.Aggregation) {
assertNotSame(testInstance, instanceReader().read(output.bytes().streamInput()));
}
StreamInput dedupe = new DeduplicatorStreamInput(output.bytes().streamInput(), testInstance);
assertSame(testInstance, instanceReader().read(dedupe));
}
}

private static class DeduplicatorStreamInput extends FilterStreamInput implements DelayableWriteable.Deduplicator {

private final BucketOrder order;

protected DeduplicatorStreamInput(StreamInput delegate, BucketOrder order) {
super(delegate);
this.order = order;
}

@SuppressWarnings("unchecked")
@Override
public <T> T deduplicate(T object) {
return (T) order;
}
}

}

0 comments on commit 6067464

Please sign in to comment.