Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Fixing duplicate audit messages in delayed data check #37080

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.security.user.SystemUser;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
Expand Down Expand Up @@ -189,24 +190,27 @@ private void checkForMissingDataIfNecessary() {
long totalRecordsMissing = missingDataBuckets.stream()
.mapToLong(BucketWithMissingData::getMissingDocumentCount)
.sum();
Date endTime = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket().getTimestamp();
Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(),
endTime,
totalRecordsMissing);
Bucket lastBucket = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket();
// Compute the end of the last bucket (epoch + bucket span) and convert it from seconds to milliseconds
Date endTime = new Date((lastBucket.getEpoch() + lastBucket.getBucketSpan()) * 1000);

String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucket.getTimestamp().getTime()));

Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), endTime, msg);

// Do we already have an annotation covering the same time range with the same message?
// Cannot use annotation.equals(other) because that checks createTime
if (lastDataCheckAnnotation != null
&& annotation.getAnnotation().equals(lastDataCheckAnnotation.getAnnotation())
&& annotation.getTimestamp().equals(lastDataCheckAnnotation.getTimestamp())
&& annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getTimestamp())) {
&& annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getEndTimestamp())) {
return;
}

// Creating a warning in addition to updating/creating our annotation. This allows the issue to be plainly visible
// in the job list page.
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime())));
auditor.warning(jobId, msg);

if (lastDataCheckAnnotationId != null) {
updateAnnotation(annotation);
Expand All @@ -217,17 +221,16 @@ private void checkForMissingDataIfNecessary() {
}
}

private Annotation createAnnotation(Date startTime, Date endTime, long recordsMissing) {
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, recordsMissing,
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime()));
private Annotation createAnnotation(Date startTime, Date endTime, String msg) {
Date currentTime = new Date(currentTimeSupplier.get());
return new Annotation(msg,
new Date(currentTimeSupplier.get()),
currentTime,
SystemUser.NAME,
startTime,
endTime,
jobId,
null,
null,
currentTime,
SystemUser.NAME,
"annotation");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -226,6 +227,8 @@ public void testRealtimeRun() throws Exception {
flushJobResponse = new FlushJobAction.Response(true, new Date(2000));
Bucket bucket = mock(Bucket.class);
when(bucket.getTimestamp()).thenReturn(new Date(2000));
when(bucket.getEpoch()).thenReturn(2L);
when(bucket.getBucketSpan()).thenReturn(4L);
when(flushJobFuture.actionGet()).thenReturn(flushJobResponse);
when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
when(delayedDataDetector.detectMissingData(2000))
Expand Down Expand Up @@ -270,10 +273,10 @@ public void testRealtimeRun() throws Exception {
new Date(currentTime),
SystemUser.NAME,
bucket.getTimestamp(),
bucket.getTimestamp(),
new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000),
jobId,
null,
null,
new Date(currentTime),
SystemUser.NAME,
"annotation");

IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
Expand All @@ -286,8 +289,13 @@ public void testRealtimeRun() throws Exception {
assertThat(request.source(), equalTo(indexRequestArgumentCaptor.getValue().source()));

// Execute a fourth time; this time we return a new delayedDataDetector response to verify that the annotation gets updated
Bucket bucket2 = mock(Bucket.class);
when(bucket2.getTimestamp()).thenReturn(new Date(6000));
when(bucket2.getEpoch()).thenReturn(6L);
when(bucket2.getBucketSpan()).thenReturn(4L);
when(delayedDataDetector.detectMissingData(2000))
.thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(15, bucket)));
.thenReturn(Arrays.asList(BucketWithMissingData.fromMissingAndBucket(10, bucket),
BucketWithMissingData.fromMissingAndBucket(5, bucket2)));
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
inputStream = new ByteArrayInputStream(contentBytes);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
Expand All @@ -297,14 +305,15 @@ public void testRealtimeRun() throws Exception {

msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA,
15,
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000));
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(6000));
// What we expect the updated annotation to be indexed as
IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
indexRequest.id(annotationDocId);
Annotation updatedAnnotation = new Annotation(expectedAnnotation);
updatedAnnotation.setAnnotation(msg);
updatedAnnotation.setModifiedTime(new Date(currentTime));
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
updatedAnnotation.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000));
try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
indexRequest.source(xContentBuilder);
}
Expand All @@ -317,6 +326,17 @@ public void testRealtimeRun() throws Exception {
assertThat(indexRequest.source().utf8ToString(),
equalTo(updateRequestArgumentCaptor.getValue().source().utf8ToString()));
assertThat(updateRequestArgumentCaptor.getValue().opType(), equalTo(DocWriteRequest.OpType.INDEX));

// Execute a fifth time, no changes should occur as annotation is the same
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
inputStream = new ByteArrayInputStream(contentBytes);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
datafeedJob.runRealtime();

// No third index request should have been issued for the annotations
verify(client, atMost(2)).index(any());
}

public void testEmptyDataCountGivenlookback() throws Exception {
Expand Down