Aggregation support for Value Scripts that change types (#54830)
not-napoleon authored Apr 24, 2020
1 parent 32317a6 commit 754e3ca
Showing 7 changed files with 150 additions and 130 deletions.
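In short, the change lets a value script re-type the values an aggregation consumes: when the user supplies a value_type hint, that hint now takes precedence when picking the ValuesSource, so a script can, for example, turn a numeric field into string terms. A minimal sketch of the new usage, modeled on the TermsAggregatorTests case added below (MockScriptEngine and STRING_SCRIPT_NAME are test-only stand-ins for a real inline or stored script):

    // Terms aggregation over a numeric field whose value script emits strings;
    // the value_type hint tells the framework to expect string output.
    TermsAggregationBuilder agg = new TermsAggregationBuilder("name")
        .userValueTypeHint(ValueType.STRING)
        .field("number")
        .script(new Script(ScriptType.INLINE, MockScriptEngine.NAME, STRING_SCRIPT_NAME, Collections.emptyMap()));

Previously the user hint was only consulted in the stand-alone-script and unmapped-field branches (or merely passed through to the field's resolver); the diff below makes it win outright for the mapped-field-plus-script case.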
@@ -110,28 +110,19 @@ private static ValuesSourceConfig internalResolve(QueryShardContext context,
     ) {
         ValuesSourceConfig config;
         MappedFieldType fieldType = null;
-        ValuesSourceType valuesSourceType;
-        ValueType scriptValueType = null;
-        AggregationScript.LeafFactory aggregationScript = null;
+        ValuesSourceType valuesSourceType = null;
+        ValueType scriptValueType = userValueTypeHint;
+        AggregationScript.LeafFactory aggregationScript = createScript(script, context); // returns null if script is null
         boolean unmapped = false;
+        if (userValueTypeHint != null) {
+            // If the user gave us a type hint, respect that.
+            valuesSourceType = userValueTypeHint.getValuesSourceType();
+        }
         if (field == null) {
-            // Stand Alone Script Case
             if (script == null) {
                 throw new IllegalStateException(
-                    "value source config is invalid; must have either a field context or a script or marked as unmapped");
+                    "value source config is invalid; must have either a field or a script");
             }
-            /*
-             * This is the Stand Alone Script path. We should have a script that will produce a value independent of the presence or
-             * absence of any one field. The type of the script is given by the userValueTypeHint field, if the user specified a type,
-             * or the aggregation's default type if the user didn't.
-             */
-            if (userValueTypeHint != null) {
-                valuesSourceType = userValueTypeHint.getValuesSourceType();
-            } else {
-                valuesSourceType = defaultValueSourceType;
-            }
-            aggregationScript = createScript(script, context);
-            scriptValueType = userValueTypeHint;
         } else {
             // Field case
             fieldType = context.fieldMapper(field);
@@ -141,22 +132,17 @@ private static ValuesSourceConfig internalResolve(QueryShardContext context,
                  * pattern. In this case, we're going to end up using the EMPTY variant of the ValuesSource, and possibly applying a user
                  * specified missing value.
                  */
-                if (userValueTypeHint != null) {
-                    valuesSourceType = userValueTypeHint.getValuesSourceType();
-                } else {
-                    valuesSourceType = defaultValueSourceType;
-                }
                 unmapped = true;
-                if (userValueTypeHint != null) {
-                    // todo do we really need this for unmapped?
-                    scriptValueType = userValueTypeHint;
-                }
-            } else {
+                aggregationScript = null; // Value scripts are not allowed on unmapped fields. What would that do, anyway?
+            } else if (valuesSourceType == null) {
+                // We have a field, and the user didn't specify a type, so get the type from the field
                 valuesSourceType = fieldResolver.getValuesSourceType(context, fieldType, aggregationName, userValueTypeHint,
                     defaultValueSourceType);
-                aggregationScript = createScript(script, context);
             }
         }
+        if (valuesSourceType == null) {
+            valuesSourceType = defaultValueSourceType;
+        }
         config = new ValuesSourceConfig(valuesSourceType, fieldType, unmapped, aggregationScript, scriptValueType, context);
         config.format(resolveFormat(format, valuesSourceType, timeZone, fieldType));
         config.missing(missing);
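Taken together, the two hunks above reduce type resolution to a fixed precedence. Paraphrased as a sketch (the method's parameters are assumed in scope; this is not the verbatim body): an explicit user value_type hint wins, otherwise the mapped field's resolver is consulted, and only then does the aggregation's default apply.

    // Sketch of the post-change resolution order in internalResolve (paraphrase):
    ValuesSourceType valuesSourceType = null;
    if (userValueTypeHint != null) {
        valuesSourceType = userValueTypeHint.getValuesSourceType();        // 1. explicit user hint
    }
    if (fieldType != null && valuesSourceType == null) {
        valuesSourceType = fieldResolver.getValuesSourceType(              // 2. otherwise ask the mapped field
            context, fieldType, aggregationName, userValueTypeHint, defaultValueSourceType);
    }
    if (valuesSourceType == null) {
        valuesSourceType = defaultValueSourceType;                         // 3. finally, the aggregation default
    }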
@@ -101,13 +101,14 @@ public void testBadIncludeExclude() throws IOException {
         ));
         assertThat(e.getMessage(), equalTo("Aggregation [_name] cannot support regular expression style include/exclude settings as " +
             "they can only be applied to string fields. Use an array of values for include/exclude clauses"));
+    }
 
-        e = expectThrows(AggregationExecutionException.class, () -> testBothCases(new MatchNoDocsQuery(), dataset,
-            aggregation -> aggregation.field(BINARY_FIELD).includeExclude(includeExclude),
+    public void testBadUserValueTypeHint() throws IOException {
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testBothCases(new MatchNoDocsQuery(), dataset,
+            aggregation -> aggregation.field(BINARY_FIELD),
             agg -> fail("test should have failed with exception"), ValueType.NUMERIC // numeric type hint
         ));
-        assertThat(e.getMessage(), equalTo("Aggregation [_name] cannot support regular expression style include/exclude settings as " +
-            "they can only be applied to string fields. Use an array of values for include/exclude clauses"));
+        assertThat(e.getMessage(), equalTo("Expected numeric type on field [binary], but got [binary]"));
     }
 
     private void testSearchCase(Query query, List<Long> dataset,
@@ -58,7 +58,12 @@
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptEngine;
+import org.elasticsearch.script.ScriptModule;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
@@ -98,8 +103,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
+import static java.util.Collections.singleton;
 import static org.elasticsearch.index.mapper.SeqNoFieldMapper.PRIMARY_TERM_NAME;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
 import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketScript;
@@ -111,6 +118,26 @@ public class TermsAggregatorTests extends AggregatorTestCase {
 
     private boolean randomizeAggregatorImpl = true;
 
+    // Constants for a script that returns a string
+    private static final String STRING_SCRIPT_NAME = "string_script";
+    private static final String STRING_SCRIPT_OUTPUT = "Orange";
+
+    @Override
+    protected ScriptService getMockScriptService() {
+        Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
+        Map<String, Function<Map<String, Object>, Object>> nonDeterministicScripts = new HashMap<>();
+
+        scripts.put(STRING_SCRIPT_NAME, value -> STRING_SCRIPT_OUTPUT);
+
+        MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME,
+            scripts,
+            nonDeterministicScripts,
+            Collections.emptyMap());
+        Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
+
+        return new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS);
+    }
+
     protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
                                                         IndexSearcher indexSearcher, MappedFieldType... fieldTypes) throws IOException {
         try {
@@ -1245,6 +1272,26 @@ public void testWithNestedAggregations() throws IOException {
         }
     }
 
+    public void testNumberToStringValueScript() throws IOException {
+        MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.INTEGER);
+        fieldType.setName("number");
+
+        TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("name")
+            .userValueTypeHint(ValueType.STRING)
+            .field("number")
+            .script(new Script(ScriptType.INLINE, MockScriptEngine.NAME, STRING_SCRIPT_NAME, Collections.emptyMap()));
+
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            final int numDocs = 10;
+            for (int i = 0; i < numDocs; i++) {
+                iw.addDocument(singleton(new NumericDocValuesField("number", i + 1)));
+            }
+        }, (Consumer<InternalTerms>) terms -> {
+            assertTrue(AggregationInspectionHelper.hasValue(terms));
+        }, fieldType);
+    }
+
+
     private void assertNestedTopHitsScore(InternalMultiBucketAggregation<?, ?> terms, boolean withScore) {
         assertThat(terms.getBuckets().size(), equalTo(9));
         int ptr = 9;
@@ -19,6 +19,26 @@
 
 package org.elasticsearch.search.aggregations.metrics;
 
+import static java.util.Collections.singleton;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.DoublePoint;
 import org.apache.lucene.document.Field;
@@ -74,7 +94,6 @@
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.bucket.filter.Filter;
 import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
@@ -91,26 +110,6 @@
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.lookup.LeafDocLookup;
 
-import java.io.IOException;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-import static java.util.Collections.singleton;
-import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-import static org.hamcrest.Matchers.equalTo;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class MinAggregatorTests extends AggregatorTestCase {
 
     private final String SCRIPT_NAME = "script_name";
@@ -931,24 +930,4 @@ private void testCase(Query query,
         MinAggregationBuilder aggregationBuilder = new MinAggregationBuilder("min").field("number");
         testCase(aggregationBuilder, query, buildIndex, verify, fieldType);
     }
-
-    private <T extends AggregationBuilder, V extends InternalAggregation> void testCase(T aggregationBuilder, Query query,
-                                                                                        CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
-                                                                                        Consumer<V> verify, MappedFieldType fieldType) throws IOException {
-        try (Directory directory = newDirectory()) {
-            RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
-            buildIndex.accept(indexWriter);
-            indexWriter.close();
-
-            try (IndexReader indexReader = DirectoryReader.open(directory)) {
-                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
-
-                V agg = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
-                verify.accept(agg);
-
-            }
-        }
-    }
-
-
 }
@@ -18,26 +18,6 @@
  */
 package org.elasticsearch.search.aggregations;
 
-import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.BinaryDocValuesField;
 import org.apache.lucene.document.Document;
@@ -68,6 +48,7 @@
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
@@ -136,6 +117,27 @@
 import org.junit.After;
 import org.junit.Before;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Base class for testing {@link Aggregator} implementations.
  * Provides helpers for constructing and searching an {@link Aggregator} implementation based on a provided
@@ -537,6 +539,25 @@ protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketCo
         InternalAggregationTestCase.assertMultiBucketConsumer(agg, bucketConsumer);
     }
 
+    protected <T extends AggregationBuilder,
+        V extends InternalAggregation> void testCase(T aggregationBuilder, Query query,
+                                                     CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
+                                                     Consumer<V> verify, MappedFieldType fieldType) throws IOException {
+        try (Directory directory = newDirectory()) {
+            RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
+            buildIndex.accept(indexWriter);
+            indexWriter.close();
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+                V agg = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
+                verify.accept(agg);
+
+            }
+        }
+    }
+
     private static class ShardSearcher extends IndexSearcher {
         private final List<LeafReaderContext> ctx;
 
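The testCase helper added above is the method that MinAggregatorTests and the test class below previously each declared privately; subclasses now call the shared version. Roughly, a subclass test drives it like this (a sketch following the MinAggregatorTests pattern; the field name, document, and assertion are illustrative, not part of this commit):

    MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
    fieldType.setName("number");

    MinAggregationBuilder builder = new MinAggregationBuilder("min").field("number");
    testCase(builder, new MatchAllDocsQuery(),
        iw -> iw.addDocument(singleton(new NumericDocValuesField("number", 7))),    // index a single document
        (Consumer<InternalMin>) min -> assertEquals(7, min.getValue(), 0),          // verify the reduced aggregation
        fieldType);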
@@ -20,14 +20,10 @@
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedNumericDocValuesField;
 import org.apache.lucene.document.SortedSetDocValuesField;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.search.DocValuesFieldExistsQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
-import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.settings.Settings;
@@ -43,7 +39,6 @@
 import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
-import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
 import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
@@ -435,24 +430,4 @@ private void testCase(Query query,
         testCase(aggregationBuilder, query, buildIndex, verify, fieldType);
     }
 
-    private <T extends AggregationBuilder, V extends InternalAggregation> void testCase(
-        T aggregationBuilder, Query query,
-        CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
-        Consumer<V> verify, MappedFieldType fieldType) throws IOException {
-        try (Directory directory = newDirectory()) {
-            RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
-            buildIndex.accept(indexWriter);
-            indexWriter.close();
-
-            try (IndexReader indexReader = DirectoryReader.open(directory)) {
-                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
-
-                V agg = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
-                verify.accept(agg);
-
-            }
-        }
-    }
-
-
 }
[diff for the remaining changed file did not load and is not shown]