Skip to content

Commit

Permalink
[Transform] fix transform failure case for percentiles and spa… (#54202)
Browse files Browse the repository at this point in the history
Index null for any percentile that could not be calculated due to sparse data.

fixes #54201
  • Loading branch information
Hendrik Muhs committed Mar 25, 2020
1 parent 8c278b9 commit e724b86
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@

package org.elasticsearch.xpack.transform.integration;

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;

public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
Expand Down Expand Up @@ -102,4 +107,110 @@ public void testIndexTemplateMappingClash() throws Exception {
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.rating.avg", searchResult)).get(0);
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
}

/**
 * Runs a pivot transform with a percentiles aggregation over an index in which one host
 * ("host-3") has no {@code cpu} value at all.
 * <p>
 * The test expects one destination document per host (3 in total), real percentile values for
 * "host-1", and for "host-3" the percentile keys "50" and "99" to be present but mapped to null.
 */
public void testSparseDataPercentiles() throws Exception {
    final String indexName = "cpu-utilization";
    final String transformId = "pivot-cpu";
    final String transformIndex = "pivot-cpu";

    // Create the source index: "host" is a keyword, "cpu" an integer.
    try (XContentBuilder mapping = jsonBuilder()) {
        mapping.startObject();
        mapping.startObject("mappings");
        mapping.startObject("properties");
        mapping.startObject("host").field("type", "keyword").endObject();
        mapping.startObject("cpu").field("type", "integer").endObject();
        mapping.endObject(); // properties
        mapping.endObject(); // mappings
        mapping.endObject();

        final StringEntity mappingEntity = new StringEntity(Strings.toString(mapping), ContentType.APPLICATION_JSON);
        final Request createIndexRequest = new Request("PUT", indexName);
        createIndexRequest.setEntity(mappingEntity);
        client().performRequest(createIndexRequest);
    }

    // Every document is preceded by the same bulk "index" action line.
    final String indexAction = "{\"index\":{\"_index\":\"" + indexName + "\"}}\n";
    final String[] documents = {
        "{\"host\":\"host-1\",\"cpu\": 22}\n",
        "{\"host\":\"host-1\",\"cpu\": 55}\n",
        "{\"host\":\"host-1\",\"cpu\": 23}\n",
        "{\"host\":\"host-2\",\"cpu\": 0}\n",
        "{\"host\":\"host-2\",\"cpu\": 99}\n",
        "{\"host\":\"host-1\",\"cpu\": 28}\n",
        "{\"host\":\"host-1\",\"cpu\": 77}\n",
        // missing value for cpu
        "{\"host\":\"host-3\"}\n" };

    final StringBuilder bulkBody = new StringBuilder();
    for (String document : documents) {
        bulkBody.append(indexAction).append(document);
    }
    bulkBody.append("\r\n");

    final Request bulkRequest = new Request("POST", "/_bulk");
    bulkRequest.addParameter("refresh", "true");
    bulkRequest.setJsonEntity(bulkBody.toString());
    client().performRequest(bulkRequest);

    final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId);

    // Pivot: group by host, compute percentiles of cpu under the name "p".
    final String config = new StringBuilder("{")
        .append(" \"source\": {\"index\":\"").append(indexName).append("\"},")
        .append(" \"dest\": {\"index\":\"").append(transformIndex).append("\"},")
        .append(" \"pivot\": {")
        .append(" \"group_by\": {")
        .append(" \"host\": {")
        .append(" \"terms\": {")
        .append(" \"field\": \"host\"")
        .append(" } } },")
        .append(" \"aggregations\": {")
        .append(" \"p\": {")
        .append(" \"percentiles\": {")
        .append(" \"field\": \"cpu\"")
        .append(" } }")
        .append(" } }")
        .append("}")
        .toString();

    createTransformRequest.setJsonEntity(config);
    final Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
    assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

    startAndWaitForTransform(transformId, transformIndex);
    assertTrue(indexExists(transformIndex));

    // One destination document per host is expected.
    final Map<String, Object> destinationStats = getAsMap(transformIndex + "/_stats");
    assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", destinationStats));

    // host-1 has cpu values, so its percentiles must be populated.
    final Map<String, Object> hostOneSearch = getAsMap(transformIndex + "/_search?q=host:host-1");
    assertEquals(1, XContentMapValues.extractValue("hits.total.value", hostOneSearch));

    final List<?> hostOneHits = (List<?>) XContentMapValues.extractValue("hits.hits._source.p", hostOneSearch);
    @SuppressWarnings("unchecked")
    final Map<String, Object> percentiles = (Map<String, Object>) hostOneHits.get(0);

    assertEquals(28.0, (double) percentiles.get("50"), 0.000001);
    assertEquals(77.0, (double) percentiles.get("99"), 0.000001);

    // host-3 has no cpu values: the percentile keys must exist, but hold null.
    final Map<String, Object> hostThreeSearch = getAsMap(transformIndex + "/_search?q=host:host-3");
    assertEquals(1, XContentMapValues.extractValue("hits.total.value", hostThreeSearch));

    final List<?> hostThreeHits = (List<?>) XContentMapValues.extractValue("hits.hits._source.p", hostThreeSearch);
    @SuppressWarnings("unchecked")
    final Map<String, Object> percentilesEmpty = (Map<String, Object>) hostThreeHits.get(0);

    assertTrue(percentilesEmpty.containsKey("50"));
    assertNull(percentilesEmpty.get("50"));
    assertTrue(percentilesEmpty.containsKey("99"));
    assertNull(percentilesEmpty.get("99"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,16 @@ static class PercentilesAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
Percentiles aggregation = (Percentiles) agg;

HashMap<String, Double> percentiles = new HashMap<>();

for (Percentile p : aggregation) {
percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), p.getValue());
// With sparse data a percentile might have no data to report; in that case its value is NaN,
// so we need to guard the output and set null instead.
if (Numbers.isValidDouble(p.getValue()) == false) {
percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), null);
} else {
percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), p.getValue());
}
}

return percentiles;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,11 @@ public void testPercentilesAggExtractor() {
);
}

/**
 * Checks that percentiles whose value is NaN are extracted as null entries, keyed by the
 * percent ("1" and "50" here).
 */
public void testPercentilesAggExtractorNaN() {
    // Both percentiles carry NaN as their value.
    Aggregation agg = createPercentilesAgg(
        "p_agg",
        Arrays.asList(new Percentile(1, Double.NaN), new Percentile(50, Double.NaN))
    );

    Object extracted = AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), "");

    assertThat(extracted, equalTo(asMap("1", null, "50", null)));
}

public static SingleBucketAggregation createSingleBucketAgg(String name, long docCount, Aggregation... subAggregations) {
SingleBucketAggregation agg = mock(SingleBucketAggregation.class);
when(agg.getDocCount()).thenReturn(docCount);
Expand Down

0 comments on commit e724b86

Please sign in to comment.