Skip to content

Commit

Permalink
Revert "Continue realizing sorting by aggregations (#52298)"
Browse files Browse the repository at this point in the history
This reverts commit 5659e72.

It causes some test failures that didn't come up in the PR tests.
  • Loading branch information
nik9000 committed Feb 21, 2020
1 parent 5659e72 commit b3afbe7
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Iterator;

/**
* An Aggregator.
Expand Down Expand Up @@ -93,53 +91,6 @@ public static boolean descendsFromBucketAggregator(Aggregator parent) {
*/
public abstract Aggregator subAggregator(String name);

/**
* Resolve the next step of the sort path as though this aggregation
* supported sorting. This is usually the "first step" when resolving
* a sort path because most aggs that support sorting their buckets
* aren't valid in the middle of a sort path.
* <p>
* For example, the {@code terms} aggs supports sorting its buckets, but
* that sort path itself can't contain a different {@code terms}
* aggregation.
*/
public final Aggregator resolveSortPathOnValidAgg(AggregationPath.PathElement next, Iterator<AggregationPath.PathElement> path) {
Aggregator n = subAggregator(next.name);
if (n == null) {
throw new IllegalArgumentException("The provided aggregation [" + next + "] either does not exist, or is "
+ "a pipeline aggregation and cannot be used to sort the buckets.");
}
if (false == path.hasNext()) {
return n;
}
if (next.key != null) {
throw new IllegalArgumentException("Key only allowed on last aggregation path element but got [" + next + "]");
}
return n.resolveSortPath(path.next(), path);
}

/**
* Resolve a sort path to the target.
* <p>
* The default implementation throws an exception but we override it on aggregations that support sorting.
*/
public Aggregator resolveSortPath(AggregationPath.PathElement next, Iterator<AggregationPath.PathElement> path) {
throw new IllegalArgumentException("Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end. [" + name() + "] is not single-bucket.");
}

/**
* Validates the "key" portion of a sort on this aggregation.
* <p>
* The default implementation throws an exception but we override it on aggregations that support sorting.
*/
public void validateSortPathKey(String key) {
throw new IllegalArgumentException("Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end.");
}

/**
* Build an aggregation for data that has been collected into {@code bucket}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.IntConsumer;
Expand Down Expand Up @@ -165,24 +163,4 @@ public final void close() {
}
}

@Override
public Aggregator resolveSortPath(AggregationPath.PathElement next, Iterator<AggregationPath.PathElement> path) {
if (this instanceof SingleBucketAggregator) {
return resolveSortPathOnValidAgg(next, path);
}
return super.resolveSortPath(next, path);
}

@Override
public void validateSortPathKey(String key) {
if (false == this instanceof SingleBucketAggregator) {
super.validateSortPathKey(key);
return;
}
if (key != null && false == "doc_count".equals(key)) {
throw new IllegalArgumentException("Ordering on a single-bucket aggregation can only be done on its doc_count. " +
"Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() +
".doc_count\")");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Iterator;

/**
* A {@link BucketCollector} that records collected doc IDs and buckets and
Expand Down Expand Up @@ -122,15 +120,6 @@ public void postCollection() throws IOException {
"Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
}

@Override
public Aggregator resolveSortPath(PathElement next, Iterator<PathElement> path) {
return in.resolveSortPath(next, path);
}

@Override
public void validateSortPathKey(String key) {
in.validateSortPathKey(key);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Object getProperty(List<String> path) {
@Override
public final double sortValue(String key) {
if (key == null) {
throw new IllegalArgumentException("Missing value key in [" + key + "] which refers to a multi-value metric aggregation");
throw new IllegalArgumentException("Missing value key in [" + key+ "] which refers to a multi-value metric aggregation");
}
return value(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,6 @@ protected SingleValue(String name, SearchContext context, Aggregator parent, Lis
}

public abstract double metric(long owningBucketOrd);

@Override
public void validateSortPathKey(String key) {
if (key != null && false == "value".equals(key)) {
throw new IllegalArgumentException("Ordering on a single-value metrics aggregation can only be done on its value. " +
"Either drop the key (a la \"" + name() + "\") or change it to \"value\" (a la \"" + name() + ".value\")");
}
}
}

public abstract static class MultiValue extends NumericMetricsAggregator {
Expand All @@ -61,16 +53,5 @@ protected MultiValue(String name, SearchContext context, Aggregator parent, List
public abstract boolean hasMetric(String name);

public abstract double metric(String name, long owningBucketOrd);

@Override
public void validateSortPathKey(String key) {
if (key == null) {
throw new IllegalArgumentException("When ordering on a multi-value metrics aggregation a metric name must be specified.");
}
if (false == hasMetric(key)) {
throw new IllegalArgumentException(
"Unknown metric name [" + key + "] on multi-value metrics aggregation [" + name() + "]");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public List<String> getPathElementsAsStringList() {
return stringPathElements;
}

private AggregationPath subPath(int offset, int length) {
public AggregationPath subPath(int offset, int length) {
List<PathElement> subTokens = new ArrayList<>(pathElements.subList(offset, offset + length));
return new AggregationPath(subTokens);
}
Expand All @@ -196,29 +196,38 @@ public double resolveValue(InternalAggregations aggregations) {
assert path.hasNext();
return aggregations.sortValue(path.next(), path);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid aggregation order path [" + this + "]. " + e.getMessage(), e);
throw new IllegalArgumentException("Invalid order path [" + this + "]. " + e.getMessage(), e);
}
}

/**
* Resolves the {@linkplain Aggregator} pointed to by this path against
* the given root {@linkplain Aggregator}.
* Resolves the aggregator pointed by this path using the given root as a point of reference.
*
* @param root The point of reference of this path
* @return The aggregator pointed by this path starting from the given aggregator as a point of reference
*/
public Aggregator resolveAggregator(Aggregator root) {
Iterator<PathElement> path = pathElements.iterator();
assert path.hasNext();
return root.resolveSortPathOnValidAgg(path.next(), path);
Aggregator aggregator = root;
for (int i = 0; i < pathElements.size(); i++) {
AggregationPath.PathElement token = pathElements.get(i);
aggregator = ProfilingAggregator.unwrap(aggregator.subAggregator(token.name));
assert (aggregator instanceof SingleBucketAggregator && i <= pathElements.size() - 1)
|| (aggregator instanceof NumericMetricsAggregator && i == pathElements.size() - 1) :
"this should be picked up before aggregation execution - on validate";
}
return aggregator;
}

/**
* Resolves the {@linkplain Aggregator} pointed to by the first element
* of this path against the given root {@linkplain Aggregator}.
* Resolves the topmost aggregator pointed by this path using the given root as a point of reference.
*
* @param root The point of reference of this path
* @return The first child aggregator of the root pointed by this path
*/
public Aggregator resolveTopmostAggregator(Aggregator root) {
AggregationPath.PathElement token = pathElements.get(0);
// TODO both unwrap and subAggregator are only used here!
Aggregator aggregator = ProfilingAggregator.unwrap(root.subAggregator(token.name));
assert (aggregator instanceof SingleBucketAggregator)
assert (aggregator instanceof SingleBucketAggregator )
|| (aggregator instanceof NumericMetricsAggregator) : "this should be picked up before aggregation execution - on validate";
return aggregator;
}
Expand All @@ -230,10 +239,76 @@ public Aggregator resolveTopmostAggregator(Aggregator root) {
* @throws AggregationExecutionException on validation error
*/
public void validate(Aggregator root) throws AggregationExecutionException {
try {
resolveAggregator(root).validateSortPathKey(lastPathElement().key);
} catch (IllegalArgumentException e) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this + "]. " + e.getMessage(), e);
Aggregator aggregator = root;
for (int i = 0; i < pathElements.size(); i++) {
String name = pathElements.get(i).name;
aggregator = ProfilingAggregator.unwrap(aggregator.subAggregator(name));
if (aggregator == null) {
throw new AggregationExecutionException("Invalid aggregator order path [" + this + "]. The " +
"provided aggregation [" + name + "] either does not exist, or is a pipeline aggregation " +
"and cannot be used to sort the buckets.");
}

if (i < pathElements.size() - 1) {

// we're in the middle of the path, so the aggregator can only be a single-bucket aggregator

if (!(aggregator instanceof SingleBucketAggregator)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end. Sub-path [" +
subPath(0, i + 1) + "] points to non single-bucket aggregation");
}

if (pathElements.get(i).key != null) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a " +
"final single-bucket or a metrics aggregation at the path end. Sub-path [" +
subPath(0, i + 1) + "] points to non single-bucket aggregation");
}
}
}
boolean singleBucket = aggregator instanceof SingleBucketAggregator;
if (!singleBucket && !(aggregator instanceof NumericMetricsAggregator)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end.");
}

AggregationPath.PathElement lastToken = lastPathElement();

if (singleBucket) {
if (lastToken.key != null && !"doc_count".equals(lastToken.key)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Ordering on a single-bucket aggregation can only be done on its doc_count. " +
"Either drop the key (a la \"" + lastToken.name + "\") or change it to \"doc_count\" (a la \"" + lastToken.name +
".doc_count\")");
}
return; // perfectly valid to sort on single-bucket aggregation (will be sored on its doc_count)
}

if (aggregator instanceof NumericMetricsAggregator.SingleValue) {
if (lastToken.key != null && !"value".equals(lastToken.key)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Ordering on a single-value metrics aggregation can only be done on its value. " +
"Either drop the key (a la \"" + lastToken.name + "\") or change it to \"value\" (a la \"" + lastToken.name +
".value\")");
}
return; // perfectly valid to sort on single metric aggregation (will be sorted on its associated value)
}

// the aggregator must be of a multi-value metrics type
if (lastToken.key == null) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. When ordering on a multi-value metrics aggregation a metric name must be specified");
}

if (!((NumericMetricsAggregator.MultiValue) aggregator).hasMetric(lastToken.key)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Unknown metric name [" + lastToken.key + "] on multi-value metrics aggregation [" + lastToken.name + "]");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.Timer;

import java.io.IOException;
import java.util.Iterator;

public class ProfilingAggregator extends Aggregator {

Expand Down Expand Up @@ -72,16 +70,6 @@ public Aggregator subAggregator(String name) {
return delegate.subAggregator(name);
}

@Override
public Aggregator resolveSortPath(PathElement next, Iterator<PathElement> path) {
return delegate.resolveSortPath(next, path);
}

@Override
public void validateSortPathKey(String key) {
delegate.validateSortPathKey(key);
}

@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
Timer timer = profileBreakdown.getTimer(AggregationTimingType.BUILD_AGGREGATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,7 @@ public void testOrderByPipelineAggregation() throws Exception {

AggregationExecutionException e = expectThrows(AggregationExecutionException.class,
() -> createAggregator(termsAgg, indexSearcher, fieldType));
assertEquals("Invalid aggregation order path [script]. The provided aggregation [script] " +
assertEquals("Invalid aggregator order path [script]. The provided aggregation [script] " +
"either does not exist, or is a pipeline aggregation and cannot be used to sort the buckets.",
e.getMessage());
}
Expand Down

0 comments on commit b3afbe7

Please sign in to comment.