Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[ML] rewrite deprecated datafeed configs to use new date_histogram interval fields #52538

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;
Expand All @@ -35,6 +37,12 @@ class AggProvider implements Writeable, ToXContentObject {

static AggProvider fromXContent(XContentParser parser, boolean lenient) throws IOException {
Map<String, Object> aggs = parser.mapOrdered();
// NOTE: Always rewrite potentially old date histogram intervals.
// This should occur in 8.x+ but not 7.x.
// 7.x is BWC with versions that do not support the new date_histogram fields
if (lenient) {
rewriteDateHistogramInterval(aggs, false);
}
AggregatorFactories.Builder parsedAggs = null;
Exception exception = null;
try {
Expand All @@ -56,6 +64,35 @@ static AggProvider fromXContent(XContentParser parser, boolean lenient) throws I
return new AggProvider(aggs, parsedAggs, exception);
}

/**
 * Recursively rewrites any deprecated {@code interval} parameter found inside a
 * {@code date_histogram} aggregation definition into the replacement
 * {@code calendar_interval}/{@code fixed_interval} parameters, mutating the map in place.
 *
 * @param aggs            aggregation definition (as parsed map) to rewrite in place
 * @param inDateHistogram whether {@code aggs} is the body of a {@code date_histogram}
 * @return {@code true} if at least one interval was rewritten
 * @throws org.elasticsearch.ElasticsearchException (bad request) if an {@code interval}
 *         value is neither a recognised unit, a number, nor a string
 */
@SuppressWarnings("unchecked")
static boolean rewriteDateHistogramInterval(Map<String, Object> aggs, boolean inDateHistogram) {
    final String intervalKey = Histogram.INTERVAL_FIELD.getPreferredName();
    boolean rewritten = false;
    if (inDateHistogram && aggs.containsKey(intervalKey)) {
        Object oldInterval = aggs.remove(intervalKey);
        if (DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(oldInterval.toString()) != null) {
            // A recognised calendar unit (e.g. "1w", "month") becomes a calendar_interval.
            aggs.put("calendar_interval", oldInterval.toString());
            rewritten = true;
        } else if (oldInterval instanceof Number) {
            // A bare number was historically interpreted as milliseconds.
            aggs.put("fixed_interval", ((Number) oldInterval).longValue() + "ms");
            rewritten = true;
        } else if (oldInterval instanceof String) {
            // Any other string (e.g. "360s") is treated as a fixed interval verbatim.
            aggs.put("fixed_interval", oldInterval.toString());
            rewritten = true;
        } else {
            throw ExceptionsHelper.badRequestException(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT,
                new IllegalArgumentException("unable to parse date_histogram interval parameter"));
        }
    }
    // Descend into every nested aggregation object looking for date_histogram bodies.
    for (Map.Entry<String, Object> child : aggs.entrySet()) {
        if (child.getValue() instanceof Map<?, ?>) {
            boolean childRewritten = rewriteDateHistogramInterval(
                (Map<String, Object>) child.getValue(),
                child.getKey().equals(DateHistogramAggregationBuilder.NAME));
            rewritten = rewritten || childRewritten;
        }
    }
    return rewritten;
}

static AggProvider fromParsedAggs(AggregatorFactories.Builder parsedAggs) throws IOException {
return parsedAggs == null ?
null :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -89,6 +90,82 @@ public void testEmptyAggMap() throws IOException {
assertThat(e.getMessage(), equalTo("Datafeed aggregations are not parsable"));
}

/**
 * A deprecated numeric {@code interval} (or a numeric-with-units string) inside a
 * {@code date_histogram} must be rewritten to {@code fixed_interval}.
 */
public void testRewriteBadNumericInterval() {
    long numericInterval = randomNonNegativeLong();
    Map<String, Object> maxTime = Collections.singletonMap("max", Collections.singletonMap("field", "time"));
    // Plain maps instead of double-brace initialization: the {{...}} idiom creates an
    // anonymous HashMap subclass per map and pins a reference to the enclosing test.
    Map<String, Object> numericDeprecated = new HashMap<>();
    numericDeprecated.put("interval", numericInterval);
    numericDeprecated.put("field", "foo");
    numericDeprecated.put("aggs", Collections.singletonMap("time", maxTime));
    Map<String, Object> expected = new HashMap<>();
    expected.put("fixed_interval", numericInterval + "ms");
    expected.put("field", "foo");
    expected.put("aggs", Collections.singletonMap("time", maxTime));
    Map<String, Object> deprecated = Collections.singletonMap("buckets", Collections.singletonMap("date_histogram", numericDeprecated));
    assertTrue(AggProvider.rewriteDateHistogramInterval(deprecated, false));
    assertThat(deprecated, equalTo(Collections.singletonMap("buckets", Collections.singletonMap("date_histogram", expected))));

    // The same value already expressed as a "<n>ms" string is carried over verbatim.
    numericDeprecated = new HashMap<>();
    numericDeprecated.put("interval", numericInterval + "ms");
    numericDeprecated.put("field", "foo");
    numericDeprecated.put("aggs", Collections.singletonMap("time", maxTime));
    deprecated = Collections.singletonMap("date_histogram", Collections.singletonMap("date_histogram", numericDeprecated));
    assertTrue(AggProvider.rewriteDateHistogramInterval(deprecated, false));
    assertThat(deprecated,
        equalTo(Collections.singletonMap("date_histogram", Collections.singletonMap("date_histogram", expected))));
}

/**
 * A deprecated {@code interval} whose value is a recognised calendar unit ("1w")
 * inside a {@code date_histogram} must be rewritten to {@code calendar_interval}.
 */
public void testRewriteBadCalendarInterval() {
    String calendarInterval = "1w";
    Map<String, Object> maxTime = Collections.singletonMap("max", Collections.singletonMap("field", "time"));
    // Plain maps instead of double-brace initialization: the {{...}} idiom creates an
    // anonymous HashMap subclass per map and pins a reference to the enclosing test.
    Map<String, Object> calendarDeprecated = new HashMap<>();
    calendarDeprecated.put("interval", calendarInterval);
    calendarDeprecated.put("field", "foo");
    calendarDeprecated.put("aggs", Collections.singletonMap("time", maxTime));
    Map<String, Object> expected = new HashMap<>();
    expected.put("calendar_interval", calendarInterval);
    expected.put("field", "foo");
    expected.put("aggs", Collections.singletonMap("time", maxTime));
    Map<String, Object> deprecated = Collections.singletonMap("buckets",
        Collections.singletonMap("date_histogram", calendarDeprecated));
    assertTrue(AggProvider.rewriteDateHistogramInterval(deprecated, false));
    assertThat(deprecated, equalTo(Collections.singletonMap("buckets", Collections.singletonMap("date_histogram", expected))));

    // The rewrite must also fire when the enclosing agg name is itself "date_histogram".
    calendarDeprecated = new HashMap<>();
    calendarDeprecated.put("interval", calendarInterval);
    calendarDeprecated.put("field", "foo");
    calendarDeprecated.put("aggs", Collections.singletonMap("time", maxTime));
    deprecated = Collections.singletonMap("date_histogram", Collections.singletonMap("date_histogram", calendarDeprecated));
    assertTrue(AggProvider.rewriteDateHistogramInterval(deprecated, false));
    assertThat(deprecated,
        equalTo(Collections.singletonMap("date_histogram", Collections.singletonMap("date_histogram", expected))));
}

/**
 * An aggregation that already uses {@code calendar_interval} must be left untouched
 * and the rewrite must report that nothing changed.
 */
public void testRewriteWhenNoneMustOccur() {
    String calendarInterval = "1w";
    Map<String, Object> maxTime = Collections.singletonMap("max", Collections.singletonMap("field", "time"));
    // Plain maps instead of double-brace initialization: the {{...}} idiom creates an
    // anonymous HashMap subclass per map and pins a reference to the enclosing test.
    Map<String, Object> calendarDeprecated = new HashMap<>();
    calendarDeprecated.put("calendar_interval", calendarInterval);
    calendarDeprecated.put("field", "foo");
    calendarDeprecated.put("aggs", Collections.singletonMap("time", maxTime));
    Map<String, Object> expected = new HashMap<>();
    expected.put("calendar_interval", calendarInterval);
    expected.put("field", "foo");
    expected.put("aggs", Collections.singletonMap("time", maxTime));
    Map<String, Object> current = Collections.singletonMap("buckets", Collections.singletonMap("date_histogram", calendarDeprecated));
    assertFalse(AggProvider.rewriteDateHistogramInterval(current, false));
    assertThat(current,
        equalTo(Collections.singletonMap("buckets", Collections.singletonMap("date_histogram", expected))));
}

@Override
protected AggProvider mutateInstance(AggProvider instance) throws IOException {
Exception parsingException = instance.getParsingException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,6 @@

public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {

@AwaitsFix(bugUrl = "Tests need to be updated to use calendar/fixed interval explicitly")
public void testIntervalWarnings() {
/*
Placeholder test for visibility. Datafeeds use calendar and fixed intervals through the deprecated
methods. The randomized creation + final superclass tests made it impossible to add warning assertions,
so warnings have been disabled on this test.

When fixed, `enableWarningsCheck()` should be removed.
*/
}

@Override
protected boolean enableWarningsCheck() {
return false;
}

@Override
protected DatafeedConfig createTestInstance() {
return createRandomizedDatafeedConfig(randomAlphaOfLength(10));
Expand Down Expand Up @@ -234,7 +218,7 @@ protected DatafeedConfig doParseInstance(XContentParser parser) {
" }\n" +
"}";

private static final String MULTIPLE_AGG_DEF_DATAFEED = "{\n" +
private static final String AGG_WITH_OLD_DATE_HISTOGRAM_INTERVAL = "{\n" +
" \"datafeed_id\": \"farequote-datafeed\",\n" +
" \"job_id\": \"farequote\",\n" +
" \"frequency\": \"1h\",\n" +
Expand All @@ -252,12 +236,33 @@ protected DatafeedConfig doParseInstance(XContentParser parser) {
" }\n" +
" }\n" +
" }\n" +
" }\n" +
"}";

private static final String MULTIPLE_AGG_DEF_DATAFEED = "{\n" +
" \"datafeed_id\": \"farequote-datafeed\",\n" +
" \"job_id\": \"farequote\",\n" +
" \"frequency\": \"1h\",\n" +
" \"indices\": [\"farequote1\", \"farequote2\"],\n" +
" \"aggregations\": {\n" +
" \"buckets\": {\n" +
" \"date_histogram\": {\n" +
" \"field\": \"time\",\n" +
" \"fixed_interval\": \"360s\",\n" +
" \"time_zone\": \"UTC\"\n" +
" },\n" +
" \"aggregations\": {\n" +
" \"time\": {\n" +
" \"max\": {\"field\": \"time\"}\n" +
" }\n" +
" }\n" +
" }\n" +
" }," +
" \"aggs\": {\n" +
" \"buckets2\": {\n" +
" \"date_histogram\": {\n" +
" \"field\": \"time\",\n" +
" \"interval\": \"360s\",\n" +
" \"fixed_interval\": \"360s\",\n" +
" \"time_zone\": \"UTC\"\n" +
" },\n" +
" \"aggregations\": {\n" +
Expand Down Expand Up @@ -313,6 +318,25 @@ public void testPastAggConfigParse() throws IOException {
}
}

/**
 * A config using the deprecated date_histogram {@code interval} field must parse
 * leniently (the interval is rewritten), while the strict parser keeps rejecting
 * anachronistic aggregation definitions.
 */
public void testPastAggConfigOldDateHistogramParse() throws IOException {
    // Lenient path: the deprecated interval is rewritten and the aggs are parsable.
    try (XContentParser lenientParser = XContentFactory.xContent(XContentType.JSON)
        .createParser(xContentRegistry(),
            DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
            AGG_WITH_OLD_DATE_HISTOGRAM_INTERVAL)) {
        DatafeedConfig datafeedConfig = DatafeedConfig.LENIENT_PARSER.apply(lenientParser, null).build();
        assertThat(datafeedConfig.getParsedAggregations(xContentRegistry()), is(not(nullValue())));
    }

    // Strict path: parsing the anachronistic aggregation definition must fail.
    try (XContentParser strictParser = XContentFactory.xContent(XContentType.JSON)
        .createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) {
        XContentParseException e = expectThrows(XContentParseException.class,
            () -> DatafeedConfig.STRICT_PARSER.apply(strictParser, null).build());
        assertEquals("[25:3] [datafeed_config] failed to parse field [aggregations]", e.getMessage());
    }
}

public void testFutureMetadataParse() throws IOException {
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED);
Expand Down Expand Up @@ -503,7 +527,9 @@ public void testHasAggregations_NonEmpty() {
builder.setIndices(Collections.singletonList("myIndex"));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(300000).subAggregation(maxTime).field("time")));
AggregationBuilders.dateHistogram("time")
.fixedInterval(new DateHistogramInterval(300000 + "ms"))
.subAggregation(maxTime).field("time")));
DatafeedConfig datafeedConfig = builder.build();

assertThat(datafeedConfig.hasAggregations(), is(true));
Expand Down Expand Up @@ -535,21 +561,13 @@ public void testBuild_GivenHistogramWithDefaultInterval() {
public void testBuild_GivenDateHistogramWithInvalidTimeZone() {
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("bucket").field("time")
.interval(300000L).timeZone(ZoneId.of("CET")).subAggregation(maxTime);
.fixedInterval(new DateHistogramInterval("30000ms")).timeZone(ZoneId.of("CET")).subAggregation(maxTime);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> createDatafeedWithDateHistogram(dateHistogram));

assertThat(e.getMessage(), equalTo("ML requires date_histogram.time_zone to be UTC"));
}

@AwaitsFix(bugUrl = "Needs ML to look at and fix. Unclear how this should be handled, interval is not an optional param")
public void testBuild_GivenDateHistogramWithDefaultInterval() {
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> createDatafeedWithDateHistogram((String) null));

assertThat(e.getMessage(), containsString("Aggregation interval must be greater than 0"));
}

public void testBuild_GivenValidDateHistogram() {
long millisInDay = 24 * 3600000L;

Expand All @@ -568,9 +586,9 @@ public void testBuild_GivenValidDateHistogram() {

public void testBuild_GivenDateHistogramWithMoreThanCalendarWeek() {
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> createDatafeedWithDateHistogram("8d"));
() -> createDatafeedWithDateHistogram("month"));

assertThat(e.getMessage(), containsString("When specifying a date_histogram calendar interval [8d]"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should still be a valid test. It's testing the code in

if (interval.days() > 7) {
throw ExceptionsHelper.badRequestException(invalidDateHistogramCalendarIntervalMessage(calendarInterval));
}
return interval.millis();
}
private static String invalidDateHistogramCalendarIntervalMessage(String interval) {
throw ExceptionsHelper.badRequestException("When specifying a date_histogram calendar interval ["
+ interval + "], ML does not accept intervals longer than a week because of " +
"variable lengths of periods greater than a week");
}

Did this test fail after the other changes in this PR? If so that might be a sign of a problem.

Even though histograms in general can plot variable length time buckets (e.g. months), it's still reasonable for ML to prohibit such bucket spans as they would make the modelling much more complex.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added the test back and fixed it

assertThat(e.getMessage(), containsString("When specifying a date_histogram calendar interval [month]"));
}

public void testDefaultChunkingConfig_GivenAggregations() {
Expand Down Expand Up @@ -711,7 +729,7 @@ public void testSerializationOfComplexAggs() throws IOException {
new Script("params.bytes > 0 ? params.bytes : null"));
DateHistogramAggregationBuilder dateHistogram =
AggregationBuilders.dateHistogram("histogram_buckets")
.field("timestamp").interval(300000).timeZone(ZoneOffset.UTC)
.field("timestamp").fixedInterval(new DateHistogramInterval("300000ms")).timeZone(ZoneOffset.UTC)
.subAggregation(maxTime)
.subAggregation(avgAggregationBuilder)
.subAggregation(derivativePipelineAggregationBuilder)
Expand Down Expand Up @@ -763,7 +781,7 @@ public void testSerializationOfComplexAggsBetweenVersions() throws IOException {
new Script("params.bytes > 0 ? params.bytes : null"));
DateHistogramAggregationBuilder dateHistogram =
AggregationBuilders.dateHistogram("histogram_buckets")
.field("timestamp").interval(300000).timeZone(ZoneOffset.UTC)
.field("timestamp").fixedInterval(new DateHistogramInterval("30000ms")).timeZone(ZoneOffset.UTC)
.subAggregation(maxTime)
.subAggregation(avgAggregationBuilder)
.subAggregation(derivativePipelineAggregationBuilder)
Expand Down Expand Up @@ -813,7 +831,11 @@ private static DatafeedConfig createDatafeedWithDateHistogram(String interval) {
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("buckets").subAggregation(maxTime).field("time");
if (interval != null) {
dateHistogram.dateHistogramInterval(new DateHistogramInterval(interval));
if (DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval) != null) {
dateHistogram.calendarInterval(new DateHistogramInterval(interval));
} else {
dateHistogram.fixedInterval(new DateHistogramInterval(interval));
}
}
return createDatafeedWithDateHistogram(dateHistogram);
}
Expand All @@ -822,7 +844,7 @@ private static DatafeedConfig createDatafeedWithDateHistogram(Long interval) {
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("buckets").subAggregation(maxTime).field("time");
if (interval != null) {
dateHistogram.interval(interval);
dateHistogram.fixedInterval(new DateHistogramInterval(interval + "ms"));
}
return createDatafeedWithDateHistogram(dateHistogram);
}
Expand Down Expand Up @@ -879,9 +901,12 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept
} else {
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
String timeField = randomAlphaOfLength(10);
long fixedInterval = between(10000, 3600000);

aggBuilder
.addAggregator(new DateHistogramAggregationBuilder(timeField).field(timeField).interval(between(10000, 3600000))
.subAggregation(new MaxAggregationBuilder(timeField).field(timeField)));
.addAggregator(new DateHistogramAggregationBuilder(timeField).field(timeField)
.fixedInterval(new DateHistogramInterval(fixedInterval + "ms"))
.subAggregation(new MaxAggregationBuilder(timeField).field(timeField)));
builder.setParsedAggregations(aggBuilder);
if (instance.getScriptFields().isEmpty() == false) {
builder.setScriptFields(Collections.emptyList());
Expand Down
Loading