Skip to content

Commit

Permalink
[ML] Datafeed config CRUD operations (elastic#32854)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Oct 29, 2018
1 parent 27a81f7 commit 59a1205
Show file tree
Hide file tree
Showing 23 changed files with 1,047 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ public final class MlMetaIndex {
*/
public static final String INDEX_NAME = ".ml-meta";

public static final String INCLUDE_TYPE_KEY = "include_type";

public static final String TYPE = "doc";

private MlMetaIndex() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOut
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
DelegatingMapParams extendedParams =
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_CLUSTER_STATE, "true"), params);
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
datafeed.doXContentBody(builder, params);
builder.endObject();
datafeed.toXContent(builder, params);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -111,7 +111,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (description != null) {
builder.field(DESCRIPTION.getPreferredName(), description);
}
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
builder.field(TYPE.getPreferredName(), CALENDAR_TYPE);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.RuleAction;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;

import java.io.IOException;
Expand Down Expand Up @@ -170,7 +170,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (eventId != null) {
builder.field(EVENT_ID.getPreferredName(), eventId);
}
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
builder.field(TYPE.getPreferredName(), SCHEDULED_EVENT_TYPE);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public static final String DOC_COUNT = "doc_count";

public static final ParseField ID = new ParseField("datafeed_id");
public static final ParseField CONFIG_TYPE = new ParseField("config_type");
public static final ParseField QUERY_DELAY = new ParseField("query_delay");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField INDEXES = new ParseField("indexes");
Expand All @@ -94,6 +95,7 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
ObjectParser<Builder, Void> parser = new ObjectParser<>("datafeed_config", ignoreUnknownFields, Builder::new);

parser.declareString(Builder::setId, ID);
parser.declareString((c, s) -> {}, CONFIG_TYPE);
parser.declareString(Builder::setJobId, Job.ID);
parser.declareStringArray(Builder::setIndices, INDEXES);
parser.declareStringArray(Builder::setIndices, INDICES);
Expand Down Expand Up @@ -199,6 +201,16 @@ public DatafeedConfig(StreamInput in) throws IOException {
}
}

/**
* The name of datafeed configuration document name from the datafeed ID.
*
* @param datafeedId The datafeed ID
* @return The ID of document the datafeed config is persisted in
*/
public static String documentId(String datafeedId) {
return "datafeed-" + datafeedId;
}

public String getId() {
return id;
}
Expand All @@ -207,6 +219,10 @@ public String getJobId() {
return jobId;
}

public String getConfigType() {
return TYPE;
}

public TimeValue getQueryDelay() {
return queryDelay;
}
Expand Down Expand Up @@ -297,14 +313,11 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
doXContentBody(builder, params);
builder.endObject();
return builder;
}

public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(ID.getPreferredName(), id);
builder.field(Job.ID.getPreferredName(), jobId);
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false) == true) {
builder.field(CONFIG_TYPE.getPreferredName(), TYPE);
}
builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
Expand All @@ -326,9 +339,10 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
if (chunkingConfig != null) {
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
}
if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == true) {
if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false) == true) {
builder.field(HEADERS.getPreferredName(), headers);
}
builder.endObject();
return builder;
}

Expand Down Expand Up @@ -468,6 +482,10 @@ public void setId(String datafeedId) {
id = ExceptionsHelper.requireNonNull(datafeedId, ID.getPreferredName());
}

public String getId() {
return id;
}

public void setJobId(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
// negative means "unknown", which should only happen for a 5.4 job
if (detectorIndex >= 0
// no point writing this to cluster state, as the indexes will get reassigned on reload anyway
&& params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == false) {
&& params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false) == false) {
builder.field(DETECTOR_INDEX.getPreferredName(), detectorIndex);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -101,7 +101,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(DESCRIPTION.getPreferredName(), description);
}
builder.field(ITEMS.getPreferredName(), items);
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
builder.field(TYPE.getPreferredName(), FILTER_TYPE);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
public final class ToXContentParams {

/**
* Parameter to indicate whether we are serialising to X Content for cluster state output.
* Parameter to indicate whether we are serialising to X Content for
* internal storage. Certain fields need to be persisted but should
* not be visible everywhere.
*/
public static final String FOR_CLUSTER_STATE = "for_cluster_state";
public static final String FOR_INTERNAL_STORAGE = "for_internal_storage";

/**
* When serialising POJOs to X Content this indicates whether the type field
* should be included or not
*/
public static final String INCLUDE_TYPE = "include_type";

private ToXContentParams() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
Expand All @@ -36,17 +40,22 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig.Mode;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
Expand All @@ -63,6 +72,10 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId) {
}

public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long bucketSpanMillis) {
return createRandomizedDatafeedConfigBuilder(jobId, bucketSpanMillis).build();
}

private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, long bucketSpanMillis) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId);
builder.setIndices(randomStringList(1, 10));
builder.setTypes(randomStringList(0, 10));
Expand Down Expand Up @@ -109,7 +122,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
return builder.build();
return builder;
}

@Override
Expand Down Expand Up @@ -167,6 +180,33 @@ public void testFutureMetadataParse() throws IOException {
assertNotNull(DatafeedConfig.LENIENT_PARSER.apply(parser, null).build());
}

public void testToXContentForInternalStorage() throws IOException {
DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", 300);

// headers are only persisted to cluster state
Map<String, String> headers = new HashMap<>();
headers.put("header-name", "header-value");
builder.setHeaders(headers);
DatafeedConfig config = builder.build();

ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"));

BytesReference forClusterstateXContent = XContentHelper.toXContent(config, XContentType.JSON, params, false);
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, forClusterstateXContent.streamInput());

DatafeedConfig parsedConfig = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
assertThat(parsedConfig.getHeaders(), hasEntry("header-name", "header-value"));

// headers are not written without the FOR_INTERNAL_STORAGE param
BytesReference nonClusterstateXContent = XContentHelper.toXContent(config, XContentType.JSON, ToXContent.EMPTY_PARAMS, false);
parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, nonClusterstateXContent.streamInput());

parsedConfig = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
assertThat(parsedConfig.getHeaders().entrySet(), hasSize(0));
}

public void testCopyConstructor() {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
DatafeedConfig datafeedConfig = createTestInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;

Expand Down Expand Up @@ -67,7 +68,7 @@ protected void doExecute(Task task, PostCalendarEventsAction.Request request,
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
indexRequest.source(event.toXContent(builder,
new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY,
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE,
"true"))));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise event", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -53,7 +54,7 @@ protected void doExecute(Task task, PutCalendarAction.Request request, ActionLis
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, calendar.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
indexRequest.source(calendar.toXContent(builder,
new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"))));
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true"))));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise calendar with id [" + calendar.getId() + "]", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -53,7 +54,7 @@ protected void doExecute(Task task, PutFilterAction.Request request, ActionListe
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"));
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true"));
indexRequest.source(filter.toXContent(builder, params));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise filter with id [" + filter.getId() + "]", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.job.JobManager;

import java.io.IOException;
Expand Down Expand Up @@ -105,7 +106,7 @@ private void indexUpdatedFilter(MlFilter filter, long version, UpdateFilterActio
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"));
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true"));
indexRequest.source(filter.toXContent(builder, params));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise filter with id [" + filter.getId() + "]", e);
Expand Down
Loading

0 comments on commit 59a1205

Please sign in to comment.