diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java index 0b675b171acba..523d8de0db3a1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java @@ -51,7 +51,7 @@ */ public class ChunkedDataExtractor implements DataExtractor { - private interface DataSummary { + interface DataSummary { long estimateChunk(); boolean hasData(); long earliestTime(); @@ -209,11 +209,11 @@ private DataSummary newScrolledDataSummary() { LOGGER.debug("[{}] Scrolling Data summary response was obtained", context.jobId); timingStatsReporter.reportSearchDuration(searchResponse.getTook()); - Aggregations aggregations = searchResponse.getAggregations(); long earliestTime = 0; long latestTime = 0; long totalHits = searchResponse.getHits().getTotalHits().value; if (totalHits > 0) { + Aggregations aggregations = searchResponse.getAggregations(); Min min = aggregations.get(EARLIEST_TIME); earliestTime = (long) min.getValue(); Max max = aggregations.get(LATEST_TIME); @@ -231,6 +231,13 @@ private DataSummary newAggregatedDataSummary() { timingStatsReporter.reportSearchDuration(searchResponse.getTook()); Aggregations aggregations = searchResponse.getAggregations(); + // This can happen if all the indices the datafeed is searching are deleted after it started. + // Note that unlike the scrolled data summary method above we cannot check for this situation + // by checking for zero hits, because aggregations that work on rollups return zero hits even + // when they retrieve data. + if (aggregations == null) { + return AggregatedDataSummary.noDataSummary(context.histogramInterval); + } Min min = aggregations.get(EARLIEST_TIME); Max max = aggregations.get(LATEST_TIME); return new AggregatedDataSummary(min.getValue(), max.getValue(), context.histogramInterval); @@ -309,13 +316,18 @@ public boolean hasData() { } } - private static class AggregatedDataSummary implements DataSummary { + static class AggregatedDataSummary implements DataSummary { private final double earliestTime; private final double latestTime; private final long histogramIntervalMillis; - private AggregatedDataSummary(double earliestTime, double latestTime, long histogramInterval) { + static AggregatedDataSummary noDataSummary(long histogramInterval) { + // hasData() uses infinity to mean no data + return new AggregatedDataSummary(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, histogramInterval); + } + + AggregatedDataSummary(double earliestTime, double latestTime, long histogramInterval) { this.earliestTime = earliestTime; this.latestTime = latestTime; this.histogramIntervalMillis = histogramInterval; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java index 1c37a8e2b5d40..537c62e48cf10 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java @@ -159,7 +159,8 @@ public void testExtractionGivenSpecifiedChunk() throws IOException { public void testExtractionGivenSpecifiedChunkAndAggs() throws IOException { chunkSpan = TimeValue.timeValueSeconds(1); TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L, true, 1000L); - extractor.setNextResponse(createSearchResponse(0L, 1000L, 2200L)); + // 0 hits with non-empty data is possible with rollups + extractor.setNextResponse(createSearchResponse(randomFrom(0L, 2L, 10000L), 1000L, 2200L)); InputStream inputStream1 = mock(InputStream.class); InputStream inputStream2 = mock(InputStream.class); @@ -200,7 +201,8 @@ public void testExtractionGivenAutoChunkAndAggs() throws IOException { chunkSpan = null; TestDataExtractor extractor = new TestDataExtractor(100_000L, 450_000L, true, 200L); - extractor.setNextResponse(createSearchResponse(0L, 100_000L, 400_000L)); + // 0 hits with non-empty data is possible with rollups + extractor.setNextResponse(createSearchResponse(randomFrom(0L, 2L, 10000L), 100_000L, 400_000L)); InputStream inputStream1 = mock(InputStream.class); InputStream inputStream2 = mock(InputStream.class); @@ -504,6 +506,11 @@ public void testDataSummaryRequestIsFailed() { expectThrows(SearchPhaseExecutionException.class, extractor::next); } + public void testNoDataSummaryHasNoData() { + ChunkedDataExtractor.DataSummary summary = ChunkedDataExtractor.AggregatedDataSummary.noDataSummary(randomNonNegativeLong()); + assertFalse(summary.hasData()); + } + private SearchResponse createSearchResponse(long totalHits, long earliestTime, long latestTime) { SearchResponse searchResponse = mock(SearchResponse.class); when(searchResponse.status()).thenReturn(RestStatus.OK);