diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/Annotation.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/Annotation.java index 91c4053ed1568..fe7acb729eb14 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/Annotation.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/Annotation.java @@ -12,8 +12,8 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.common.time.TimeUtils; +import org.elasticsearch.xpack.core.ml.job.config.Job; import java.io.IOException; import java.util.Date; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java index 05c083836b43b..fc788dbda5f67 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java @@ -31,17 +31,14 @@ public class AnnotationIndex { public static final String WRITE_ALIAS_NAME = ".ml-annotations-write"; // Exposed for testing, but always use the aliases in non-test code public static final String INDEX_NAME = ".ml-annotations-6"; - public static final String INDEX_PATTERN = ".ml-annotations*"; private static final String MAPPINGS_VERSION_VARIABLE = "xpack.ml.version"; /** - * Create the .ml-annotations index with correct mappings if it does not already - * exist. This index is read and written by the UI results views, so needs to - * exist when there might be ML results to view. + * Create the .ml-annotations-6 index with correct mappings if it does not already exist. This index is read and written by the UI + * results views, so needs to exist when there might be ML results to view. */ - public static void createAnnotationsIndexIfNecessary(Settings settings, Client client, ClusterState state, - final ActionListener finalListener) { + public static void createAnnotationsIndexIfNecessary(Client client, ClusterState state, final ActionListener finalListener) { final ActionListener createAliasListener = ActionListener.wrap(success -> { final IndicesAliasesRequest request = @@ -97,8 +94,8 @@ public static void createAnnotationsIndexIfNecessary(Settings settings, Client c finalListener.onResponse(false); } - public static String annotationsMapping() { - return TemplateUtils.loadTemplate("/org/elasticsearch/xpack/core/ml/annotations_index_mappings.json", - Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE); + private static String annotationsMapping() { + return TemplateUtils.loadTemplate( + "/org/elasticsearch/xpack/core/ml/annotations_index_mappings.json", Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersister.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersister.java new file mode 100644 index 0000000000000..0aca3ba1f03de --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersister.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.annotations; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; + +/** + * Persists annotations to Elasticsearch index. + */ +public class AnnotationPersister { + + private static final Logger logger = LogManager.getLogger(AnnotationPersister.class); + + private final Client client; + private final AbstractAuditor auditor; + + public AnnotationPersister(Client client, AbstractAuditor auditor) { + this.client = Objects.requireNonNull(client); + this.auditor = Objects.requireNonNull(auditor); + } + + /** + * Persists the given annotation to annotations index. + * + * @param annotationId existing annotation id. If {@code null}, a new annotation will be created and id will be assigned automatically + * @param annotation annotation to be persisted + * @param errorMessage error message to report when annotation fails to be persisted + * @return tuple of the form (annotation id, annotation object) + */ + public Tuple persistAnnotation(@Nullable String annotationId, Annotation annotation, String errorMessage) { + Objects.requireNonNull(annotation); + try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { + IndexRequest indexRequest = + new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME) + .id(annotationId) + .source(xContentBuilder); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { + IndexResponse response = client.index(indexRequest).actionGet(); + return Tuple.tuple(response.getId(), annotation); + } + } catch (IOException ex) { + String jobId = annotation.getJobId(); + logger.error(errorMessage, ex); + auditor.error(jobId, errorMessage); + return null; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 413d241dda693..ecafdfd7e19b0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -130,6 +130,7 @@ public final class Messages { public static final String JOB_AUDIT_DELETED = "Job deleted"; public static final String JOB_AUDIT_KILLING = "Killing job"; public static final String JOB_AUDIT_OLD_RESULTS_DELETED = "Deleted results prior to {0}"; + public static final String JOB_AUDIT_SNAPSHOT_STORED = "Job model snapshot with id [{0}] stored"; public static final String JOB_AUDIT_REVERTED = "Job model snapshot reverted to ''{0}''"; public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted"; public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java index 268da28b1d07e..035579b2243b0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelSnapshot.java @@ -299,6 +299,10 @@ public static String documentIdPrefix(String jobId) { return jobId + "_" + TYPE + "_"; } + public static String annotationDocumentId(ModelSnapshot snapshot) { + return "annotation_for_" + documentId(snapshot); + } + public static String documentId(ModelSnapshot snapshot) { return documentId(snapshot.getJobId(), snapshot.getSnapshotId()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersisterTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersisterTests.java new file mode 100644 index 0000000000000..051036cfeb0b6 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersisterTests.java @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.annotations; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; + +import java.io.IOException; + +import static org.elasticsearch.common.collect.Tuple.tuple; +import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class AnnotationPersisterTests extends ESTestCase { + + private static final String ANNOTATION_ID = "existing_annotation_id"; + private static final String ERROR_MESSAGE = "an error occurred while persisting annotation"; + + private Client client; + private AbstractAuditor auditor; + private IndexResponse indexResponse; + + private ArgumentCaptor indexRequestCaptor; + + @Before + public void setUpMocks() { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(threadContext); + client = mock(Client.class); + when(client.threadPool()).thenReturn(threadPool); + + auditor = mock(AbstractAuditor.class); + + indexResponse = mock(IndexResponse.class); + when(indexResponse.getId()).thenReturn(ANNOTATION_ID); + + indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class); + } + + public void testPersistAnnotation_Create() throws IOException { + doReturn(instantFuture(indexResponse)).when(client).index(any()); + + AnnotationPersister persister = new AnnotationPersister(client, auditor); + Annotation annotation = AnnotationTests.randomAnnotation(); + Tuple result = persister.persistAnnotation(null, annotation, ERROR_MESSAGE); + assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation)))); + + InOrder inOrder = inOrder(client); + inOrder.verify(client).threadPool(); + inOrder.verify(client).index(indexRequestCaptor.capture()); + verifyNoMoreInteractions(client, auditor); + + IndexRequest indexRequest = indexRequestCaptor.getValue(); + + assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME))); + assertThat(indexRequest.id(), is(nullValue())); + assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation))); + assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX)); + } + + public void testPersistAnnotation_Update() throws IOException { + doReturn(instantFuture(indexResponse)).when(client).index(any()); + + AnnotationPersister persister = new AnnotationPersister(client, auditor); + Annotation annotation = AnnotationTests.randomAnnotation(); + Tuple result = persister.persistAnnotation(ANNOTATION_ID, annotation, ERROR_MESSAGE); + assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation)))); + + InOrder inOrder = inOrder(client); + inOrder.verify(client).threadPool(); + inOrder.verify(client).index(indexRequestCaptor.capture()); + verifyNoMoreInteractions(client, auditor); + + IndexRequest indexRequest = indexRequestCaptor.getValue(); + assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME))); + assertThat(indexRequest.id(), is(equalTo(ANNOTATION_ID))); + assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation))); + assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX)); + } + + @SuppressWarnings("unchecked") + private static ActionFuture instantFuture(T response) { + ActionFuture future = mock(ActionFuture.class); + when(future.actionGet()).thenReturn(response); + return future; + } + + private Annotation parseAnnotation(BytesReference source) throws IOException { + try (XContentParser parser = createParser(jsonXContent, source)) { + return Annotation.PARSER.parse(parser, null); + } + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java index 0ce9f323d3edd..4b1898ed23a66 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java @@ -22,6 +22,10 @@ protected Annotation doParseInstance(XContentParser parser) { @Override protected Annotation createTestInstance() { + return randomAnnotation(); + } + + static Annotation randomAnnotation() { return new Annotation(randomAlphaOfLengthBetween(100, 1000), new Date(randomNonNegativeLong()), randomAlphaOfLengthBetween(5, 20), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index f2912f454e494..bb7e290b0f775 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -132,6 +132,7 @@ import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider; @@ -534,6 +535,7 @@ public Collection createComponents(Client client, ClusterService cluster new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry); AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); + AnnotationPersister anomalyDetectionAnnotationPersister = new AnnotationPersister(client, anomalyDetectionAuditor); DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName()); InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName()); this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor); @@ -613,13 +615,15 @@ public Collection createComponents(Client client, ClusterService cluster threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(environment, settings, client, threadPool, xContentRegistry, anomalyDetectionAuditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister, - jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, nativeStorageProvider, indexNameExpressionResolver); + jobDataCountsPersister, anomalyDetectionAnnotationPersister, autodetectProcessFactory, normalizerFactory, + nativeStorageProvider, indexNameExpressionResolver); this.autodetectProcessManager.set(autodetectProcessManager); DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder( client, xContentRegistry, anomalyDetectionAuditor, + anomalyDetectionAnnotationPersister, System::currentTimeMillis, jobConfigProvider, jobResultsProvider, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index ac8d9f63849e9..67b4e1e36b7e1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -66,7 +66,7 @@ public void clusterChanged(ClusterChangedEvent event) { // The atomic flag prevents multiple simultaneous attempts to create the // index if there is a flurry of cluster state updates in quick succession if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) { - AnnotationIndex.createAnnotationsIndexIfNecessary(settings, client, event.state(), ActionListener.wrap( + AnnotationIndex.createAnnotationsIndexIfNecessary(client, event.state(), ActionListener.wrap( r -> { isIndexCreationInProgress.set(false); if (r) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 22ebff57a4ba8..3bcfc76ed1ed3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -8,16 +8,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.rest.RestStatus; @@ -25,7 +21,7 @@ import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.annotations.Annotation; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.messages.Messages; @@ -55,6 +51,7 @@ class DatafeedJob { static final long MISSING_DATA_CHECK_INTERVAL_MS = 900_000; //15 minutes in ms private final AnomalyDetectionAuditor auditor; + private final AnnotationPersister annotationPersister; private final String jobId; private final DataDescription dataDescription; private final long frequencyMs; @@ -69,8 +66,7 @@ class DatafeedJob { private volatile long lookbackStartTimeMs; private volatile long latestFinalBucketEndTimeMs; private volatile long lastDataCheckTimeMs; - private volatile String lastDataCheckAnnotationId; - private volatile Annotation lastDataCheckAnnotation; + private volatile Tuple lastDataCheckAnnotationWithId; private volatile Long lastEndTimeMs; private AtomicBoolean running = new AtomicBoolean(true); private volatile boolean isIsolated; @@ -78,8 +74,9 @@ class DatafeedJob { DatafeedJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory, DatafeedTimingStatsReporter timingStatsReporter, Client client, - AnomalyDetectionAuditor auditor, Supplier currentTimeSupplier, DelayedDataDetector delayedDataDetector, - Integer maxEmptySearches, long latestFinalBucketEndTimeMs, long latestRecordTimeMs, boolean haveSeenDataPreviously) { + AnomalyDetectionAuditor auditor, AnnotationPersister annotationPersister, Supplier currentTimeSupplier, + DelayedDataDetector delayedDataDetector, Integer maxEmptySearches, long latestFinalBucketEndTimeMs, long latestRecordTimeMs, + boolean haveSeenDataPreviously) { this.jobId = jobId; this.dataDescription = Objects.requireNonNull(dataDescription); this.frequencyMs = frequencyMs; @@ -88,6 +85,7 @@ class DatafeedJob { this.timingStatsReporter = timingStatsReporter; this.client = client; this.auditor = auditor; + this.annotationPersister = annotationPersister; this.currentTimeSupplier = currentTimeSupplier; this.delayedDataDetector = delayedDataDetector; this.maxEmptySearches = maxEmptySearches; @@ -209,14 +207,14 @@ private void checkForMissingDataIfNecessary() { String msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing, XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucket.getTimestamp().getTime())); - Annotation annotation = createAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), endTime, msg); + Annotation annotation = createDelayedDataAnnotation(missingDataBuckets.get(0).getBucket().getTimestamp(), endTime, msg); // Have we an annotation that covers the same area with the same message? // Cannot use annotation.equals(other) as that checks createTime - if (lastDataCheckAnnotation != null - && annotation.getAnnotation().equals(lastDataCheckAnnotation.getAnnotation()) - && annotation.getTimestamp().equals(lastDataCheckAnnotation.getTimestamp()) - && annotation.getEndTimestamp().equals(lastDataCheckAnnotation.getEndTimestamp())) { + if (lastDataCheckAnnotationWithId != null + && annotation.getAnnotation().equals(lastDataCheckAnnotationWithId.v2().getAnnotation()) + && annotation.getTimestamp().equals(lastDataCheckAnnotationWithId.v2().getTimestamp()) + && annotation.getEndTimestamp().equals(lastDataCheckAnnotationWithId.v2().getEndTimestamp())) { return; } @@ -224,18 +222,29 @@ private void checkForMissingDataIfNecessary() { // in the job list page. auditor.warning(jobId, msg); - if (lastDataCheckAnnotationId != null) { - updateAnnotation(annotation); + if (lastDataCheckAnnotationWithId == null) { + lastDataCheckAnnotationWithId = + annotationPersister.persistAnnotation( + null, + annotation, + "[" + jobId + "] failed to create annotation for delayed data checker."); } else { - lastDataCheckAnnotationId = addAndSetDelayedDataAnnotation(annotation); + String annotationId = lastDataCheckAnnotationWithId.v1(); + Annotation updatedAnnotation = updateAnnotation(annotation); + lastDataCheckAnnotationWithId = + annotationPersister.persistAnnotation( + annotationId, + updatedAnnotation, + "[" + jobId + "] failed to update annotation for delayed data checker."); } } } } - private Annotation createAnnotation(Date startTime, Date endTime, String msg) { + private Annotation createDelayedDataAnnotation(Date startTime, Date endTime, String msg) { Date currentTime = new Date(currentTimeSupplier.get()); - return new Annotation(msg, + return new Annotation( + msg, currentTime, XPackUser.NAME, startTime, @@ -246,43 +255,14 @@ private Annotation createAnnotation(Date startTime, Date endTime, String msg) { "annotation"); } - private String addAndSetDelayedDataAnnotation(Annotation annotation) { - try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { - IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME); - request.source(xContentBuilder); - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { - IndexResponse response = client.index(request).actionGet(); - lastDataCheckAnnotation = annotation; - return response.getId(); - } - } catch (IOException ex) { - String errorMessage = "[" + jobId + "] failed to create annotation for delayed data checker."; - LOGGER.error(errorMessage, ex); - auditor.error(jobId, errorMessage); - return null; - } - } - - private void updateAnnotation(Annotation annotation) { - Annotation updatedAnnotation = new Annotation(lastDataCheckAnnotation); + private Annotation updateAnnotation(Annotation annotation) { + Annotation updatedAnnotation = new Annotation(lastDataCheckAnnotationWithId.v2()); updatedAnnotation.setModifiedUsername(XPackUser.NAME); updatedAnnotation.setModifiedTime(new Date(currentTimeSupplier.get())); updatedAnnotation.setAnnotation(annotation.getAnnotation()); updatedAnnotation.setTimestamp(annotation.getTimestamp()); updatedAnnotation.setEndTimestamp(annotation.getEndTimestamp()); - try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { - IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME); - indexRequest.id(lastDataCheckAnnotationId); - indexRequest.source(xContentBuilder); - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { - client.index(indexRequest).actionGet(); - lastDataCheckAnnotation = updatedAnnotation; - } - } catch (IOException ex) { - String errorMessage = "[" + jobId + "] failed to update annotation for delayed data checker."; - LOGGER.error(errorMessage, ex); - auditor.error(jobId, errorMessage); - } + return updatedAnnotation; } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index 499da77ecbb95..9197633a37c34 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -14,6 +14,7 @@ import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.node.Node; import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; @@ -46,6 +47,7 @@ public class DatafeedJobBuilder { private final Client client; private final NamedXContentRegistry xContentRegistry; private final AnomalyDetectionAuditor auditor; + private final AnnotationPersister annotationPersister; private final Supplier currentTimeSupplier; private final JobConfigProvider jobConfigProvider; private final JobResultsProvider jobResultsProvider; @@ -55,12 +57,14 @@ public class DatafeedJobBuilder { private final String nodeName; public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor, - Supplier currentTimeSupplier, JobConfigProvider jobConfigProvider, - JobResultsProvider jobResultsProvider, DatafeedConfigProvider datafeedConfigProvider, - JobResultsPersister jobResultsPersister, Settings settings, String nodeName) { + AnnotationPersister annotationPersister, Supplier currentTimeSupplier, + JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider, + DatafeedConfigProvider datafeedConfigProvider, JobResultsPersister jobResultsPersister, Settings settings, + String nodeName) { this.client = client; this.xContentRegistry = Objects.requireNonNull(xContentRegistry); this.auditor = Objects.requireNonNull(auditor); + this.annotationPersister = Objects.requireNonNull(annotationPersister); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.jobConfigProvider = Objects.requireNonNull(jobConfigProvider); this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); @@ -90,6 +94,7 @@ void build(String datafeedId, ActionListener listener) { context.timingStatsReporter, client, auditor, + annotationPersister, currentTimeSupplier, delayedDataDetector, datafeedConfigHolder.get().getMaxEmptySearches(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index b6388ffd07356..17a6f04834091 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -21,6 +21,7 @@ import org.elasticsearch.index.reindex.BulkByScrollTask; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -69,9 +70,11 @@ public void deleteModelSnapshots(List modelSnapshots, ActionListe List idsToDelete = new ArrayList<>(); Set indices = new HashSet<>(); indices.add(stateIndexName); + indices.add(AnnotationIndex.READ_ALIAS_NAME); for (ModelSnapshot modelSnapshot : modelSnapshots) { idsToDelete.addAll(modelSnapshot.stateDocumentIds()); idsToDelete.add(ModelSnapshot.documentId(modelSnapshot)); + idsToDelete.add(ModelSnapshot.annotationDocumentId(modelSnapshot)); indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 205a4292194a7..60038a9641e85 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetFiltersAction; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -103,6 +104,7 @@ public class AutodetectProcessManager implements ClusterStateListener { private final JobResultsPersister jobResultsPersister; private final JobDataCountsPersister jobDataCountsPersister; + private final AnnotationPersister annotationPersister; private NativeStorageProvider nativeStorageProvider; private final ConcurrentMap processByAllocation = new ConcurrentHashMap<>(); @@ -118,9 +120,9 @@ public class AutodetectProcessManager implements ClusterStateListener { public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor, ClusterService clusterService, JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister, - JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory, - NormalizerFactory normalizerFactory, NativeStorageProvider nativeStorageProvider, - IndexNameExpressionResolver expressionResolver) { + JobDataCountsPersister jobDataCountsPersister, AnnotationPersister annotationPersister, + AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, + NativeStorageProvider nativeStorageProvider, IndexNameExpressionResolver expressionResolver) { this.environment = environment; this.client = client; this.threadPool = threadPool; @@ -133,6 +135,7 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie this.jobResultsProvider = jobResultsProvider; this.jobResultsPersister = jobResultsPersister; this.jobDataCountsPersister = jobDataCountsPersister; + this.annotationPersister = annotationPersister; this.auditor = auditor; this.nativeStorageProvider = Objects.requireNonNull(nativeStorageProvider); clusterService.addListener(this); @@ -511,6 +514,7 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet jobId, renormalizer, jobResultsPersister, + annotationPersister, process, autodetectParams.modelSizeStats(), autodetectParams.timingStats()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index 4c6de562156e9..49a687b264bd2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -20,6 +20,8 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; @@ -34,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; +import org.elasticsearch.xpack.core.security.user.XPackUser; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.TimingStatsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; @@ -41,7 +44,9 @@ import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import java.time.Clock; import java.time.Duration; +import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -79,8 +84,10 @@ public class AutodetectResultProcessor { private final String jobId; private final Renormalizer renormalizer; private final JobResultsPersister persister; + private final AnnotationPersister annotationPersister; private final AutodetectProcess process; private final TimingStatsReporter timingStatsReporter; + private final Clock clock; final CountDownLatch completionLatch = new CountDownLatch(1); final Semaphore updateModelSnapshotSemaphore = new Semaphore(1); @@ -102,26 +109,30 @@ public AutodetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister, + AnnotationPersister annotationPersister, AutodetectProcess process, ModelSizeStats latestModelSizeStats, TimingStats timingStats) { - this(client, auditor, jobId, renormalizer, persister, process, latestModelSizeStats, timingStats, new FlushListener()); + this(client, auditor, jobId, renormalizer, persister, annotationPersister, process, latestModelSizeStats, timingStats, + Clock.systemUTC(), new FlushListener()); } // Visible for testing AutodetectResultProcessor(Client client, AnomalyDetectionAuditor auditor, String jobId, Renormalizer renormalizer, - JobResultsPersister persister, AutodetectProcess autodetectProcess, ModelSizeStats latestModelSizeStats, - TimingStats timingStats, FlushListener flushListener) { + JobResultsPersister persister, AnnotationPersister annotationPersister, AutodetectProcess autodetectProcess, + ModelSizeStats latestModelSizeStats, TimingStats timingStats, Clock clock, FlushListener flushListener) { this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); this.jobId = Objects.requireNonNull(jobId); this.renormalizer = Objects.requireNonNull(renormalizer); this.persister = Objects.requireNonNull(persister); + this.annotationPersister = Objects.requireNonNull(annotationPersister); this.process = Objects.requireNonNull(autodetectProcess); this.flushListener = Objects.requireNonNull(flushListener); this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats); this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive); this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister); + this.clock = Objects.requireNonNull(clock); this.deleteInterimRequired = true; this.priorRunsBucketCount = timingStats.getBucketCount(); } @@ -268,6 +279,10 @@ void processResult(AutodetectResult result) { if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { updateModelSnapshotOnJob(modelSnapshot); } + annotationPersister.persistAnnotation( + ModelSnapshot.annotationDocumentId(modelSnapshot), + createModelSnapshotAnnotation(modelSnapshot), + "[" + jobId + "] failed to create annotation for model snapshot."); } Quantiles quantiles = result.getQuantiles(); if (quantiles != null) { @@ -310,6 +325,21 @@ void processResult(AutodetectResult result) { } } + private Annotation createModelSnapshotAnnotation(ModelSnapshot modelSnapshot) { + assert modelSnapshot != null; + Date currentTime = new Date(clock.millis()); + return new Annotation( + Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_STORED, modelSnapshot.getSnapshotId()), + currentTime, + XPackUser.NAME, + modelSnapshot.getTimestamp(), + modelSnapshot.getTimestamp(), + jobId, + currentTime, + XPackUser.NAME, + "annotation"); + } + private void processModelSizeStats(ModelSizeStats modelSizeStats) { LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}", jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(), diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index 9646fda946aac..48912da073c2e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -47,6 +48,7 @@ public class DatafeedJobBuilderTests extends ESTestCase { private Client client; private AnomalyDetectionAuditor auditor; + private AnnotationPersister annotationPersister; private Consumer taskHandler; private JobResultsProvider jobResultsProvider; private JobConfigProvider jobConfigProvider; @@ -64,6 +66,7 @@ public void init() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(client.settings()).thenReturn(Settings.EMPTY); auditor = mock(AnomalyDetectionAuditor.class); + annotationPersister = mock(AnnotationPersister.class); taskHandler = mock(Consumer.class); jobResultsPersister = mock(JobResultsPersister.class); @@ -90,6 +93,7 @@ public void init() { client, xContentRegistry(), auditor, + annotationPersister, System::currentTimeMillis, jobConfigProvider, jobResultsProvider, @@ -213,6 +217,7 @@ public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception { client, xContentRegistry(), auditor, + annotationPersister, System::currentTimeMillis, jobConfigProvider, jobResultsProvider, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index b28480354d9b6..5e7046eee0488 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.annotations.Annotation; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.results.Bucket; @@ -458,7 +459,7 @@ private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long long latestRecordTimeMs, boolean haveSeenDataPreviously) { Supplier currentTimeSupplier = () -> currentTime; return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter, - client, auditor, currentTimeSupplier, delayedDataDetector, null, latestFinalBucketEndTimeMs, latestRecordTimeMs, - haveSeenDataPreviously); + client, auditor, new AnnotationPersister(client, auditor), currentTimeSupplier, delayedDataDetector, null, + latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 23fe7704a5fd3..a9c73745817ab 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -5,6 +5,10 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -13,18 +17,25 @@ import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; +import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; @@ -61,6 +72,8 @@ import org.junit.After; import org.junit.Before; +import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -75,8 +88,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent; import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -129,6 +147,7 @@ public void createComponents() throws Exception { JOB_ID, renormalizer, new JobResultsPersister(originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node")), + new AnnotationPersister(originSettingClient, auditor), process, new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) { @@ -142,10 +161,12 @@ protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { } @After - public void deleteJob() { + public void deleteJob() throws Exception { DeleteJobAction.Request request = new DeleteJobAction.Request(JOB_ID); AcknowledgedResponse response = client().execute(DeleteJobAction.INSTANCE, request).actionGet(); assertTrue(response.isAcknowledged()); + // Verify that deleting job also deletes associated model snapshots annotations + assertThat(getAnnotations(), empty()); } public void testProcessResults() throws Exception { @@ -201,11 +222,44 @@ public void testProcessResults() throws Exception { assertEquals(modelSnapshot, persistedModelSnapshot.results().get(0)); assertEquals(Collections.singletonList(modelSnapshot), capturedUpdateModelSnapshotOnJobRequests); + // Verify that creating model snapshot also creates associated annotation + List annotations = getAnnotations(); + assertThat(annotations, hasSize(1)); + assertThat( + annotations.get(0).getAnnotation(), + is(equalTo( + new ParameterizedMessage("Job model snapshot with id [{}] stored", modelSnapshot.getSnapshotId()).getFormattedMessage()))); + Optional persistedQuantiles = getQuantiles(); assertTrue(persistedQuantiles.isPresent()); assertEquals(quantiles, persistedQuantiles.get()); } + public void testProcessResults_ModelSnapshot() throws Exception { + ModelSnapshot modelSnapshot = createModelSnapshot(); + ResultsBuilder resultsBuilder = new ResultsBuilder().addModelSnapshot(modelSnapshot); + when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator()); + + resultProcessor.process(); + resultProcessor.awaitCompletion(); + + QueryPage persistedModelSnapshot = getModelSnapshots(); + assertThat(persistedModelSnapshot.count(), is(equalTo(1L))); + assertThat(persistedModelSnapshot.results(), contains(modelSnapshot)); + + // Verify that creating model snapshot also creates associated annotation + List annotations = getAnnotations(); + assertThat(annotations, hasSize(1)); + assertThat( + annotations.get(0).getAnnotation(), + is(equalTo( + new ParameterizedMessage("Job model snapshot with id [{}] stored", modelSnapshot.getSnapshotId()).getFormattedMessage()))); + + // Verify that deleting model snapshot also deletes associated annotation + deleteModelSnapshot(JOB_ID, modelSnapshot.getSnapshotId()); + assertThat(getAnnotations(), empty()); + } + public void testProcessResults_TimingStats() throws Exception { ResultsBuilder resultsBuilder = new ResultsBuilder() .addBucket(createBucket(true, 100)) @@ -424,7 +478,10 @@ private static ModelSizeStats createModelSizeStats() { } private static ModelSnapshot createModelSnapshot() { - return new ModelSnapshot.Builder(JOB_ID).setSnapshotId(randomAlphaOfLength(12)).build(); + return new ModelSnapshot.Builder(JOB_ID) + .setSnapshotId(randomAlphaOfLength(12)) + .setTimestamp(Date.from(Instant.ofEpochMilli(1000000000))) + .build(); } private static Quantiles createQuantiles() { @@ -606,6 +663,34 @@ private QueryPage getModelSnapshots() throws Exception { return resultHolder.get(); } + private List getAnnotations() throws Exception { + // Refresh the annotations index so that recently indexed annotation docs are visible. + client().admin().indices().prepareRefresh(AnnotationIndex.INDEX_NAME) + .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED) + .execute() + .actionGet(); + + SearchRequest searchRequest = new SearchRequest(AnnotationIndex.READ_ALIAS_NAME); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + List annotations = new ArrayList<>(); + for (SearchHit hit : searchResponse.getHits().getHits()) { + annotations.add(parseAnnotation(hit.getSourceRef())); + } + return annotations; + } + + private Annotation parseAnnotation(BytesReference source) throws IOException { + try (XContentParser parser = createParser(jsonXContent, source)) { + return Annotation.PARSER.parse(parser, null); + } + } + + private void deleteModelSnapshot(String jobId, String snapshotId) { + DeleteModelSnapshotAction.Request request = new DeleteModelSnapshotAction.Request(jobId, snapshotId); + AcknowledgedResponse response = client().execute(DeleteModelSnapshotAction.INSTANCE, request).actionGet(); + assertThat(response.isAcknowledged(), is(true)); + } + private Optional getQuantiles() throws Exception { AtomicReference errorHolder = new AtomicReference<>(); AtomicReference> resultHolder = new AtomicReference<>(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 996ffec02af52..23e09ea35c6fa 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; @@ -123,6 +124,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private JobResultsProvider jobResultsProvider; private JobResultsPersister jobResultsPersister; private JobDataCountsPersister jobDataCountsPersister; + private AnnotationPersister annotationPersister; private AutodetectCommunicator autodetectCommunicator; private AutodetectProcessFactory autodetectFactory; private NormalizerFactory normalizerFactory; @@ -153,6 +155,7 @@ public void setup() throws Exception { jobResultsPersister = mock(JobResultsPersister.class); when(jobResultsPersister.bulkPersisterBuilder(any(), any())).thenReturn(mock(JobResultsPersister.Builder.class)); jobDataCountsPersister = mock(JobDataCountsPersister.class); + annotationPersister = mock(AnnotationPersister.class); autodetectCommunicator = mock(AutodetectCommunicator.class); autodetectFactory = mock(AutodetectProcessFactory.class); normalizerFactory = mock(NormalizerFactory.class); @@ -706,7 +709,7 @@ private AutodetectProcessManager createSpyManager(Settings settings) { private AutodetectProcessManager createManager(Settings settings) { return new AutodetectProcessManager(environment, settings, client, threadPool, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService, jobManager, jobResultsProvider, - jobResultsPersister, jobDataCountsPersister, autodetectFactory, normalizerFactory, nativeStorageProvider, + jobResultsPersister, jobDataCountsPersister, annotationPersister, autodetectFactory, normalizerFactory, nativeStorageProvider, new IndexNameExpressionResolver()); } private AutodetectProcessManager createSpyManagerAndCallProcessData(String jobId) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index fc42247024096..116a9b773a07a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -23,6 +23,8 @@ import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; @@ -35,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; +import org.elasticsearch.xpack.core.security.user.XPackUser; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; @@ -42,9 +45,13 @@ import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.junit.After; import org.junit.Before; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Date; @@ -72,6 +79,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { private static final String JOB_ID = "valid_id"; private static final long BUCKET_SPAN_MS = 1000; + private static final Instant CURRENT_TIME = Instant.ofEpochMilli(2000000000); private ThreadPool threadPool; private Client client; @@ -79,6 +87,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { private Renormalizer renormalizer; private JobResultsPersister persister; private JobResultsPersister.Builder bulkBuilder; + private AnnotationPersister annotationPersister; private AutodetectProcess process; private FlushListener flushListener; private AutodetectResultProcessor processorUnderTest; @@ -95,6 +104,7 @@ public void setUpMocks() { renormalizer = mock(Renormalizer.class); persister = mock(JobResultsPersister.class); bulkBuilder = mock(JobResultsPersister.Builder.class); + annotationPersister = mock(AnnotationPersister.class); when(persister.bulkPersisterBuilder(eq(JOB_ID), any())).thenReturn(bulkBuilder); process = mock(AutodetectProcess.class); flushListener = mock(FlushListener.class); @@ -104,15 +114,17 @@ public void setUpMocks() { JOB_ID, renormalizer, persister, + annotationPersister, process, new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), new TimingStats(JOB_ID), + Clock.fixed(CURRENT_TIME, ZoneId.systemDefault()), flushListener); } @After public void cleanup() { - verifyNoMoreInteractions(auditor, renormalizer, persister); + verifyNoMoreInteractions(auditor, renormalizer, persister, annotationPersister); executor.shutdown(); } @@ -353,6 +365,7 @@ public void testProcessResult_modelSnapshot() { AutodetectResult result = mock(AutodetectResult.class); ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID) .setSnapshotId("a_snapshot_id") + .setTimestamp(Date.from(Instant.ofEpochMilli(1000000000))) .setMinVersion(Version.CURRENT) .build(); when(result.getModelSnapshot()).thenReturn(modelSnapshot); @@ -367,6 +380,23 @@ public void testProcessResult_modelSnapshot() { verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(persister).persistModelSnapshot(eq(modelSnapshot), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any()); + ArgumentCaptor annotationCaptor = ArgumentCaptor.forClass(Annotation.class); + verify(annotationPersister).persistAnnotation( + eq(ModelSnapshot.annotationDocumentId(modelSnapshot)), annotationCaptor.capture(), any()); + Annotation annotation = annotationCaptor.getValue(); + Annotation expectedAnnotation = + new Annotation( + "Job model snapshot with id [a_snapshot_id] stored", + Date.from(CURRENT_TIME), + XPackUser.NAME, + modelSnapshot.getTimestamp(), + modelSnapshot.getTimestamp(), + JOB_ID, + Date.from(CURRENT_TIME), + XPackUser.NAME, + "annotation"); + assertThat(annotation, is(equalTo(expectedAnnotation))); + UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID, new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build());