Skip to content

Commit

Permalink
Merge remote-tracking branch 'elastic/master' into smarter-incrementa…
Browse files Browse the repository at this point in the history
…lity-snapshots
  • Loading branch information
original-brownbear committed Sep 10, 2019
2 parents 3880103 + e5ab12b commit 03b287e
Show file tree
Hide file tree
Showing 20 changed files with 201 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ static Request multiSearchTemplate(MultiSearchTemplateRequest multiSearchTemplat
if (multiSearchTemplateRequest.maxConcurrentSearchRequests() != MultiSearchRequest.MAX_CONCURRENT_SEARCH_REQUESTS_DEFAULT) {
params.putParam("max_concurrent_searches", Integer.toString(multiSearchTemplateRequest.maxConcurrentSearchRequests()));
}
request.addParameters(params.asMap());

XContent xContent = REQUEST_BODY_CONTENT_TYPE.xContent();
byte[] source = MultiSearchTemplateRequest.writeMultiLineFormat(multiSearchTemplateRequest, xContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1364,12 +1364,20 @@ public void testMultiSearchTemplate() throws Exception {
multiSearchTemplateRequest.add(searchTemplateRequest);
}

Map<String, String> expectedParams = new HashMap<>();
if (randomBoolean()) {
multiSearchTemplateRequest.maxConcurrentSearchRequests(randomIntBetween(1,10));
expectedParams.put("max_concurrent_searches", Integer.toString(multiSearchTemplateRequest.maxConcurrentSearchRequests()));
}
expectedParams.put(RestSearchAction.TYPED_KEYS_PARAM, "true");

Request multiRequest = RequestConverters.multiSearchTemplate(multiSearchTemplateRequest);

assertEquals(HttpPost.METHOD_NAME, multiRequest.getMethod());
assertEquals("/_msearch/template", multiRequest.getEndpoint());
List<SearchTemplateRequest> searchRequests = multiSearchTemplateRequest.requests();
assertEquals(numSearchRequests, searchRequests.size());
assertEquals(expectedParams, multiRequest.getParameters());

HttpEntity actualEntity = multiRequest.getEntity();
byte[] expectedBytes = MultiSearchTemplateRequest.writeMultiLineFormat(multiSearchTemplateRequest, XContentType.JSON.xContent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
public static final ParseField LEVEL = new ParseField("level");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField NODE_NAME = new ParseField("node_name");
public static final ParseField JOB_TYPE = new ParseField("job_type");

protected static final <T extends AbstractAuditMessage> ConstructingObjectParser<T, Void> createParser(
String name, AbstractAuditMessageFactory<T> messageFactory, ParseField resourceField) {
Expand Down Expand Up @@ -99,13 +100,17 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
if (nodeName != null) {
builder.field(NODE_NAME.getPreferredName(), nodeName);
}
String jobType = getJobType();
if (jobType != null) {
builder.field(JOB_TYPE.getPreferredName(), jobType);
}
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(resourceId, message, level, timestamp, nodeName);
return Objects.hash(resourceId, message, level, timestamp, nodeName, getJobType());
}

@Override
Expand All @@ -122,8 +127,17 @@ public boolean equals(Object obj) {
Objects.equals(message, other.message) &&
Objects.equals(level, other.level) &&
Objects.equals(timestamp, other.timestamp) &&
Objects.equals(nodeName, other.nodeName);
Objects.equals(nodeName, other.nodeName) &&
Objects.equals(getJobType(), other.getJobType());
}

/**
* @return job type string used to tell apart jobs of different types stored in the same index
*/
public abstract String getJobType();

/**
* @return resource id field name used when storing a new message
*/
protected abstract String getResourceField();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public DataFrameAuditMessage(String resourceId, String message, Level level, Dat
super(resourceId, message, level, timestamp, nodeName);
}

@Override
public final String getJobType() {
return null;
}

@Override
protected String getResourceField() {
return TRANSFORM_ID.getPreferredName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1121,12 +1121,13 @@ public static XContentBuilder auditMessageMapping() throws IOException {
XContentBuilder builder = jsonBuilder().startObject();
builder.startObject(SINGLE_MAPPING_NAME);
addMetaInformation(builder);
builder.field(DYNAMIC, "false");
builder.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyDetectionAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyDetectionAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
Expand All @@ -1142,6 +1143,9 @@ public static XContentBuilder auditMessageMapping() throws IOException {
.startObject(AnomalyDetectionAuditMessage.NODE_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyDetectionAuditMessage.JOB_TYPE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public AnomalyDetectionAuditMessage(String resourceId, String message, Level lev
super(resourceId, message, level, timestamp, nodeName);
}

@Override
public final String getJobType() {
return Job.ANOMALY_DETECTOR_JOB_TYPE;
}

@Override
protected String getResourceField() {
return JOB_ID.getPreferredName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package org.elasticsearch.xpack.core.ml.notifications;

public final class AuditorField {
public static final String NOTIFICATIONS_INDEX = ".ml-notifications";

private AuditorField() {}
public static final String NOTIFICATIONS_INDEX = ".ml-notifications-000001";

private AuditorField() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.notifications;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.util.Date;

public class DataFrameAnalyticsAuditMessage extends AbstractAuditMessage {

private static final ParseField JOB_ID = Job.ID;
public static final ConstructingObjectParser<DataFrameAnalyticsAuditMessage, Void> PARSER =
createParser("ml_analytics_audit_message", DataFrameAnalyticsAuditMessage::new, JOB_ID);

public DataFrameAnalyticsAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
super(resourceId, message, level, timestamp, nodeName);
}

@Override
public final String getJobType() {
return "data_frame_analytics";
}

@Override
protected String getResourceField() {
return JOB_ID.getPreferredName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ static class TestAuditMessage extends AbstractAuditMessage {
super(resourceId, message, level, timestamp, nodeName);
}

@Override
public String getJobType() {
return "test_type";
}

@Override
protected String getResourceField() {
return TEST_ID.getPreferredName();
Expand All @@ -42,6 +47,11 @@ public void testGetResourceField() {
assertThat(message.getResourceField(), equalTo(TestAuditMessage.TEST_ID.getPreferredName()));
}

public void testGetJobType() {
TestAuditMessage message = createTestInstance();
assertThat(message.getJobType(), equalTo("test_type"));
}

public void testNewInfo() {
TestAuditMessage message = new TestAuditMessage(RESOURCE_ID, MESSAGE, Level.INFO, TIMESTAMP, NODE_NAME);
assertThat(message.getResourceId(), equalTo(RESOURCE_ID));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,15 @@

import java.util.Date;

import static org.hamcrest.Matchers.nullValue;

public class DataFrameAuditMessageTests extends AbstractXContentTestCase<DataFrameAuditMessage> {

public void testGetJobType() {
DataFrameAuditMessage message = createTestInstance();
assertThat(message.getJobType(), nullValue());
}

@Override
protected DataFrameAuditMessage doParseInstance(XContentParser parser) {
return DataFrameAuditMessage.PARSER.apply(parser, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.util.Date;

import static org.hamcrest.Matchers.equalTo;

public class AnomalyDetectionAuditMessageTests extends AbstractXContentTestCase<AnomalyDetectionAuditMessage> {

public void testGetJobType() {
AnomalyDetectionAuditMessage message = createTestInstance();
assertThat(message.getJobType(), equalTo(Job.ANOMALY_DETECTOR_JOB_TYPE));
}

@Override
protected AnomalyDetectionAuditMessage doParseInstance(XContentParser parser) {
return AnomalyDetectionAuditMessage.PARSER.apply(parser, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.notifications;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.common.notifications.Level;

import java.util.Date;

import static org.hamcrest.Matchers.equalTo;

public class DataFrameAnalyticsAuditMessageTests extends AbstractXContentTestCase<DataFrameAnalyticsAuditMessage> {

public void testGetJobType() {
DataFrameAnalyticsAuditMessage message = createTestInstance();
assertThat(message.getJobType(), equalTo("data_frame_analytics"));
}

@Override
protected DataFrameAnalyticsAuditMessage doParseInstance(XContentParser parser) {
return DataFrameAnalyticsAuditMessage.PARSER.apply(parser, null);
}

@Override
protected boolean supportsUnknownFields() {
return true;
}

@Override
protected DataFrameAnalyticsAuditMessage createTestInstance() {
return new DataFrameAnalyticsAuditMessage(
randomBoolean() ? null : randomAlphaOfLength(10),
randomAlphaOfLengthBetween(1, 20),
randomFrom(Level.values()),
new Date(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -183,7 +184,8 @@ public void testDeleteExpiredData() throws Exception {
long totalModelSizeStatsBeforeDelete = client().prepareSearch("*")
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
.get().getHits().getTotalHits().value;
long totalNotificationsCountBeforeDelete = client().prepareSearch(".ml-notifications").get().getHits().getTotalHits().value;
long totalNotificationsCountBeforeDelete =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value;
assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L));
assertThat(totalNotificationsCountBeforeDelete, greaterThan(0L));

Expand Down Expand Up @@ -233,7 +235,8 @@ public void testDeleteExpiredData() throws Exception {
long totalModelSizeStatsAfterDelete = client().prepareSearch("*")
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
.get().getHits().getTotalHits().value;
long totalNotificationsCountAfterDelete = client().prepareSearch(".ml-notifications").get().getHits().getTotalHits().value;
long totalNotificationsCountAfterDelete =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value;
assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete));
assertThat(totalNotificationsCountAfterDelete, greaterThanOrEqualTo(totalNotificationsCountBeforeDelete));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.core.ml.job.config.RuleScope;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.junit.After;

import java.io.IOException;
Expand Down Expand Up @@ -186,7 +187,8 @@ public void testScope() throws Exception {

// Wait until the notification that the filter was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
SearchResponse searchResponse =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.junit.After;

import java.io.IOException;
Expand Down Expand Up @@ -223,7 +224,8 @@ public void testAddEventsToOpenJob() throws Exception {

// Wait until the notification that the process was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
SearchResponse searchResponse =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
Expand Down Expand Up @@ -298,7 +300,8 @@ public void testAddOpenedJobToGroupWithCalendar() throws Exception {

// Wait until the notification that the job was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
SearchResponse searchResponse =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.process.DummyController;
import org.elasticsearch.xpack.ml.process.MlController;
import org.elasticsearch.xpack.ml.process.MlControllerHolder;
Expand Down Expand Up @@ -471,6 +472,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
}

AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
Expand Down Expand Up @@ -593,6 +595,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
jobDataCountsPersister,
datafeedManager,
anomalyDetectionAuditor,
dataFrameAnalyticsAuditor,
new MlAssignmentNotifier(settings, anomalyDetectionAuditor, threadPool, client, clusterService),
memoryTracker,
analyticsProcessManager,
Expand Down Expand Up @@ -898,8 +901,12 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat

public static boolean allTemplatesInstalled(ClusterState clusterState) {
boolean allPresent = true;
List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix());
List<String> templateNames =
Arrays.asList(
AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobResultsIndexPrefix());
for (String templateName : templateNames) {
allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
}
Expand Down
Loading

0 comments on commit 03b287e

Please sign in to comment.