Deduplicate BucketOrder when deserializing #112707

Merged 5 commits on Sep 12, 2024
5 changes: 5 additions & 0 deletions docs/changelog/112707.yaml
@@ -0,0 +1,5 @@
pr: 112707
summary: Deduplicate `BucketOrder` when deserializing
area: Aggregations
type: enhancement
issues: []
server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java
@@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.search.aggregations.Aggregator.BucketComparator;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
@@ -27,6 +28,7 @@
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.ToLongFunction;
@@ -36,6 +38,7 @@
*/
public abstract class InternalOrder extends BucketOrder {
// TODO merge the contents of this file into BucketOrder. The way it is now is relic.

/**
* {@link Bucket} ordering strategy to sort by a sub-aggregation.
*/
@@ -468,6 +471,8 @@ private static boolean isOrder(BucketOrder order, BucketOrder expected) {
*/
public static class Streams {

private static final BucketOrderDeduplicator bucketOrderDeduplicator = new BucketOrderDeduplicator();

/**
* Read a {@link BucketOrder} from a {@link StreamInput}.
*
@@ -489,14 +494,14 @@ public static BucketOrder readOrder(StreamInput in) throws IOException {
case Aggregation.ID:
boolean asc = in.readBoolean();
String key = in.readString();
return new Aggregation(key, asc);
return bucketOrderDeduplicator.deduplicate(new Aggregation(key, asc));
case CompoundOrder.ID:
int size = in.readVInt();
List<BucketOrder> compoundOrder = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
compoundOrder.add(Streams.readOrder(in));
}
return new CompoundOrder(compoundOrder, false);
return bucketOrderDeduplicator.deduplicate(new CompoundOrder(compoundOrder, false));
Member

ESQL uses a wrapper around the StreamInput that keeps the cache in a regular old variable rather than a static. I'd prefer that if we can manage it.

Contributor Author

I have moved it into a wrapper of StreamInput by (ab)using the fact that aggregations are deserialized using DelayableWriteable. I had to introduce an interface so we can deduplicate when it is present. (An illustrative sketch of this wrapper approach follows the diff below.)

default:
throw new RuntimeException("unknown order id [" + id + "]");
}
@@ -595,4 +600,28 @@ public static BucketOrder parseOrderParam(XContentParser parser) throws IOException {
};
}
}

/**
* A cache for deserializing {@link BucketOrder}.
*/
private static class BucketOrderDeduplicator {
// A cache of this size should be enough to deserialize a single request
private static final int MAX_SIZE = 16;

private final Map<BucketOrder, BucketOrder> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(16);

private BucketOrderDeduplicator() {}

public BucketOrder deduplicate(BucketOrder bucketOrder) {
final BucketOrder res = map.get(bucketOrder);
if (res != null) {
return res;
}
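// Naive eviction: once the cache reaches MAX_SIZE, clear it entirely before adding the new entry.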
if (map.size() >= MAX_SIZE) {
map.clear();
}
map.put(bucketOrder, bucketOrder);
return bucketOrder;
}
}
}
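For context, here is a minimal sketch of the per-stream alternative discussed in the review thread above, assuming a FilterStreamInput-style wrapper (the same pattern NamedWriteableAwareStreamInput uses); the class and method names are hypothetical and are not part of this change:

import org.elasticsearch.common.io.stream.FilterStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.BucketOrder;

import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper: the cache lives in an instance field scoped to one stream,
// rather than in a static shared across all deserializations.
class DeduplicatingStreamInput extends FilterStreamInput {

    private final Map<BucketOrder, BucketOrder> seen = new HashMap<>();

    DeduplicatingStreamInput(StreamInput delegate) {
        super(delegate);
    }

    // Returns the previously read instance if an equal BucketOrder was already
    // deserialized from this stream; otherwise caches and returns the argument.
    BucketOrder deduplicate(BucketOrder order) {
        BucketOrder existing = seen.putIfAbsent(order, order);
        return existing != null ? existing : order;
    }
}

Because the cache is scoped to a single deserialization, it needs no concurrency control and no size-based eviction, which is what makes the wrapper approach attractive.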
server/src/test/java/org/elasticsearch/search/aggregations/InternalOrderTests.java
@@ -7,6 +7,7 @@
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
@@ -116,4 +117,15 @@ protected BucketOrder mutateInstance(BucketOrder instance) {
}
}

public void testInternalOrderDeduplicated() throws IOException {
BucketOrder testInstance = createTestInstance();
try (BytesStreamOutput output = new BytesStreamOutput()) {
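// Write the instance twice and deserialize twice from fresh streams; the
// deduplicator should return the identical object for both reads.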
instanceWriter().write(output, testInstance);
BucketOrder order1 = instanceReader().read(output.bytes().streamInput());
instanceWriter().write(output, testInstance);
BucketOrder order2 = instanceReader().read(output.bytes().streamInput());
assertSame(order1, order2);
}
}

}