Skip to content

Commit

Permalink
[7.x][Transform] add support for top metrics (elastic#72311)
Browse files Browse the repository at this point in the history
add support for the stats and top metrics aggregation in transform. With this change it became
easier to add more multi value aggregations to transform

Limitations:
 - only the 1st element of top_metrics gets consumed by transform[*].
 - all values of stats will be mapped to double if mapping deduction is used, including count,
   sum, min, max

fixes elastic#52236
relates elastic#51925
  • Loading branch information
Hendrik Muhs authored Apr 27, 2021
1 parent ca7db22 commit 6175b6d
Show file tree
Hide file tree
Showing 8 changed files with 378 additions and 24 deletions.
2 changes: 2 additions & 0 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -730,8 +730,10 @@ currently supported:
* <<search-aggregations-metrics-percentile-aggregation,Percentiles>>
* <<search-aggregations-bucket-rare-terms-aggregation, Rare Terms>>
* <<search-aggregations-metrics-scripted-metric-aggregation,Scripted metric>>
* <<search-aggregations-metrics-stats-aggregation,Stats>>
* <<search-aggregations-metrics-sum-aggregation,Sum>>
* <<search-aggregations-bucket-terms-aggregation, Terms>>
* <<search-aggregations-metrics-top-metrics,Top Metrics>>
* <<search-aggregations-metrics-valuecount-aggregation,Value count>>
* <<search-aggregations-metrics-weight-avg-aggregation,Weighted average>>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,57 @@ public void testPivotWithWeightedAvgAgg() throws Exception {
assertEquals(4.47169811, actual.doubleValue(), 0.000001);
}

public void testPivotWithTopMetrics() throws Exception {
String transformId = "top_metrics_transform";
String transformIndex = "top_metrics_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);

final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);

String config = "{"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";

config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"top_business\": {"
+ " \"top_metrics\": {"
+ " \"metrics\": {\"field\": \"business_id\"},"
+ " \"sort\": {\"timestamp\": \"desc\"}"
+ "} } } }"
+ "}";

createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
assertTrue(indexExists(transformIndex));

Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
String actual = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_business.business_id", searchResult)).get(0);
assertEquals("business_9", actual);

searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_1");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
actual = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_business.business_id", searchResult)).get(0);
assertEquals("business_3", actual);
}

public void testManyBucketsWithSmallPageSize() throws Exception {
String transformId = "test_with_many_buckets";
String transformIndex = transformId + "-idx";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,10 @@ protected void createPivotReviewsTransform(
+ " \"affiliate_missing\": {"
+ " \"missing\": {"
+ " \"field\": \"affiliate_id\""

+ " } },"
+ " \"stats\": {"
+ " \"stats\": {"
+ " \"field\": \"stars\""
+ " } } } },"
+ "\"frequency\":\"1s\""
+ "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
import org.elasticsearch.search.aggregations.metrics.MultiValueAggregation;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.MultiValue;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
import org.elasticsearch.search.aggregations.metrics.Percentile;
import org.elasticsearch.search.aggregations.metrics.Percentiles;
Expand Down Expand Up @@ -63,6 +65,8 @@ public final class AggregationResultUtils {
tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
tempMap.put(MultiBucketsAggregation.class.getName(), new MultiBucketsAggExtractor());
tempMap.put(GeoShapeMetricAggregation.class.getName(), new GeoShapeMetricAggExtractor());
tempMap.put(MultiValue.class.getName(), new NumericMultiValueAggExtractor());
tempMap.put(MultiValueAggregation.class.getName(), new MultiValueAggExtractor());
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}

Expand Down Expand Up @@ -151,8 +155,14 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoCentroid.class.getName());
} else if (aggregation instanceof GeoBounds) {
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
// note: percentiles is also a multi value agg, therefore check percentiles first
// TODO: can the Percentiles extractor be removed?
} else if (aggregation instanceof Percentiles) {
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
} else if (aggregation instanceof MultiValue) {
return TYPE_VALUE_EXTRACTOR_MAP.get(MultiValue.class.getName());
} else if (aggregation instanceof MultiValueAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(MultiValueAggregation.class.getName());
} else if (aggregation instanceof SingleBucketAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
} else if (aggregation instanceof MultiBucketsAggregation) {
Expand Down Expand Up @@ -259,6 +269,44 @@ public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lo
}
}

static class MultiValueAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
MultiValueAggregation aggregation = (MultiValueAggregation) agg;
Map<String, Object> extracted = new HashMap<>();
for (String valueName : aggregation.valueNames()) {
List<String> valueAsStrings = aggregation.getValuesAsStrings(valueName);

// todo: size > 1 is not supported, requires a refactoring so that `size()` is exposed in the agg builder
if (valueAsStrings.size() > 0) {
extracted.put(valueName, valueAsStrings.get(0));
}
}

return extracted;
}
}

static class NumericMultiValueAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
MultiValue aggregation = (MultiValue) agg;
Map<String, Object> extracted = new HashMap<>();

String fieldLookupPrefix = (lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName()) + ".";
for (String valueName : aggregation.valueNames()) {
double value = aggregation.value(valueName);

String fieldType = fieldTypeMap.get(fieldLookupPrefix + valueName);
if (Numbers.isValidDouble(value)) {
extracted.put(valueName, dropFloatingPointComponentIfTypeRequiresIt(fieldType, value));
}
}

return extracted;
}
}

static class PercentilesAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
Expand Down Expand Up @@ -402,12 +450,11 @@ static class GeoShapeMetricAggExtractor implements AggValueExtractor {

@Override
public Object value(Aggregation aggregation, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
assert aggregation instanceof GeoShapeMetricAggregation
: "Unexpected type ["
+ aggregation.getClass().getName()
+ "] for aggregation ["
+ aggregation.getName()
+ "]";
assert aggregation instanceof GeoShapeMetricAggregation : "Unexpected type ["
+ aggregation.getClass().getName()
+ "] for aggregation ["
+ aggregation.getName()
+ "]";
return ((GeoShapeMetricAggregation) aggregation).geoJSONGeometry();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -73,10 +74,8 @@ public final class TransformAggregations {
"sampler",
"significant_terms", // https://github.com/elastic/elasticsearch/issues/51073
"significant_text",
"stats", // https://github.com/elastic/elasticsearch/issues/51925
"string_stats", // https://github.com/elastic/elasticsearch/issues/51925
"top_hits",
"top_metrics", // https://github.com/elastic/elasticsearch/issues/52236
"t_test", // https://github.com/elastic/elasticsearch/issues/54503,
"variable_width_histogram", // https://github.com/elastic/elasticsearch/issues/58140
"rate", // https://github.com/elastic/elasticsearch/issues/61351
Expand Down Expand Up @@ -113,7 +112,9 @@ enum AggregationType {
FILTER("filter", LONG),
TERMS("terms", FLATTENED),
RARE_TERMS("rare_terms", FLATTENED),
MISSING("missing", LONG);
MISSING("missing", LONG),
TOP_METRICS("top_metrics", SOURCE),
STATS("stats", DOUBLE);

private final String aggregationType;
private final String targetMapping;
Expand Down Expand Up @@ -175,7 +176,32 @@ public static String resolveTargetMapping(String aggregationType, String sourceT
return agg.getTargetMapping();
}

/**
* Checks the aggregation object and returns a tuple with 2 maps:
*
* 1. mapping the name of the agg to the used field
* 2. mapping the name of the agg to the aggregation type
*
* Example:
* {
* "my_agg": {
* "max": {
* "field": "my_field"
* }}}
*
* creates ({ "my_agg": "my_field" }, { "my_agg": "max" })
*
* Both mappings can contain _multiple_ entries, e.g. due to sub aggregations or because of aggregations creating multiple
* values(e.g. percentiles)
*
* Note about order: aggregation can hit in multiple places (e.g. a multi value agg implement {@link ValuesSourceAggregationBuilder})
* Be careful changing the order in this method
*
* @param agg the aggregation builder
* @return a tuple with 2 mappings that maps the used field(s) and aggregation type(s)
*/
public static Tuple<Map<String, String>, Map<String, String>> getAggregationInputAndOutputTypes(AggregationBuilder agg) {
// todo: can this be removed?
if (agg instanceof PercentilesAggregationBuilder) {
PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;

Expand All @@ -185,15 +211,42 @@ public static Tuple<Map<String, String>, Map<String, String>> getAggregationInpu
Collections.emptyMap(),
Arrays.stream(percentilesAgg.percentiles())
.mapToObj(OutputFieldNameConverter::fromDouble)
.collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1))
.collect(
Collectors.toMap(p -> percentilesAgg.getName() + "." + p, p -> { return percentilesAgg.getType(); }, (p1, p2) -> p1)
)
);
}

// does the agg specify output field names
Optional<Set<String>> outputFieldNames = agg.getOutputFieldNames();
if (outputFieldNames.isPresent()) {
return new Tuple<>(
outputFieldNames.get()
.stream()
.collect(
Collectors.toMap(
outputField -> agg.getName() + "." + outputField,
outputField -> outputField,
(v1, v2) -> v1
)
),
outputFieldNames.get()
.stream()
.collect(
Collectors.toMap(
outputField -> agg.getName() + "." + outputField,
outputField -> agg.getType(),
(v1, v2) -> v1
)
)
);
}

if (agg instanceof ValuesSourceAggregationBuilder) {
ValuesSourceAggregationBuilder<?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?>) agg;
return new Tuple<>(
Collections.singletonMap(valueSourceAggregation.getName(), valueSourceAggregation.field()),
Collections.singletonMap(agg.getName(), agg.getType())
Collections.singletonMap(valueSourceAggregation.getName(), valueSourceAggregation.getType())
);
}

Expand Down
Loading

0 comments on commit 6175b6d

Please sign in to comment.