Skip to content

Commit

Permalink
addressing (#36891)(#36888)(#36889) (#37080)
Browse files Browse the repository at this point in the history
  • Loading branch information
benwtrent committed Jan 3, 2019
1 parent b307d99 commit a44e005
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.security.user.SystemUser;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
Expand Down Expand Up @@ -190,24 +191,27 @@ private void checkForMissingDataIfNecessary() {
long totalRecordsMissing = missingDataBuckets.stream()
.mapToLong(BucketWithMissingData::getMissingDocumentCount)
.sum();
Date endTime = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket().getTimestamp();
Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(),
endTime,
totalRecordsMissing);
Bucket lastBucket = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket();
// Get the end of the last bucket and make it milliseconds
Date endTime = new Date((lastBucket.getEpoch() + lastBucket.getBucketSpan()) * 1000);

String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucket.getTimestamp().getTime()));

Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), endTime, msg);

// Have we an annotation that covers the same area with the same message?
// Cannot use annotation.equals(other) as that checks createTime
if (lastDataCheckAnnotation != null
&& annotation.getAnnotation().equals(lastDataCheckAnnotation.getAnnotation())
&& annotation.getTimestamp().equals(lastDataCheckAnnotation.getTimestamp())
&& annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getTimestamp())) {
&& annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getEndTimestamp())) {
return;
}

// Creating a warning in addition to updating/creating our annotation. This allows the issue to be plainly visible
// in the job list page.
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime())));
auditor.warning(jobId, msg);

if (lastDataCheckAnnotationId != null) {
updateAnnotation(annotation);
Expand All @@ -218,17 +222,16 @@ private void checkForMissingDataIfNecessary() {
}
}

private Annotation createAnnotation(Date startTime, Date endTime, long recordsMissing) {
String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, recordsMissing,
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(endTime.getTime()));
private Annotation createAnnotation(Date startTime, Date endTime, String msg) {
Date currentTime = new Date(currentTimeSupplier.get());
return new Annotation(msg,
new Date(currentTimeSupplier.get()),
currentTime,
SystemUser.NAME,
startTime,
endTime,
jobId,
null,
null,
currentTime,
SystemUser.NAME,
"annotation");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -228,6 +229,8 @@ public void testRealtimeRun() throws Exception {
flushJobResponse = new FlushJobAction.Response(true, new Date(2000));
Bucket bucket = mock(Bucket.class);
when(bucket.getTimestamp()).thenReturn(new Date(2000));
when(bucket.getEpoch()).thenReturn(2L);
when(bucket.getBucketSpan()).thenReturn(4L);
when(flushJobFuture.actionGet()).thenReturn(flushJobResponse);
when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
when(delayedDataDetector.detectMissingData(2000))
Expand Down Expand Up @@ -272,10 +275,10 @@ public void testRealtimeRun() throws Exception {
new Date(currentTime),
SystemUser.NAME,
bucket.getTimestamp(),
bucket.getTimestamp(),
new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000),
jobId,
null,
null,
new Date(currentTime),
SystemUser.NAME,
"annotation");

IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME, ElasticsearchMappings.DOC_TYPE);
Expand All @@ -288,8 +291,13 @@ public void testRealtimeRun() throws Exception {
assertThat(request.source(), equalTo(indexRequestArgumentCaptor.getValue().source()));

// Execute a fourth time, this time we return a new delayedDataDetector response to verify annotation gets updated
Bucket bucket2 = mock(Bucket.class);
when(bucket2.getTimestamp()).thenReturn(new Date(6000));
when(bucket2.getEpoch()).thenReturn(6L);
when(bucket2.getBucketSpan()).thenReturn(4L);
when(delayedDataDetector.detectMissingData(2000))
.thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(15, bucket)));
.thenReturn(Arrays.asList(BucketWithMissingData.fromMissingAndBucket(10, bucket),
BucketWithMissingData.fromMissingAndBucket(5, bucket2)));
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
inputStream = new ByteArrayInputStream(contentBytes);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
Expand All @@ -299,14 +307,15 @@ public void testRealtimeRun() throws Exception {

msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA,
15,
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000));
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(6000));
// What we expect the updated annotation to be indexed as
IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME, ElasticsearchMappings.DOC_TYPE);
indexRequest.id(annotationDocId);
Annotation updatedAnnotation = new Annotation(expectedAnnotation);
updatedAnnotation.setAnnotation(msg);
updatedAnnotation.setModifiedTime(new Date(currentTime));
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
updatedAnnotation.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000));
try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
indexRequest.source(xContentBuilder);
}
Expand All @@ -319,6 +328,17 @@ public void testRealtimeRun() throws Exception {
assertThat(indexRequest.source().utf8ToString(),
equalTo(updateRequestArgumentCaptor.getValue().source().utf8ToString()));
assertThat(updateRequestArgumentCaptor.getValue().opType(), equalTo(DocWriteRequest.OpType.INDEX));

// Execute a fifth time, no changes should occur as annotation is the same
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
inputStream = new ByteArrayInputStream(contentBytes);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
datafeedJob.runRealtime();

// We should not get 3 index requests for the annotations
verify(client, atMost(2)).index(any());
}

public void testEmptyDataCountGivenlookback() throws Exception {
Expand Down

0 comments on commit a44e005

Please sign in to comment.