[ML] Avoid possible datafeed infinite loop with filtering aggregations #104722

Merged 2 commits on Jan 24, 2024
6 changes: 6 additions & 0 deletions docs/changelog/104722.yaml
@@ -0,0 +1,6 @@
pr: 104722
summary: Avoid possible datafeed infinite loop with filtering aggregations
area: Machine Learning
type: bug
issues:
- 104699
@@ -794,6 +794,91 @@ public void testLookbackOnlyGivenAggregationsWithHistogram() throws Exception {
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}

/**
* This test confirms the fix for <a href="https://github.com/elastic/elasticsearch/issues/104699">the issue
* where a datafeed with aggregations that filter everything for a bucket can go into an infinite loop</a>.
* In this test the filter in the aggregation is extreme, as it literally filters out everything. Real users
* would have a filter that only occasionally leaves the aggregation with no results while the query alone
* returns data, but the code path that is exercised is the same.
*/
public void testLookbackOnlyGivenAggregationsWithHistogramAndBucketFilter() throws Exception {
String jobId = "aggs-histogram-filter-job";
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("""
{
"description": "Aggs job with dodgy filter",
"analysis_config": {
"bucket_span": "1h",
"summary_count_field_name": "doc_count",
"detectors": [
{
"function": "mean",
"field_name": "responsetime",
"by_field_name": "airline"
}
]
},
"data_description": {"time_field": "time stamp"}
}""");
client().performRequest(createJobRequest);

String datafeedId = "datafeed-" + jobId;
// The "filter_everything" aggregation in here means the output is always empty.
String aggregations = """
{
"buckets": {
"histogram": {
"field": "time stamp",
"interval": 3600000
},
"aggregations": {
"time stamp": {
"max": {
"field": "time stamp"
}
},
"filter_everything" : {
"filter": {
"term" : {
"airline": "does not exist"
}
},
"aggregations": {
"airline": {
"terms": {
"field": "airline",
"size": 10
},
"aggregations": {
"responsetime": {
"avg": {
"field": "responsetime"
}
}
}
}
}
}
}
}
}""";
// The chunking timespan of 1 hour here must be less than the span of the data for the problem to be reproduced.
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs").setChunkingTimespan("1h").setAggregations(aggregations).build();
openJob(client(), jobId);

startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest(
new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")
);
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":0"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":0"));
assertThat(jobStatsResponseAsString, containsString("\"bucket_count\":0"));

// The most important thing this test is asserting is that we don't go into an infinite loop!
}
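As the Javadoc above notes, a real-world trigger is usually an aggregation that only sometimes produces empty output while the datafeed query itself still matches documents. A hypothetical sketch of such a datafeed aggregation, in the same style as the test above (not part of this PR; the bucket_selector, field names and threshold are illustrative), might look like this:

// Hypothetical datafeed aggregation where a bucket_selector occasionally drops
// every bucket even though the datafeed query matches documents.
String realisticAggregations = """
    {
      "buckets": {
        "histogram": {
          "field": "time stamp",
          "interval": 3600000
        },
        "aggregations": {
          "time stamp": { "max": { "field": "time stamp" } },
          "responsetime": { "avg": { "field": "responsetime" } },
          "busy_buckets_only": {
            "bucket_selector": {
              "buckets_path": { "avgResponse": "responsetime" },
              "script": "params.avgResponse > 100"
            }
          }
        }
      }
    }""";

Whenever no bucket passes the selector, the aggregation output for that chunk is empty, which is exactly the situation the deliberately extreme "filter_everything" aggregation above reproduces on every chunk.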

public void testLookbackOnlyGivenAggregationsWithDateHistogram() throws Exception {
String jobId = "aggs-date-histogram-job";
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
@@ -178,6 +178,22 @@ private Result getNextStream() throws IOException {
if (isNewSearch && hasNext()) {
// If it was a new search it means it returned 0 results. Thus,
// we reconfigure and jump to the next time interval where there are data.
// In theory, if everything is consistent, it would be sufficient to call
// setUpChunkedSearch() here. However, the way that works is to take the
// query from the datafeed config and add on some simple aggregations.
// These aggregations are completely separate from any that might be defined
// in the datafeed config. It is possible that the aggregations in the
// datafeed config rather than the query are responsible for no data being
// found. For example, "filter" or "bucket_selector" aggregations can do this.
// Originally we thought this situation would never happen, with the query
// selecting data and the aggregations just grouping it, but recently we've
// seen cases of users filtering in the aggregations. Therefore, we
// unconditionally advance the start time by one chunk here. setUpChunkedSearch()
// might then advance substantially further, but in the pathological cases
// where setUpChunkedSearch() thinks data exists at the current start time
// while the datafeed's own aggregation doesn't, at least we'll step forward
// a little bit rather than go into an infinite loop.
currentStart += chunkSpan;
Contributor

Just double checking, but adding this wouldn't accidentally skip some data if the data did exist?

Contributor Author

No, because if there were data then the check on line 174 would have passed and we would have returned the data on line 175, so we would have exited the function before this point.

On line 172 the time period we searched was [currentStart, currentStart + chunkSpan), so we should have found the data then. This is why it should be completely safe to step forward by chunkSpan. It might be possible to step forward further, which is what line 197 is trying to do.

setUpChunkedSearch();
}
}
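The reasoning in the comment and in the review thread can be boiled down to a small sketch. This is a simplified, hypothetical illustration (the class, fields and values are invented, not the real ChunkedDataExtractor code) of why returning data when a chunk has any, and otherwise stepping forward by exactly one chunk span, can neither skip data nor loop forever:

import java.util.Optional;
import java.util.function.LongFunction;

class ChunkAdvanceSketch {
    long currentStart = 100_000L;     // inclusive start of the next chunk
    final long chunkSpan = 100_000L;  // width of each chunked search
    final long end = 400_000L;        // end of the lookback period

    // chunkSearch.apply(start) searches [start, start + chunkSpan); an empty
    // Optional models the datafeed's own aggregations filtering everything out.
    Optional<String> nextData(LongFunction<Optional<String>> chunkSearch) {
        while (currentStart < end) {
            Optional<String> data = chunkSearch.apply(currentStart);
            if (data.isPresent()) {
                return data; // data in this chunk is returned, never skipped
            }
            // The whole of [currentStart, currentStart + chunkSpan) was just
            // searched and was empty, so advancing past it cannot lose data,
            // and every iteration strictly increases currentStart, so the loop
            // terminates.
            currentStart += chunkSpan;
            // (the real code also calls setUpChunkedSearch() here, which may
            // legitimately jump even further ahead)
        }
        return Optional.empty();
    }
}

Before this change, only setUpChunkedSearch() ran at this point; in the pathological case its simple data-summary aggregations kept reporting data at the same start time while the datafeed's own aggregations returned nothing, so the same chunk was retried forever.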
@@ -431,12 +431,12 @@ public void testExtractionGivenAutoChunkAndIntermediateEmptySearchShouldReconfig

InputStream inputStream1 = mock(InputStream.class);

- DataExtractor subExtactor1 = new StubSubExtractor(new SearchInterval(100_000L, 200_000L), inputStream1);
- when(dataExtractorFactory.newExtractor(100000L, 200000L)).thenReturn(subExtactor1);
+ DataExtractor subExtractor1 = new StubSubExtractor(new SearchInterval(100_000L, 200_000L), inputStream1);
+ when(dataExtractorFactory.newExtractor(100000L, 200000L)).thenReturn(subExtractor1);

// This one is empty
- DataExtractor subExtactor2 = new StubSubExtractor(new SearchInterval(200_000L, 300_000L));
- when(dataExtractorFactory.newExtractor(200000, 300000L)).thenReturn(subExtactor2);
+ DataExtractor subExtractor2 = new StubSubExtractor(new SearchInterval(200_000L, 300_000L));
+ when(dataExtractorFactory.newExtractor(200000, 300000L)).thenReturn(subExtractor2);

assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().data().get());
@@ -447,8 +447,8 @@ public void testExtractionGivenAutoChunkAndIntermediateEmptySearchShouldReconfig

// This is the last one
InputStream inputStream2 = mock(InputStream.class);
- DataExtractor subExtactor3 = new StubSubExtractor(new SearchInterval(200_000L, 400_000L), inputStream2);
- when(dataExtractorFactory.newExtractor(200000, 400000)).thenReturn(subExtactor3);
+ DataExtractor subExtractor3 = new StubSubExtractor(new SearchInterval(200_000L, 400_000L), inputStream2);
+ when(dataExtractorFactory.newExtractor(200000, 400000)).thenReturn(subExtractor3);

assertEquals(inputStream2, extractor.next().data().get());
assertThat(extractor.next().data().isPresent(), is(false));
@@ -464,7 +464,7 @@ public void testExtractionGivenAutoChunkAndIntermediateEmptySearchShouldReconfig
String searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", "");
assertThat(searchRequest, containsString("\"gte\":100000,\"lt\":400000"));
searchRequest = capturedSearchRequests.get(1).toString().replaceAll("\\s", "");
- assertThat(searchRequest, containsString("\"gte\":200000,\"lt\":400000"));
+ assertThat(searchRequest, containsString("\"gte\":300000,\"lt\":400000"));
}
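Presumably the updated assertion reflects the new unconditional advance: the empty chunk spanned [200000, 300000), so the start moves forward by the 100000 chunk span to 200000 + 100000 = 300000 before reconfiguring, and the second captured search therefore expects "gte":300000 instead of repeating "gte":200000.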

public void testCancelGivenNextWasNeverCalled() {