diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index dd399e406177d..42c29f8498811 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -70,6 +70,7 @@ import org.opensearch.search.aggregations.MultiBucketCollector; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.BucketsAggregator; +import org.opensearch.search.aggregations.bucket.missing.MissingOrder; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.searchafter.SearchAfterBuilder; import org.opensearch.search.sort.SortAndFormats; @@ -89,6 +90,7 @@ final class CompositeAggregator extends BucketsAggregator { private final int size; private final List sourceNames; private final int[] reverseMuls; + private final MissingOrder[] missingOrders; private final List formats; private final CompositeKey rawAfterKey; @@ -117,6 +119,7 @@ final class CompositeAggregator extends BucketsAggregator { this.size = size; this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList()); this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray(); + this.missingOrders = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::missingOrder).toArray(MissingOrder[]::new); this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList()); this.sources = new SingleDimensionValuesSource[sourceConfigs.length]; // check that the provided size is not greater than the search.max_buckets setting @@ -189,7 +192,14 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I CompositeKey key = queue.toCompositeKey(slot); InternalAggregations aggs = subAggsForBuckets[slot]; int docCount = queue.getDocCount(slot); - buckets[queue.size()] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs); + buckets[queue.size()] = new InternalComposite.InternalBucket( + sourceNames, + formats, + key, + reverseMuls, + missingOrders, + docCount, + aggs); } CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null; return new InternalAggregation[] { @@ -201,6 +211,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I Arrays.asList(buckets), lastBucket, reverseMuls, + missingOrders, earlyTerminated, metadata() ) }; @@ -208,7 +219,17 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I @Override public InternalAggregation buildEmptyAggregation() { - return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls, false, metadata()); + return new InternalComposite( + name, + size, + sourceNames, + formats, + Collections.emptyList(), + null, + reverseMuls, + missingOrders, + false, + metadata()); } private void finishLeaf() { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java index 64eb437147510..761bee844762e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java @@ -34,6 +34,7 @@ import org.apache.lucene.util.BytesRef; import org.opensearch.LegacyESVersion; +import org.opensearch.Version; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.xcontent.XContentBuilder; @@ -43,6 +44,7 @@ import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.InternalMultiBucketAggregation; import org.opensearch.search.aggregations.KeyComparable; +import org.opensearch.search.aggregations.bucket.missing.MissingOrder; import java.io.IOException; import java.util.AbstractMap; @@ -64,6 +66,7 @@ public class InternalComposite extends InternalMultiBucketAggregation buckets; private final CompositeKey afterKey; private final int[] reverseMuls; + private final MissingOrder[] missingOrders; private final List sourceNames; private final List formats; @@ -77,6 +80,7 @@ public class InternalComposite extends InternalMultiBucketAggregation buckets, CompositeKey afterKey, int[] reverseMuls, + MissingOrder[] missingOrders, boolean earlyTerminated, Map metadata ) { @@ -87,6 +91,7 @@ public class InternalComposite extends InternalMultiBucketAggregation new InternalBucket(input, sourceNames, formats, reverseMuls)); + if (in.getVersion().onOrAfter(Version.V_2_0_0)) { + this.missingOrders = in.readArray(MissingOrder::readFromStream, MissingOrder[]::new); + } else { + this.missingOrders = new MissingOrder[reverseMuls.length]; + Arrays.fill(this.missingOrders, MissingOrder.DEFAULT); + } + this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls, missingOrders)); this.afterKey = in.readBoolean() ? new CompositeKey(in) : null; this.earlyTerminated = in.getVersion().onOrAfter(LegacyESVersion.V_7_6_0) ? in.readBoolean() : false; } @@ -112,6 +123,9 @@ protected void doWriteTo(StreamOutput out) throws IOException { out.writeNamedWriteable(format); } out.writeIntArray(reverseMuls); + if (out.getVersion().onOrAfter(Version.V_2_0_0)) { + out.writeArray((output, order) -> order.writeTo(output), missingOrders); + } out.writeList(buckets); out.writeBoolean(afterKey != null); if (afterKey != null) { @@ -140,7 +154,17 @@ public InternalComposite create(List newBuckets) { * keep the afterKey of the original aggregation in order * to be able to retrieve the next page even if all buckets have been filtered. */ - return new InternalComposite(name, size, sourceNames, formats, newBuckets, afterKey, reverseMuls, earlyTerminated, getMetadata()); + return new InternalComposite( + name, + size, + sourceNames, + formats, + newBuckets, + afterKey, + reverseMuls, + missingOrders, + earlyTerminated, + getMetadata()); } @Override @@ -150,6 +174,7 @@ public InternalBucket createBucket(InternalAggregations aggregations, InternalBu prototype.formats, prototype.key, prototype.reverseMuls, + prototype.missingOrders, prototype.docCount, aggregations ); @@ -235,7 +260,17 @@ public InternalAggregation reduce(List aggregations, Reduce lastKey = lastBucket.getRawKey(); } reduceContext.consumeBucketsAndMaybeBreak(result.size()); - return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls, earlyTerminated, metadata); + return new InternalComposite( + name, + size, + sourceNames, + reducedFormats, + result, + lastKey, + reverseMuls, + missingOrders, + earlyTerminated, + metadata); } @Override @@ -253,7 +288,7 @@ protected InternalBucket reduceBucket(List buckets, ReduceContex * just whatever formats make sense for *its* index. This can be real * trouble when the index doing the reducing is unmapped. */ List reducedFormats = buckets.get(0).formats; - return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, docCount, aggs); + return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, missingOrders, docCount, aggs); } @Override @@ -266,12 +301,13 @@ public boolean equals(Object obj) { return Objects.equals(size, that.size) && Objects.equals(buckets, that.buckets) && Objects.equals(afterKey, that.afterKey) - && Arrays.equals(reverseMuls, that.reverseMuls); + && Arrays.equals(reverseMuls, that.reverseMuls) + && Arrays.equals(missingOrders, that.missingOrders); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), size, buckets, afterKey, Arrays.hashCode(reverseMuls)); + return Objects.hash(super.hashCode(), size, buckets, afterKey, Arrays.hashCode(reverseMuls), Arrays.hashCode(missingOrders)); } private static class BucketIterator implements Comparable { @@ -301,6 +337,7 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern private final long docCount; private final InternalAggregations aggregations; private final transient int[] reverseMuls; + private final transient MissingOrder[] missingOrders; private final transient List sourceNames; private final transient List formats; @@ -309,6 +346,7 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern List formats, CompositeKey key, int[] reverseMuls, + MissingOrder[] missingOrders, long docCount, InternalAggregations aggregations ) { @@ -316,15 +354,22 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern this.docCount = docCount; this.aggregations = aggregations; this.reverseMuls = reverseMuls; + this.missingOrders = missingOrders; this.sourceNames = sourceNames; this.formats = formats; } - InternalBucket(StreamInput in, List sourceNames, List formats, int[] reverseMuls) throws IOException { + InternalBucket( + StreamInput in, + List sourceNames, + List formats, + int[] reverseMuls, + MissingOrder[] missingOrders) throws IOException { this.key = new CompositeKey(in); this.docCount = in.readVLong(); this.aggregations = InternalAggregations.readFrom(in); this.reverseMuls = reverseMuls; + this.missingOrders = missingOrders; this.sourceNames = sourceNames; this.formats = formats; } @@ -400,13 +445,15 @@ List getFormats() { @Override public int compareKey(InternalBucket other) { for (int i = 0; i < key.size(); i++) { - if (key.get(i) == null) { - if (other.key.get(i) == null) { + // lambda function require final variable. + final int index = i; + int result = missingOrders[i].compare(() -> key.get(index) == null, () -> other.key.get(index) == null, reverseMuls[i]); + if (MissingOrder.unknownOrder(result) == false) { + if (result == 0) { continue; + } else { + return result; } - return -1 * reverseMuls[i]; - } else if (other.key.get(i) == null) { - return reverseMuls[i]; } assert key.get(i).getClass() == other.key.get(i).getClass(); @SuppressWarnings("unchecked") diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/InternalCompositeTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/InternalCompositeTests.java index 311c688f23ff6..bfa2a93d0ac3c 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/InternalCompositeTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/InternalCompositeTests.java @@ -39,6 +39,7 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.ParsedAggregation; +import org.opensearch.search.aggregations.bucket.missing.MissingOrder; import org.opensearch.test.InternalMultiBucketAggregationTestCase; import org.junit.After; @@ -71,6 +72,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa private List sourceNames; private List formats; private int[] reverseMuls; + private MissingOrder[] missingOrders; private int[] types; private int size; @@ -100,10 +102,12 @@ public void setUp() throws Exception { sourceNames = new ArrayList<>(); formats = new ArrayList<>(); reverseMuls = new int[numFields]; + missingOrders = new MissingOrder[numFields]; types = new int[numFields]; for (int i = 0; i < numFields; i++) { sourceNames.add("field_" + i); reverseMuls[i] = randomBoolean() ? 1 : -1; + missingOrders[i] = randomFrom(MissingOrder.values()); int type = randomIntBetween(0, 2); types[i] = type; formats.add(randomDocValueFormat(type == 0)); @@ -182,6 +186,7 @@ protected InternalComposite createTestInstance(String name, Map formats, key, reverseMuls, + missingOrders, 1L, aggregations ); @@ -189,7 +194,17 @@ protected InternalComposite createTestInstance(String name, Map } Collections.sort(buckets, (o1, o2) -> o1.compareKey(o2)); CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size() - 1).getRawKey() : null; - return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls, randomBoolean(), metadata); + return new InternalComposite( + name, + size, + sourceNames, + formats, + buckets, + lastBucket, + reverseMuls, + missingOrders, + randomBoolean(), + metadata); } @Override @@ -214,6 +229,7 @@ protected InternalComposite mutateInstance(InternalComposite instance) throws IO formats, createCompositeKey(), reverseMuls, + missingOrders, randomLongBetween(1, 100), InternalAggregations.EMPTY ) @@ -239,6 +255,7 @@ protected InternalComposite mutateInstance(InternalComposite instance) throws IO buckets, lastBucket, reverseMuls, + missingOrders, randomBoolean(), metadata ); @@ -295,6 +312,7 @@ public void testReduceUnmapped() throws IOException { emptyList(), null, reverseMuls, + missingOrders, true, emptyMap() );