Skip to content

Commit

Permalink
Deduplicate the list of names when deserializing InternalTopMetrics (e…
Browse files Browse the repository at this point in the history
…lastic#116298)

use deduplication infrastructure to deduplicate the names of metrics in InternalTopMetrics.
  • Loading branch information
iverase committed Nov 7, 2024
1 parent beb8f3c commit 60d861d
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.fielddata.IndexFieldData;
Expand All @@ -74,7 +77,6 @@
import static org.hamcrest.Matchers.equalTo;

public class LuceneTests extends ESTestCase {
private static final NamedWriteableRegistry EMPTY_REGISTRY = new NamedWriteableRegistry(Collections.emptyList());

public void testCleanIndex() throws IOException {
MockDirectoryWrapper dir = newMockDirectory();
Expand Down Expand Up @@ -566,7 +568,6 @@ public void testSortFieldSerialization() throws IOException {
Tuple<SortField, SortField> sortFieldTuple = randomSortField();
SortField deserialized = copyInstance(
sortFieldTuple.v1(),
EMPTY_REGISTRY,
Lucene::writeSortField,
Lucene::readSortField,
TransportVersionUtils.randomVersion(random())
Expand All @@ -578,14 +579,25 @@ public void testSortValueSerialization() throws IOException {
Object sortValue = randomSortValue();
Object deserialized = copyInstance(
sortValue,
EMPTY_REGISTRY,
Lucene::writeSortValue,
Lucene::readSortValue,
TransportVersionUtils.randomVersion(random())
);
assertEquals(sortValue, deserialized);
}

private static <T> T copyInstance(T original, Writeable.Writer<T> writer, Writeable.Reader<T> reader, TransportVersion version)
throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setTransportVersion(version);
writer.write(output, original);
try (StreamInput in = output.bytes().streamInput()) {
in.setTransportVersion(version);
return reader.read(in);
}
}
}

public static Object randomSortValue() {
return switch (randomIntBetween(0, 9)) {
case 0 -> null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -1830,7 +1831,7 @@ public static <C extends NamedWriteable, T extends C> C copyNamedWriteable(
);
}

protected static <T> T copyInstance(
protected static <T extends Writeable> T copyInstance(
T original,
NamedWriteableRegistry namedWriteableRegistry,
Writeable.Writer<T> writer,
Expand All @@ -1840,9 +1841,20 @@ protected static <T> T copyInstance(
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setTransportVersion(version);
writer.write(output, original);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
in.setTransportVersion(version);
return reader.read(in);
if (randomBoolean()) {
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
in.setTransportVersion(version);
return reader.read(in);
}
} else {
BytesReference bytesReference = output.copyBytes();
output.reset();
output.writeBytesReference(bytesReference);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
in.setTransportVersion(version);
DelayableWriteable<T> delayableWriteable = DelayableWriteable.delayed(reader, in);
return delayableWriteable.expand();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.analytics.topmetrics;

import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -68,7 +69,12 @@ static InternalTopMetrics buildEmptyAggregation(String name, List<String> metric
public InternalTopMetrics(StreamInput in) throws IOException {
super(in);
sortOrder = SortOrder.readFromStream(in);
metricNames = in.readStringCollectionAsList();
final List<String> metricNames = in.readStringCollectionAsList();
if (in instanceof DelayableWriteable.Deduplicator bo) {
this.metricNames = bo.deduplicate(metricNames);
} else {
this.metricNames = metricNames;
}
size = in.readVInt();
topMetrics = in.readCollectionAsList(TopMetric::new);
}
Expand Down

0 comments on commit 60d861d

Please sign in to comment.