Skip to content

Commit

Permalink
Create an annotation when a model snapshot is stored (#53783)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Mar 30, 2020
1 parent c20dc02 commit 7c5c912
Show file tree
Hide file tree
Showing 19 changed files with 422 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.io.IOException;
import java.util.Date;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,14 @@ public class AnnotationIndex {
public static final String WRITE_ALIAS_NAME = ".ml-annotations-write";
// Exposed for testing, but always use the aliases in non-test code
public static final String INDEX_NAME = ".ml-annotations-6";
public static final String INDEX_PATTERN = ".ml-annotations*";

private static final String MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";

/**
* Create the .ml-annotations index with correct mappings if it does not already
* exist. This index is read and written by the UI results views, so needs to
* exist when there might be ML results to view.
* Create the .ml-annotations-6 index with correct mappings if it does not already exist. This index is read and written by the UI
* results views, so needs to exist when there might be ML results to view.
*/
public static void createAnnotationsIndexIfNecessary(Settings settings, Client client, ClusterState state,
final ActionListener<Boolean> finalListener) {
public static void createAnnotationsIndexIfNecessary(Client client, ClusterState state, final ActionListener<Boolean> finalListener) {

final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
final IndicesAliasesRequest request =
Expand Down Expand Up @@ -97,8 +94,8 @@ public static void createAnnotationsIndexIfNecessary(Settings settings, Client c
finalListener.onResponse(false);
}

public static String annotationsMapping() {
return TemplateUtils.loadTemplate("/org/elasticsearch/xpack/core/ml/annotations_index_mappings.json",
Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE);
private static String annotationsMapping() {
return TemplateUtils.loadTemplate(
"/org/elasticsearch/xpack/core/ml/annotations_index_mappings.json", Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.annotations;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;

/**
* Persists annotations to Elasticsearch index.
*/
public class AnnotationPersister {

private static final Logger logger = LogManager.getLogger(AnnotationPersister.class);

private final Client client;
private final AbstractAuditor auditor;

public AnnotationPersister(Client client, AbstractAuditor auditor) {
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
}

/**
* Persists the given annotation to annotations index.
*
* @param annotationId existing annotation id. If {@code null}, a new annotation will be created and id will be assigned automatically
* @param annotation annotation to be persisted
* @param errorMessage error message to report when annotation fails to be persisted
* @return tuple of the form (annotation id, annotation object)
*/
public Tuple<String, Annotation> persistAnnotation(@Nullable String annotationId, Annotation annotation, String errorMessage) {
Objects.requireNonNull(annotation);
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
IndexRequest indexRequest =
new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME)
.id(annotationId)
.source(xContentBuilder);
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
IndexResponse response = client.index(indexRequest).actionGet();
return Tuple.tuple(response.getId(), annotation);
}
} catch (IOException ex) {
String jobId = annotation.getJobId();
logger.error(errorMessage, ex);
auditor.error(jobId, errorMessage);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public final class Messages {
public static final String JOB_AUDIT_DELETED = "Job deleted";
public static final String JOB_AUDIT_KILLING = "Killing job";
public static final String JOB_AUDIT_OLD_RESULTS_DELETED = "Deleted results prior to {0}";
public static final String JOB_AUDIT_SNAPSHOT_STORED = "Job model snapshot with id [{0}] stored";
public static final String JOB_AUDIT_REVERTED = "Job model snapshot reverted to ''{0}''";
public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted";
public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ public static String documentIdPrefix(String jobId) {
return jobId + "_" + TYPE + "_";
}

public static String annotationDocumentId(ModelSnapshot snapshot) {
return "annotation_for_" + documentId(snapshot);
}

public static String documentId(ModelSnapshot snapshot) {
return documentId(snapshot.getJobId(), snapshot.getSnapshotId());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.annotations;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;

import java.io.IOException;

import static org.elasticsearch.common.collect.Tuple.tuple;
import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class AnnotationPersisterTests extends ESTestCase {

private static final String ANNOTATION_ID = "existing_annotation_id";
private static final String ERROR_MESSAGE = "an error occurred while persisting annotation";

private Client client;
private AbstractAuditor auditor;
private IndexResponse indexResponse;

private ArgumentCaptor<IndexRequest> indexRequestCaptor;

@Before
public void setUpMocks() {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(threadContext);
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);

auditor = mock(AbstractAuditor.class);

indexResponse = mock(IndexResponse.class);
when(indexResponse.getId()).thenReturn(ANNOTATION_ID);

indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
}

public void testPersistAnnotation_Create() throws IOException {
doReturn(instantFuture(indexResponse)).when(client).index(any());

AnnotationPersister persister = new AnnotationPersister(client, auditor);
Annotation annotation = AnnotationTests.randomAnnotation();
Tuple<String, Annotation> result = persister.persistAnnotation(null, annotation, ERROR_MESSAGE);
assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation))));

InOrder inOrder = inOrder(client);
inOrder.verify(client).threadPool();
inOrder.verify(client).index(indexRequestCaptor.capture());
verifyNoMoreInteractions(client, auditor);

IndexRequest indexRequest = indexRequestCaptor.getValue();

assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME)));
assertThat(indexRequest.id(), is(nullValue()));
assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation)));
assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
}

public void testPersistAnnotation_Update() throws IOException {
doReturn(instantFuture(indexResponse)).when(client).index(any());

AnnotationPersister persister = new AnnotationPersister(client, auditor);
Annotation annotation = AnnotationTests.randomAnnotation();
Tuple<String, Annotation> result = persister.persistAnnotation(ANNOTATION_ID, annotation, ERROR_MESSAGE);
assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation))));

InOrder inOrder = inOrder(client);
inOrder.verify(client).threadPool();
inOrder.verify(client).index(indexRequestCaptor.capture());
verifyNoMoreInteractions(client, auditor);

IndexRequest indexRequest = indexRequestCaptor.getValue();
assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME)));
assertThat(indexRequest.id(), is(equalTo(ANNOTATION_ID)));
assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation)));
assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
}

@SuppressWarnings("unchecked")
private static <T> ActionFuture<T> instantFuture(T response) {
ActionFuture future = mock(ActionFuture.class);
when(future.actionGet()).thenReturn(response);
return future;
}

private Annotation parseAnnotation(BytesReference source) throws IOException {
try (XContentParser parser = createParser(jsonXContent, source)) {
return Annotation.PARSER.parse(parser, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ protected Annotation doParseInstance(XContentParser parser) {

@Override
protected Annotation createTestInstance() {
return randomAnnotation();
}

static Annotation randomAnnotation() {
return new Annotation(randomAlphaOfLengthBetween(100, 1000),
new Date(randomNonNegativeLong()),
randomAlphaOfLengthBetween(5, 20),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
Expand Down Expand Up @@ -534,6 +535,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry);

AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
AnnotationPersister anomalyDetectionAnnotationPersister = new AnnotationPersister(client, anomalyDetectionAuditor);
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName());
this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
Expand Down Expand Up @@ -613,13 +615,15 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(environment, settings, client, threadPool,
xContentRegistry, anomalyDetectionAuditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister,
jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, nativeStorageProvider, indexNameExpressionResolver);
jobDataCountsPersister, anomalyDetectionAnnotationPersister, autodetectProcessFactory, normalizerFactory,
nativeStorageProvider, indexNameExpressionResolver);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder =
new DatafeedJobBuilder(
client,
xContentRegistry,
anomalyDetectionAuditor,
anomalyDetectionAnnotationPersister,
System::currentTimeMillis,
jobConfigProvider,
jobResultsProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void clusterChanged(ClusterChangedEvent event) {
// The atomic flag prevents multiple simultaneous attempts to create the
// index if there is a flurry of cluster state updates in quick succession
if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) {
AnnotationIndex.createAnnotationsIndexIfNecessary(settings, client, event.state(), ActionListener.wrap(
AnnotationIndex.createAnnotationsIndexIfNecessary(client, event.state(), ActionListener.wrap(
r -> {
isIndexCreationInProgress.set(false);
if (r) {
Expand Down
Loading

0 comments on commit 7c5c912

Please sign in to comment.