Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support _first and _last parameter for missing bucket ordering in composite aggregation #1942

Merged
merged 3 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@
import org.opensearch.index.mapper.StringFieldType;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;

import java.io.IOException;
import java.util.Objects;
import java.util.function.LongConsumer;

/**
Expand All @@ -68,10 +70,11 @@ class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
DocValueFormat format,
boolean missingBucket,
MissingOrder missingOrder,
int size,
int reverseMul
) {
super(bigArrays, format, fieldType, missingBucket, size, reverseMul);
super(bigArrays, format, fieldType, missingBucket, missingOrder, size, reverseMul);
this.breakerConsumer = breakerConsumer;
this.docValuesFunc = docValuesFunc;
this.values = bigArrays.newObjectArray(Math.min(size, 100));
Expand Down Expand Up @@ -101,10 +104,9 @@ void copyCurrent(int slot) {
@Override
int compare(int from, int to) {
if (missingBucket) {
if (values.get(from) == null) {
return values.get(to) == null ? 0 : -1 * reverseMul;
} else if (values.get(to) == null) {
return reverseMul;
int result = missingOrder.compare(() -> Objects.isNull(values.get(from)), () -> Objects.isNull(values.get(to)), reverseMul);
if (!MissingOrder.unknownOrder(result)) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
return result;
}
}
return compareValues(values.get(from), values.get(to));
Expand All @@ -113,10 +115,9 @@ int compare(int from, int to) {
@Override
int compareCurrent(int slot) {
if (missingBucket) {
if (currentValue == null) {
return values.get(slot) == null ? 0 : -1 * reverseMul;
} else if (values.get(slot) == null) {
return reverseMul;
int result = missingOrder.compare(() -> Objects.isNull(currentValue), () -> Objects.isNull(values.get(slot)), reverseMul);
if (!MissingOrder.unknownOrder(result)) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
return result;
}
}
return compareValues(currentValue, values.get(slot));
Expand All @@ -125,10 +126,9 @@ int compareCurrent(int slot) {
@Override
int compareCurrentWithAfter() {
if (missingBucket) {
if (currentValue == null) {
return afterValue == null ? 0 : -1 * reverseMul;
} else if (afterValue == null) {
return reverseMul;
int result = missingOrder.compare(() -> Objects.isNull(currentValue), () -> Objects.isNull(afterValue), reverseMul);
if (!MissingOrder.unknownOrder(result)) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
return result;
}
}
return compareValues(currentValue, afterValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@

package org.opensearch.search.aggregations.bucket.composite;

import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.script.Script;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
Expand All @@ -49,6 +51,8 @@
import java.time.ZoneId;
import java.util.Objects;

import static org.opensearch.search.aggregations.bucket.missing.MissingOrder.fromString;

/**
* A {@link ValuesSource} builder for {@link CompositeAggregationBuilder}
*/
Expand All @@ -59,6 +63,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
private Script script = null;
private ValueType userValueTypeHint = null;
private boolean missingBucket = false;
private MissingOrder missingOrder = MissingOrder.DEFAULT;
private SortOrder order = SortOrder.ASC;
private String format = null;

Expand All @@ -76,6 +81,9 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
this.userValueTypeHint = ValueType.readFromStream(in);
}
this.missingBucket = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
this.missingOrder = MissingOrder.readFromStream(in);
}
Comment on lines +84 to +86
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan to leave this feature only for 2.0.0?
I dont see any harm to include this for 1.3.0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1.3.0 also works, there is no blocker.
totally agree, doc is necessary. i add javadoc on setter method. any suggestion for others?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you'd like this for 1.3.0 (I'm good w/ that) let's make the version change in a separate backportable PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer a separate backportable PR for 1.3.0.

this.order = SortOrder.readFromStream(in);
this.format = in.readOptionalString();
}
Expand All @@ -95,6 +103,9 @@ public final void writeTo(StreamOutput out) throws IOException {
userValueTypeHint.writeTo(out);
}
out.writeBoolean(missingBucket);
if (out.getVersion().onOrAfter(Version.V_2_0_0)) {
missingOrder.writeTo(out);
}
order.writeTo(out);
out.writeOptionalString(format);
innerWriteTo(out);
Expand All @@ -120,6 +131,9 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
if (format != null) {
builder.field("format", format);
}
if (!MissingOrder.isDefault(missingOrder)) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
builder.field("missing_order", missingOrder.toString());
}
builder.field("order", order);
doXContentBody(builder, params);
builder.endObject();
Expand All @@ -142,6 +156,7 @@ public boolean equals(Object o) {
&& Objects.equals(script, that.script())
&& Objects.equals(userValueTypeHint, that.userValuetypeHint())
&& Objects.equals(missingBucket, that.missingBucket())
&& Objects.equals(missingOrder, that.missingOrder())
&& Objects.equals(order, that.order())
&& Objects.equals(format, that.format());
}
Expand Down Expand Up @@ -226,6 +241,29 @@ public boolean missingBucket() {
return missingBucket;
}

/**
* Sets the {@link MissingOrder} to use to order missing value.
*/
public AB missingOrder(MissingOrder missingOrder) {
this.missingOrder = missingOrder;
return (AB) this;
}

/**
* Sets the {@link MissingOrder} to use to order missing value.
* @param missingOrder "first", "last" or "default".
*/
public AB missingOrder(String missingOrder) {
return missingOrder(fromString(missingOrder));
}

/**
* Missing value order. {@link MissingOrder}.
*/
public MissingOrder missingOrder() {
return missingOrder;
}

/**
* Sets the {@link SortOrder} to use to sort values produced this source
*/
Expand Down Expand Up @@ -286,6 +324,9 @@ protected abstract CompositeValuesSourceConfig innerBuild(QueryShardContext quer
protected abstract ValuesSourceType getDefaultValuesSourceType();

public final CompositeValuesSourceConfig build(QueryShardContext queryShardContext) throws IOException {
if (missingBucket == false && missingOrder != MissingOrder.DEFAULT) {
throw new IllegalArgumentException("missing_order required missing_bucket is true");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new IllegalArgumentException("missing_order required missing_bucket is true");
throw new IllegalArgumentException(MissingOrder.NAME + " is required when missing_bucket is true");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validation is to make sure missing_bucket is set to true when missing_order is set to first/last, does it make sense? for example,

POST {{baseUrl}}/{{index}}/_search
Content-Type: application/json

{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "after": {"keyword": "b"},
        "sources": [{
          "keyword": {
            "terms": {
              "field": "keyword",
              "missing_bucket": false,
              "missing_order": "last"
            }
          }
        }]
      }
    }
  }
}

HTTP/1.1 400 Bad Request
content-type: application/json; charset=UTF-8
content-encoding: gzip
content-length: 261

{
  "error": {
    "root_cause": [
      {
        "type": "illegal_argument_exception",
        "reason": "missing_order require missing_bucket is true"
      }
    ],
    "type": "search_phase_execution_exception",
    "reason": "all shards failed",
    "phase": "query",
    "grouped": true,
    "failed_shards": [
      {
        "shard": 0,
        "index": "test",
        "node": "Xr7JvaEyTiqsB4aPUEtzIg",
        "reason": {
          "type": "illegal_argument_exception",
          "reason": "missing_order require missing_bucket is true"
        }
      }
    ],
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "missing_order require missing_bucket is true",
      "caused_by": {
        "type": "illegal_argument_exception",
        "reason": "missing_order require missing_bucket is true"
      }
    }
  },
  "status": 400
}

}
ValuesSourceConfig config = ValuesSourceConfig.resolve(
queryShardContext,
userValueTypeHint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.common.util.BigArrays;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.sort.SortOrder;

Expand All @@ -62,6 +63,7 @@ SingleDimensionValuesSource<?> createValuesSource(
private final DocValueFormat format;
private final int reverseMul;
private final boolean missingBucket;
private final MissingOrder missingOrder;
private final boolean hasScript;
private final SingleDimensionValuesSourceProvider singleDimensionValuesSourceProvider;

Expand All @@ -83,6 +85,7 @@ SingleDimensionValuesSource<?> createValuesSource(
DocValueFormat format,
SortOrder order,
boolean missingBucket,
MissingOrder missingOrder,
boolean hasScript,
SingleDimensionValuesSourceProvider singleDimensionValuesSourceProvider
) {
Expand All @@ -94,6 +97,7 @@ SingleDimensionValuesSource<?> createValuesSource(
this.missingBucket = missingBucket;
this.hasScript = hasScript;
this.singleDimensionValuesSourceProvider = singleDimensionValuesSourceProvider;
this.missingOrder = missingOrder;
}

/**
Expand Down Expand Up @@ -132,6 +136,13 @@ boolean missingBucket() {
return missingBucket;
}

/**
* If true, an explicit `null bucket represents documents with missing values.
penghuo marked this conversation as resolved.
Show resolved Hide resolved
*/
MissingOrder missingOrder() {
return missingOrder;
}

/**
* Returns true if the source contains a script that can change the value.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class CompositeValuesSourceParserHelper {
static <VB extends CompositeValuesSourceBuilder<VB>, T> void declareValuesSourceFields(AbstractObjectParser<VB, T> objectParser) {
objectParser.declareField(VB::field, XContentParser::text, new ParseField("field"), ObjectParser.ValueType.STRING);
objectParser.declareBoolean(VB::missingBucket, new ParseField("missing_bucket"));
objectParser.declareString(VB::missingOrder, new ParseField("missing_order"));
penghuo marked this conversation as resolved.
Show resolved Hide resolved

objectParser.declareField(VB::userValuetypeHint, p -> {
ValueType valueType = ValueType.lenientParse(p.text());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.search.aggregations.bucket.histogram.DateIntervalConsumer;
import org.opensearch.search.aggregations.bucket.histogram.DateIntervalWrapper;
import org.opensearch.search.aggregations.bucket.histogram.Histogram;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.support.CoreValuesSourceType;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
Expand Down Expand Up @@ -81,6 +82,7 @@ CompositeValuesSourceConfig apply(
boolean hasScript, // probably redundant with the config, but currently we check this two different ways...
String format,
boolean missingBucket,
MissingOrder missingOrder,
SortOrder order
);
}
Expand Down Expand Up @@ -288,7 +290,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
builder.register(
REGISTRY_KEY,
org.opensearch.common.collect.List.of(CoreValuesSourceType.DATE, CoreValuesSourceType.NUMERIC),
(valuesSourceConfig, rounding, name, hasScript, format, missingBucket, order) -> {
(valuesSourceConfig, rounding, name, hasScript, format, missingBucket, missingOrder, order) -> {
ValuesSource.Numeric numeric = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
// TODO once composite is plugged in to the values source registry or at least understands Date values source types use it
// here
Expand All @@ -304,6 +306,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
docValueFormat,
order,
missingBucket,
missingOrder,
hasScript,
(
BigArrays bigArrays,
Expand All @@ -319,6 +322,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
roundingValuesSource::round,
compositeValuesSourceConfig.format(),
compositeValuesSourceConfig.missingBucket(),
compositeValuesSourceConfig.missingOrder(),
size,
compositeValuesSourceConfig.reverseMul()
);
Expand All @@ -339,6 +343,6 @@ protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardCon
Rounding rounding = dateHistogramInterval.createRounding(timeZone(), offset);
return queryShardContext.getValuesSourceRegistry()
.getAggregator(REGISTRY_KEY, config)
.apply(config, rounding, name, config.script() != null, format(), missingBucket(), order());
.apply(config, rounding, name, config.script() != null, format(), missingBucket(), missingOrder(), order());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;

import java.io.IOException;

Expand All @@ -63,10 +64,11 @@ class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
CheckedFunction<LeafReaderContext, SortedNumericDoubleValues, IOException> docValuesFunc,
DocValueFormat format,
boolean missingBucket,
MissingOrder missingOrder,
int size,
int reverseMul
) {
super(bigArrays, format, fieldType, missingBucket, size, reverseMul);
super(bigArrays, format, fieldType, missingBucket, missingOrder, size, reverseMul);
this.docValuesFunc = docValuesFunc;
this.bits = missingBucket ? new BitArray(100, bigArrays) : null;
this.values = bigArrays.newDoubleArray(Math.min(size, 100), false);
Expand All @@ -89,10 +91,9 @@ void copyCurrent(int slot) {
@Override
int compare(int from, int to) {
if (missingBucket) {
if (bits.get(from) == false) {
return bits.get(to) ? -1 * reverseMul : 0;
} else if (bits.get(to) == false) {
return reverseMul;
int result = missingOrder.compare(() -> bits.get(from) == false, () -> bits.get(to) == false, reverseMul);
if (!MissingOrder.unknownOrder(result)) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
return result;
}
}
return compareValues(values.get(from), values.get(to));
Expand All @@ -101,10 +102,9 @@ int compare(int from, int to) {
@Override
int compareCurrent(int slot) {
if (missingBucket) {
if (missingCurrentValue) {
return bits.get(slot) ? -1 * reverseMul : 0;
} else if (bits.get(slot) == false) {
return reverseMul;
int result = missingOrder.compare(() -> missingCurrentValue, () -> bits.get(slot) == false, reverseMul);
if (!MissingOrder.unknownOrder(result)) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
return result;
}
}
return compareValues(currentValue, values.get(slot));
Expand All @@ -113,10 +113,9 @@ int compareCurrent(int slot) {
@Override
int compareCurrentWithAfter() {
if (missingBucket) {
if (missingCurrentValue) {
return afterValue != null ? -1 * reverseMul : 0;
} else if (afterValue == null) {
return reverseMul;
int result = missingOrder.compare(() -> missingCurrentValue, () -> afterValue == null, reverseMul);
if (!MissingOrder.unknownOrder(result)) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
return result;
}
}
return compareValues(currentValue, afterValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.search.aggregations.bucket.geogrid.CellIdSource;
import org.opensearch.search.aggregations.bucket.geogrid.GeoTileGridAggregationBuilder;
import org.opensearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.support.CoreValuesSourceType;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
Expand All @@ -72,6 +73,7 @@ CompositeValuesSourceConfig apply(
boolean hasScript, // probably redundant with the config, but currently we check this two different ways...
String format,
boolean missingBucket,
MissingOrder missingOrder,
SortOrder order
);
}
Expand Down Expand Up @@ -103,7 +105,7 @@ static void register(ValuesSourceRegistry.Builder builder) {
builder.register(
REGISTRY_KEY,
CoreValuesSourceType.GEOPOINT,
(valuesSourceConfig, precision, boundingBox, name, hasScript, format, missingBucket, order) -> {
(valuesSourceConfig, precision, boundingBox, name, hasScript, format, missingBucket, missingOrder, order) -> {
ValuesSource.GeoPoint geoPoint = (ValuesSource.GeoPoint) valuesSourceConfig.getValuesSource();
// is specified in the builder.
final MappedFieldType fieldType = valuesSourceConfig.fieldType();
Expand All @@ -115,6 +117,7 @@ static void register(ValuesSourceRegistry.Builder builder) {
DocValueFormat.GEOTILE,
order,
missingBucket,
missingOrder,
hasScript,
(
BigArrays bigArrays,
Expand All @@ -132,6 +135,7 @@ static void register(ValuesSourceRegistry.Builder builder) {
LongUnaryOperator.identity(),
compositeValuesSourceConfig.format(),
compositeValuesSourceConfig.missingBucket(),
compositeValuesSourceConfig.missingOrder(),
size,
compositeValuesSourceConfig.reverseMul()
);
Expand Down Expand Up @@ -220,7 +224,7 @@ protected ValuesSourceType getDefaultValuesSourceType() {
protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardContext, ValuesSourceConfig config) throws IOException {
return queryShardContext.getValuesSourceRegistry()
.getAggregator(REGISTRY_KEY, config)
.apply(config, precision, geoBoundingBox(), name, script() != null, format(), missingBucket(), order());
.apply(config, precision, geoBoundingBox(), name, script() != null, format(), missingBucket(), missingOrder(), order());
}

}
Loading