diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 91ce79b4c7583..64e8512baa5b0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -31,6 +31,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.security.user.SystemUser; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData; @@ -189,24 +190,27 @@ private void checkForMissingDataIfNecessary() { long totalRecordsMissing = missingDataBuckets.stream() .mapToLong(BucketWithMissingData::getMissingDocumentCount) .sum(); - Date endTime = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket().getTimestamp(); - Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), - endTime, - totalRecordsMissing); + Bucket lastBucket = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket(); + // Get the end of the last bucket and make it milliseconds + Date endTime = new Date((lastBucket.getEpoch() + lastBucket.getBucketSpan()) * 1000); + + String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing, + XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucket.getTimestamp().getTime())); + + Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), endTime, msg); // Have we an annotation that covers the same area with the same message? // Cannot use annotation.equals(other) as that checks createTime if (lastDataCheckAnnotation != null && annotation.getAnnotation().equals(lastDataCheckAnnotation.getAnnotation()) && annotation.getTimestamp().equals(lastDataCheckAnnotation.getTimestamp()) - && annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getTimestamp())) { + && annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getEndTimestamp())) { return; } // Creating a warning in addition to updating/creating our annotation. This allows the issue to be plainly visible // in the job list page. - auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing, - XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime()))); + auditor.warning(jobId, msg); if (lastDataCheckAnnotationId != null) { updateAnnotation(annotation); @@ -217,17 +221,16 @@ private void checkForMissingDataIfNecessary() { } } - private Annotation createAnnotation(Date startTime, Date endTime, long recordsMissing) { - String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, recordsMissing, - XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime())); + private Annotation createAnnotation(Date startTime, Date endTime, String msg) { + Date currentTime = new Date(currentTimeSupplier.get()); return new Annotation(msg, - new Date(currentTimeSupplier.get()), + currentTime, SystemUser.NAME, startTime, endTime, jobId, - null, - null, + currentTime, + SystemUser.NAME, "annotation"); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index a1887f3c41814..534681ff3c86a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; @@ -226,6 +227,8 @@ public void testRealtimeRun() throws Exception { flushJobResponse = new FlushJobAction.Response(true, new Date(2000)); Bucket bucket = mock(Bucket.class); when(bucket.getTimestamp()).thenReturn(new Date(2000)); + when(bucket.getEpoch()).thenReturn(2L); + when(bucket.getBucketSpan()).thenReturn(4L); when(flushJobFuture.actionGet()).thenReturn(flushJobResponse); when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture); when(delayedDataDetector.detectMissingData(2000)) @@ -270,10 +273,10 @@ public void testRealtimeRun() throws Exception { new Date(currentTime), SystemUser.NAME, bucket.getTimestamp(), - bucket.getTimestamp(), + new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000), jobId, - null, - null, + new Date(currentTime), + SystemUser.NAME, "annotation"); IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME); @@ -286,8 +289,13 @@ public void testRealtimeRun() throws Exception { assertThat(request.source(), equalTo(indexRequestArgumentCaptor.getValue().source())); // Execute a fourth time, this time we return a new delayedDataDetector response to verify annotation gets updated + Bucket bucket2 = mock(Bucket.class); + when(bucket2.getTimestamp()).thenReturn(new Date(6000)); + when(bucket2.getEpoch()).thenReturn(6L); + when(bucket2.getBucketSpan()).thenReturn(4L); when(delayedDataDetector.detectMissingData(2000)) - .thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(15, bucket))); + .thenReturn(Arrays.asList(BucketWithMissingData.fromMissingAndBucket(10, bucket), + BucketWithMissingData.fromMissingAndBucket(5, bucket2))); currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1; inputStream = new ByteArrayInputStream(contentBytes); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); @@ -297,7 +305,7 @@ public void testRealtimeRun() throws Exception { msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, 15, - XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000)); + XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(6000)); // What we expect the updated annotation to be indexed as IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME); indexRequest.id(annotationDocId); @@ -305,6 +313,7 @@ public void testRealtimeRun() throws Exception { updatedAnnotation.setAnnotation(msg); updatedAnnotation.setModifiedTime(new Date(currentTime)); updatedAnnotation.setModifiedUsername(SystemUser.NAME); + updatedAnnotation.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000)); try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { indexRequest.source(xContentBuilder); } @@ -317,6 +326,17 @@ public void testRealtimeRun() throws Exception { assertThat(indexRequest.source().utf8ToString(), equalTo(updateRequestArgumentCaptor.getValue().source().utf8ToString())); assertThat(updateRequestArgumentCaptor.getValue().opType(), equalTo(DocWriteRequest.OpType.INDEX)); + + // Execute a fifth time, no changes should occur as annotation is the same + currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1; + inputStream = new ByteArrayInputStream(contentBytes); + when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); + when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); + when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); + datafeedJob.runRealtime(); + + // We should not get 3 index requests for the annotations + verify(client, atMost(2)).index(any()); } public void testEmptyDataCountGivenlookback() throws Exception {