From 59a1205157ec7d14b25fd4aa4ecb90a6e5e4b528 Mon Sep 17 00:00:00 2001 From: David Kyle <david.kyle@elastic.co> Date: Thu, 16 Aug 2018 10:09:32 +0100 Subject: [PATCH] [ML] Datafeed config CRUD operations (#32854) --- .../xpack/core/ml/MlMetaIndex.java | 2 - .../xpack/core/ml/MlMetadata.java | 2 +- .../core/ml/action/PutDatafeedAction.java | 4 +- .../xpack/core/ml/calendars/Calendar.java | 4 +- .../core/ml/calendars/ScheduledEvent.java | 4 +- .../core/ml/datafeed/DatafeedConfig.java | 32 +- .../xpack/core/ml/job/config/Detector.java | 2 +- .../xpack/core/ml/job/config/MlFilter.java | 4 +- .../xpack/core/ml/utils/ToXContentParams.java | 12 +- .../core/ml/datafeed/DatafeedConfigTests.java | 42 +- .../TransportPostCalendarEventsAction.java | 3 +- .../ml/action/TransportPutCalendarAction.java | 3 +- .../ml/action/TransportPutFilterAction.java | 3 +- .../action/TransportUpdateFilterAction.java | 3 +- .../persistence/DatafeedConfigProvider.java | 393 ++++++++++++++++++ .../job/persistence/ExpandedIdsMatcher.java | 158 +++++++ .../ml/job/persistence/JobConfigProvider.java | 163 +------- .../xpack/ml/MlSingleNodeTestCase.java | 32 ++ .../integration/DatafeedConfigProviderIT.java | 253 +++++++++++ .../ml/integration/JobConfigProviderIT.java | 31 -- .../ml/integration/JobResultsProviderIT.java | 7 +- .../persistence/ExpandedIdsMatcherTests.java | 101 +++++ .../persistence/JobConfigProviderTests.java | 96 ----- 23 files changed, 1047 insertions(+), 307 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcherTests.java delete mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProviderTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetaIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetaIndex.java index d625e6e311aaf..9014c415f16bb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetaIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetaIndex.java @@ -21,8 +21,6 @@ public final class MlMetaIndex { */ public static final String INDEX_NAME = ".ml-meta"; - public static final String INCLUDE_TYPE_KEY = "include_type"; - public static final String TYPE = "doc"; private MlMetaIndex() {} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 8d3c6a3565f93..14736a764390b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -167,7 +167,7 @@ private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOut @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { DelegatingMapParams extendedParams = - new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_CLUSTER_STATE, "true"), params); + new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params); mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams); mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams); return builder; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java index 4d3f720026e14..448d826973595 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java @@ -138,9 +138,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - datafeed.doXContentBody(builder, params); - builder.endObject(); + datafeed.toXContent(builder, params); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/calendars/Calendar.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/calendars/Calendar.java index 9add81aace357..723f1b5c8b7ca 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/calendars/Calendar.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/calendars/Calendar.java @@ -13,7 +13,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.xpack.core.ml.MlMetaIndex; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; import java.util.Arrays; @@ -111,7 +111,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (description != null) { builder.field(DESCRIPTION.getPreferredName(), description); } - if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) { + if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) { builder.field(TYPE.getPreferredName(), CALENDAR_TYPE); } builder.endObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/calendars/ScheduledEvent.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/calendars/ScheduledEvent.java index 79e569987fa02..042775c8024e4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/calendars/ScheduledEvent.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/calendars/ScheduledEvent.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.Operator; import org.elasticsearch.xpack.core.ml.job.config.RuleAction; @@ -23,6 +22,7 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.Intervals; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; import java.io.IOException; @@ -170,7 +170,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (eventId != null) { builder.field(EVENT_ID.getPreferredName(), eventId); } - if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) { + if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) { builder.field(TYPE.getPreferredName(), SCHEDULED_EVENT_TYPE); } builder.endObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 3c6565e13c0ff..21faf0a3456cc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -72,6 +72,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements public static final String DOC_COUNT = "doc_count"; public static final ParseField ID = new ParseField("datafeed_id"); + public static final ParseField CONFIG_TYPE = new ParseField("config_type"); public static final ParseField QUERY_DELAY = new ParseField("query_delay"); public static final ParseField FREQUENCY = new ParseField("frequency"); public static final ParseField INDEXES = new ParseField("indexes"); @@ -94,6 +95,7 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie ObjectParser<Builder, Void> parser = new ObjectParser<>("datafeed_config", ignoreUnknownFields, Builder::new); parser.declareString(Builder::setId, ID); + parser.declareString((c, s) -> {}, CONFIG_TYPE); parser.declareString(Builder::setJobId, Job.ID); parser.declareStringArray(Builder::setIndices, INDEXES); parser.declareStringArray(Builder::setIndices, INDICES); @@ -199,6 +201,16 @@ public DatafeedConfig(StreamInput in) throws IOException { } } + /** + * The name of datafeed configuration document name from the datafeed ID. + * + * @param datafeedId The datafeed ID + * @return The ID of document the datafeed config is persisted in + */ + public static String documentId(String datafeedId) { + return "datafeed-" + datafeedId; + } + public String getId() { return id; } @@ -207,6 +219,10 @@ public String getJobId() { return jobId; } + public String getConfigType() { + return TYPE; + } + public TimeValue getQueryDelay() { return queryDelay; } @@ -297,14 +313,11 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - doXContentBody(builder, params); - builder.endObject(); - return builder; - } - - public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.field(ID.getPreferredName(), id); builder.field(Job.ID.getPreferredName(), jobId); + if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false) == true) { + builder.field(CONFIG_TYPE.getPreferredName(), TYPE); + } builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep()); if (frequency != null) { builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep()); @@ -326,9 +339,10 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th if (chunkingConfig != null) { builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig); } - if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == true) { + if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false) == true) { builder.field(HEADERS.getPreferredName(), headers); } + builder.endObject(); return builder; } @@ -468,6 +482,10 @@ public void setId(String datafeedId) { id = ExceptionsHelper.requireNonNull(datafeedId, ID.getPreferredName()); } + public String getId() { + return id; + } + public void setJobId(String jobId) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java index d53e4cb74126d..a09bc376da2b2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java @@ -302,7 +302,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws // negative means "unknown", which should only happen for a 5.4 job if (detectorIndex >= 0 // no point writing this to cluster state, as the indexes will get reassigned on reload anyway - && params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == false) { + && params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false) == false) { builder.field(DETECTOR_INDEX.getPreferredName(), detectorIndex); } builder.endObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/MlFilter.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/MlFilter.java index 48051fa4733ff..f2be3315b4dc7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/MlFilter.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/MlFilter.java @@ -14,10 +14,10 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlStrings; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; import java.util.Arrays; @@ -101,7 +101,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(DESCRIPTION.getPreferredName(), description); } builder.field(ITEMS.getPreferredName(), items); - if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) { + if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) { builder.field(TYPE.getPreferredName(), FILTER_TYPE); } builder.endObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java index d120e8cf6685e..f7fb9d46ec8a8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java @@ -12,9 +12,17 @@ public final class ToXContentParams { /** - * Parameter to indicate whether we are serialising to X Content for cluster state output. + * Parameter to indicate whether we are serialising to X Content for + * internal storage. Certain fields need to be persisted but should + * not be visible everywhere. */ - public static final String FOR_CLUSTER_STATE = "for_cluster_state"; + public static final String FOR_INTERNAL_STORAGE = "for_internal_storage"; + + /** + * When serialising POJOs to X Content this indicates whether the type field + * should be included or not + */ + public static final String INCLUDE_TYPE = "include_type"; private ToXContentParams() { } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 36bd2fbcb4689..f0c7806fd8cf9 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -8,13 +8,17 @@ import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParseException; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; @@ -36,17 +40,22 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig.Mode; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.joda.time.DateTimeZone; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.TimeZone; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; @@ -63,6 +72,10 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId) { } public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long bucketSpanMillis) { + return createRandomizedDatafeedConfigBuilder(jobId, bucketSpanMillis).build(); + } + + private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, long bucketSpanMillis) { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId); builder.setIndices(randomStringList(1, 10)); builder.setTypes(randomStringList(0, 10)); @@ -109,7 +122,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b if (randomBoolean()) { builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk()); } - return builder.build(); + return builder; } @Override @@ -167,6 +180,33 @@ public void testFutureMetadataParse() throws IOException { assertNotNull(DatafeedConfig.LENIENT_PARSER.apply(parser, null).build()); } + public void testToXContentForInternalStorage() throws IOException { + DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", 300); + + // headers are only persisted to cluster state + Map<String, String> headers = new HashMap<>(); + headers.put("header-name", "header-value"); + builder.setHeaders(headers); + DatafeedConfig config = builder.build(); + + ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")); + + BytesReference forClusterstateXContent = XContentHelper.toXContent(config, XContentType.JSON, params, false); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, forClusterstateXContent.streamInput()); + + DatafeedConfig parsedConfig = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build(); + assertThat(parsedConfig.getHeaders(), hasEntry("header-name", "header-value")); + + // headers are not written without the FOR_INTERNAL_STORAGE param + BytesReference nonClusterstateXContent = XContentHelper.toXContent(config, XContentType.JSON, ToXContent.EMPTY_PARAMS, false); + parser = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, nonClusterstateXContent.streamInput()); + + parsedConfig = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build(); + assertThat(parsedConfig.getHeaders().entrySet(), hasSize(0)); + } + public void testCopyConstructor() { for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { DatafeedConfig datafeedConfig = createTestInstance(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java index 7284a490eaa8f..24bc4ad016e7b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; @@ -67,7 +68,7 @@ protected void doExecute(Task task, PostCalendarEventsAction.Request request, IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { indexRequest.source(event.toXContent(builder, - new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, + new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true")))); } catch (IOException e) { throw new IllegalStateException("Failed to serialise event", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java index 7611a27cd5a1d..d50b798ebe729 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutCalendarAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; import java.util.Collections; @@ -53,7 +54,7 @@ protected void doExecute(Task task, PutCalendarAction.Request request, ActionLis IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, calendar.documentId()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { indexRequest.source(calendar.toXContent(builder, - new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true")))); + new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true")))); } catch (IOException e) { throw new IllegalStateException("Failed to serialise calendar with id [" + calendar.getId() + "]", e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutFilterAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutFilterAction.java index 19bf35aaed617..0414e1cdf140c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutFilterAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutFilterAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; import java.util.Collections; @@ -53,7 +54,7 @@ protected void doExecute(Task task, PutFilterAction.Request request, ActionListe indexRequest.opType(DocWriteRequest.OpType.CREATE); indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true")); + ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true")); indexRequest.source(filter.toXContent(builder, params)); } catch (IOException e) { throw new IllegalStateException("Failed to serialise filter with id [" + filter.getId() + "]", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java index c8dbf9273829f..abbefa1e4936f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.ml.job.JobManager; import java.io.IOException; @@ -105,7 +106,7 @@ private void indexUpdatedFilter(MlFilter filter, long version, UpdateFilterActio indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true")); + ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true")); indexRequest.source(filter.toXContent(builder, params)); } catch (IOException e) { throw new IllegalStateException("Failed to serialise filter with id [" + filter.getId() + "]", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java new file mode 100644 index 0000000000000..9702f1096ecf4 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -0,0 +1,393 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed.persistence; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.delete.DeleteAction; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetAction; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.index.query.WildcardQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; +import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + +public class DatafeedConfigProvider extends AbstractComponent { + + private final Client client; + private final NamedXContentRegistry xContentRegistry; + + private static final Map<String, String> TO_XCONTENT_PARAMS = new HashMap<>(); + static { + TO_XCONTENT_PARAMS.put(ToXContentParams.FOR_INTERNAL_STORAGE, "true"); + TO_XCONTENT_PARAMS.put(ToXContentParams.INCLUDE_TYPE, "true"); + } + + public DatafeedConfigProvider(Client client, Settings settings, NamedXContentRegistry xContentRegistry) { + super(settings); + this.client = client; + this.xContentRegistry = xContentRegistry; + } + + /** + * Persist the datafeed configuration to the config index. + * It is an error if a datafeed with the same Id already exists - + * the config will not be overwritten. + * + * @param config The datafeed configuration + * @param listener Index response listener + */ + public void putDatafeedConfig(DatafeedConfig config, ActionListener<IndexResponse> listener) { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder source = config.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); + + IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(config.getId())) + .setSource(source) + .setOpType(DocWriteRequest.OpType.CREATE) + .request(); + + executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener); + + } catch (IOException e) { + listener.onFailure(new ElasticsearchParseException("Failed to serialise datafeed config with id [" + config.getId() + "]", e)); + } + } + + /** + * Get the datafeed config specified by {@code datafeedId}. + * If the datafeed document is missing a {@code ResourceNotFoundException} + * is returned via the listener. + * + * @param datafeedId The datafeed ID + * @param datafeedConfigListener The config listener + */ + public void getDatafeedConfig(String datafeedId, ActionListener<DatafeedConfig.Builder> datafeedConfigListener) { + GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId)); + executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener<GetResponse>() { + @Override + public void onResponse(GetResponse getResponse) { + if (getResponse.isExists() == false) { + datafeedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); + return; + } + BytesReference source = getResponse.getSourceAsBytesRef(); + parseLenientlyFromSource(source, datafeedConfigListener); + } + @Override + public void onFailure(Exception e) { + datafeedConfigListener.onFailure(e); + } + }); + } + + /** + * Delete the datafeed config document + * + * @param datafeedId The datafeed id + * @param actionListener Deleted datafeed listener + */ + public void deleteDatafeedConfig(String datafeedId, ActionListener<DeleteResponse> actionListener) { + DeleteRequest request = new DeleteRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId)); + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, new ActionListener<DeleteResponse>() { + @Override + public void onResponse(DeleteResponse deleteResponse) { + if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + actionListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); + return; + } + assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED; + actionListener.onResponse(deleteResponse); + } + @Override + public void onFailure(Exception e) { + actionListener.onFailure(e); + } + }); + } + + /** + * Get the datafeed config and apply the {@code update} + * then index the modified config setting the version in the request. + * + * @param datafeedId The Id of the datafeed to update + * @param update The update + * @param headers Datafeed headers applied with the update + * @param updatedConfigListener Updated datafeed config listener + */ + public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map<String, String> headers, + ActionListener<DatafeedConfig> updatedConfigListener) { + GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId)); + + executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener<GetResponse>() { + @Override + public void onResponse(GetResponse getResponse) { + if (getResponse.isExists() == false) { + updatedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); + return; + } + long version = getResponse.getVersion(); + BytesReference source = getResponse.getSourceAsBytesRef(); + DatafeedConfig.Builder configBuilder; + try { + configBuilder = parseLenientlyFromSource(source); + } catch (IOException e) { + updatedConfigListener.onFailure( + new ElasticsearchParseException("Failed to parse datafeed config [" + datafeedId + "]", e)); + return; + } + + DatafeedConfig updatedConfig; + try { + updatedConfig = update.apply(configBuilder.build(), headers); + } catch (Exception e) { + updatedConfigListener.onFailure(e); + return; + } + + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder updatedSource = updatedConfig.toXContent(builder, ToXContent.EMPTY_PARAMS); + IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(updatedConfig.getId())) + .setSource(updatedSource) + .setVersion(version) + .request(); + + executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap( + indexResponse -> { + assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED; + updatedConfigListener.onResponse(updatedConfig); + }, + updatedConfigListener::onFailure + )); + + } catch (IOException e) { + updatedConfigListener.onFailure( + new ElasticsearchParseException("Failed to serialise datafeed config with id [" + datafeedId + "]", e)); + } + } + + @Override + public void onFailure(Exception e) { + updatedConfigListener.onFailure(e); + } + }); + } + + /** + * Expands an expression into the set of matching names. {@code expresssion} + * may be a wildcard, a datafeed ID or a list of those. + * If {@code expression} == 'ALL', '*' or the empty string then all + * datafeed IDs are returned. + * + * For example, given a set of names ["foo-1", "foo-2", "bar-1", bar-2"], + * expressions resolve follows: + * <ul> + * <li>"foo-1" : ["foo-1"]</li> + * <li>"bar-1" : ["bar-1"]</li> + * <li>"foo-1,foo-2" : ["foo-1", "foo-2"]</li> + * <li>"foo-*" : ["foo-1", "foo-2"]</li> + * <li>"*-1" : ["bar-1", "foo-1"]</li> + * <li>"*" : ["bar-1", "bar-2", "foo-1", "foo-2"]</li> + * <li>"_all" : ["bar-1", "bar-2", "foo-1", "foo-2"]</li> + * </ul> + * + * @param expression the expression to resolve + * @param allowNoDatafeeds if {@code false}, an error is thrown when no name matches the {@code expression}. + * This only applies to wild card expressions, if {@code expression} is not a + * wildcard then setting this true will not suppress the exception + * @param listener The expanded datafeed IDs listener + */ + public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ActionListener<Set<String>> listener) { + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens)); + sourceBuilder.sort(DatafeedConfig.ID.getPreferredName()); + String [] includes = new String[] {DatafeedConfig.ID.getPreferredName()}; + sourceBuilder.fetchSource(includes, null); + + SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.<SearchResponse>wrap( + response -> { + Set<String> datafeedIds = new HashSet<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + datafeedIds.add((String)hit.getSourceAsMap().get(DatafeedConfig.ID.getPreferredName())); + } + + requiredMatches.filterMatchedIds(datafeedIds); + if (requiredMatches.hasUnmatchedIds()) { + // some required datafeeds were not found + listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); + return; + } + + listener.onResponse(datafeedIds); + }, + listener::onFailure) + , client::search); + + } + + /** + * The same logic as {@link #expandDatafeedIds(String, boolean, ActionListener)} but + * the full datafeed configuration is returned. + * + * See {@link #expandDatafeedIds(String, boolean, ActionListener)} + * + * @param expression the expression to resolve + * @param allowNoDatafeeds if {@code false}, an error is thrown when no name matches the {@code expression}. + * This only applies to wild card expressions, if {@code expression} is not a + * wildcard then setting this true will not suppress the exception + * @param listener The expanded datafeed config listener + */ + // NORELEASE datafeed configs should be paged or have a mechanism to return all jobs if there are many of them + public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ActionListener<List<DatafeedConfig.Builder>> listener) { + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens)); + sourceBuilder.sort(DatafeedConfig.ID.getPreferredName()); + + SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.<SearchResponse>wrap( + response -> { + List<DatafeedConfig.Builder> datafeeds = new ArrayList<>(); + Set<String> datafeedIds = new HashSet<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + try { + BytesReference source = hit.getSourceRef(); + DatafeedConfig.Builder datafeed = parseLenientlyFromSource(source); + datafeeds.add(datafeed); + datafeedIds.add(datafeed.getId()); + } catch (IOException e) { + // TODO A better way to handle this rather than just ignoring the error? + logger.error("Error parsing datafeed configuration [" + hit.getId() + "]", e); + } + } + + requiredMatches.filterMatchedIds(datafeedIds); + if (requiredMatches.hasUnmatchedIds()) { + // some required datafeeds were not found + listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); + return; + } + + listener.onResponse(datafeeds); + }, + listener::onFailure) + , client::search); + + } + + private QueryBuilder buildQuery(String [] tokens) { + QueryBuilder jobQuery = new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE); + if (Strings.isAllOrWildcard(tokens)) { + // match all + return jobQuery; + } + + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + boolQueryBuilder.filter(jobQuery); + BoolQueryBuilder shouldQueries = new BoolQueryBuilder(); + + List<String> terms = new ArrayList<>(); + for (String token : tokens) { + if (Regex.isSimpleMatchPattern(token)) { + shouldQueries.should(new WildcardQueryBuilder(DatafeedConfig.ID.getPreferredName(), token)); + } else { + terms.add(token); + } + } + + if (terms.isEmpty() == false) { + shouldQueries.should(new TermsQueryBuilder(DatafeedConfig.ID.getPreferredName(), terms)); + } + + if (shouldQueries.should().isEmpty() == false) { + boolQueryBuilder.filter(shouldQueries); + } + + return boolQueryBuilder; + } + + private void parseLenientlyFromSource(BytesReference source, ActionListener<DatafeedConfig.Builder> datafeedConfigListener) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) { + datafeedConfigListener.onResponse(DatafeedConfig.LENIENT_PARSER.apply(parser, null)); + } catch (Exception e) { + datafeedConfigListener.onFailure(e); + } + } + + private DatafeedConfig.Builder parseLenientlyFromSource(BytesReference source) throws IOException { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) { + return DatafeedConfig.LENIENT_PARSER.apply(parser, null); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java new file mode 100644 index 0000000000000..4f4968a9d5629 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.regex.Regex; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Class for tracking the set of Ids returned from some + * function a satisfy the required Ids as defined by an + * expression that may contain wildcards. + * + * For example, given a set of Ids ["foo-1", "foo-2", "bar-1", bar-2"]: + * <ul> + * <li>The expression foo* would be satisfied by foo-1 and foo-2</li> + * <li>The expression bar-1 would be satisfied by bar-1</li> + * <li>The expression bar-1,car-1 would leave car-1 unmatched</li> + * <li>The expression * would be satisfied by anything or nothing depending on the + * value of {@code allowNoMatchForWildcards}</li> + * </ul> + */ +public final class ExpandedIdsMatcher { + + public static String ALL = "_all"; + + /** + * Split {@code expression} into tokens separated by a ',' + * + * @param expression Expression containing zero or more ','s + * @return Array of tokens + */ + public static String [] tokenizeExpression(String expression) { + return Strings.tokenizeToStringArray(expression, ","); + } + + private final LinkedList<IdMatcher> requiredMatches; + + /** + * Generate the list of required matches from the expressions in {@code tokens} + * and initialize. + * + * @param tokens List of expressions that may be wildcards or full Ids + * @param allowNoMatchForWildcards If true then it is not required for wildcard + * expressions to match an Id meaning they are + * not returned in the list of required matches + */ + public ExpandedIdsMatcher(String [] tokens, boolean allowNoMatchForWildcards) { + requiredMatches = new LinkedList<>(); + + if (Strings.isAllOrWildcard(tokens)) { + // if allowNoJobForWildcards == true then any number + // of jobs with any id is ok. Therefore no matches + // are required + + if (allowNoMatchForWildcards == false) { + // require something, anything to match + requiredMatches.add(new WildcardMatcher("*")); + } + return; + } + + if (allowNoMatchForWildcards) { + // matches are not required for wildcards but + // specific job Ids are + for (String token : tokens) { + if (Regex.isSimpleMatchPattern(token) == false) { + requiredMatches.add(new EqualsIdMatcher(token)); + } + } + } else { + // Matches are required for wildcards + for (String token : tokens) { + if (Regex.isSimpleMatchPattern(token)) { + requiredMatches.add(new WildcardMatcher(token)); + } else { + requiredMatches.add(new EqualsIdMatcher(token)); + } + } + } + } + + /** + * For each {@code requiredMatchers} check there is an element + * present in {@code ids} that matches. Once a match is made the + * matcher is removed from {@code requiredMatchers}. + */ + public void filterMatchedIds(Collection<String> ids) { + for (String id: ids) { + Iterator<IdMatcher> itr = requiredMatches.iterator(); + if (itr.hasNext() == false) { + break; + } + while (itr.hasNext()) { + if (itr.next().matches(id)) { + itr.remove(); + } + } + } + } + + public boolean hasUnmatchedIds() { + return requiredMatches.isEmpty() == false; + } + + public List<String> unmatchedIds() { + return requiredMatches.stream().map(IdMatcher::getId).collect(Collectors.toList()); + } + + public String unmatchedIdsString() { + return requiredMatches.stream().map(IdMatcher::getId).collect(Collectors.joining(",")); + } + + + private abstract static class IdMatcher { + protected final String id; + + IdMatcher(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public abstract boolean matches(String jobId); + } + + private static class EqualsIdMatcher extends IdMatcher { + EqualsIdMatcher(String id) { + super(id); + } + + @Override + public boolean matches(String id) { + return this.id.equals(id); + } + } + + private static class WildcardMatcher extends IdMatcher { + WildcardMatcher(String id) { + super(id); + } + + @Override + public boolean matches(String id) { + return Regex.simpleMatch(this.id, id); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 3166ca33c5bb9..1b89ecb1250ce 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -51,13 +51,9 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Collection; import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -68,8 +64,6 @@ */ public class JobConfigProvider extends AbstractComponent { - public static String ALL = "_all"; - private final Client client; public JobConfigProvider(Client client, Settings settings) { @@ -189,7 +183,8 @@ public void onResponse(GetResponse getResponse) { try { jobBuilder = parseJobLenientlyFromSource(source); } catch (IOException e) { - updatedJobListener.onFailure(new ElasticsearchParseException("failed to parse " + getResponse.getType(), e)); + updatedJobListener.onFailure( + new ElasticsearchParseException("Failed to parse job configuration [" + jobId + "]", e)); return; } @@ -222,8 +217,6 @@ public void onResponse(GetResponse getResponse) { updatedJobListener.onFailure( new ElasticsearchParseException("Failed to serialise job with id [" + jobId + "]", e)); } - - } @Override @@ -259,7 +252,7 @@ public void onFailure(Exception e) { * @param listener The expanded job IDs listener */ public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener<Set<String>> listener) { - String [] tokens = tokenizeExpression(expression); + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens)); sourceBuilder.sort(Job.ID.getPreferredName()); String [] includes = new String[] {Job.ID.getPreferredName(), Job.GROUPS.getPreferredName()}; @@ -269,7 +262,7 @@ public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder).request(); - LinkedList<IdMatcher> requiredMatches = requiredMatches(tokens, allowNoJobs); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, ActionListener.<SearchResponse>wrap( @@ -286,11 +279,10 @@ public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener } groupsIds.addAll(jobIds); - filterMatchedIds(requiredMatches, groupsIds); - if (requiredMatches.isEmpty() == false) { + requiredMatches.filterMatchedIds(groupsIds); + if (requiredMatches.hasUnmatchedIds()) { // some required jobs were not found - String missing = requiredMatches.stream().map(IdMatcher::getId).collect(Collectors.joining(",")); - listener.onFailure(ExceptionsHelper.missingJobException(missing)); + listener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); return; } @@ -315,7 +307,7 @@ public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener */ // NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them public void expandJobs(String expression, boolean allowNoJobs, ActionListener<List<Job.Builder>> listener) { - String [] tokens = tokenizeExpression(expression); + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens)); sourceBuilder.sort(Job.ID.getPreferredName()); @@ -323,7 +315,7 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener<Li .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder).request(); - LinkedList<IdMatcher> requiredMatches = requiredMatches(tokens, allowNoJobs); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, ActionListener.<SearchResponse>wrap( @@ -345,11 +337,10 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener<Li } } - filterMatchedIds(requiredMatches, jobAndGroupIds); - if (requiredMatches.isEmpty() == false) { + requiredMatches.filterMatchedIds(jobAndGroupIds); + if (requiredMatches.hasUnmatchedIds()) { // some required jobs were not found - String missing = requiredMatches.stream().map(IdMatcher::getId).collect(Collectors.joining(",")); - listener.onFailure(ExceptionsHelper.missingJobException(missing)); + listener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); return; } @@ -380,7 +371,7 @@ private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IO private QueryBuilder buildQuery(String [] tokens) { QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE); - if (isWildcardAll(tokens)) { + if (Strings.isAllOrWildcard(tokens)) { // match all return jobQuery; } @@ -410,132 +401,4 @@ private QueryBuilder buildQuery(String [] tokens) { return boolQueryBuilder; } - - /** - * Does the {@code tokens} array resolves to a wildcard all expression. - * True if {@code tokens} is empty or if it contains a single element - * equal to {@link #ALL}, '*' or an empty string - * - * @param tokens Expression tokens - * @return True if tokens resolves to a wildcard all expression - */ - static boolean isWildcardAll(String [] tokens) { - if (tokens.length == 0) { - return true; - } - return tokens.length == 1 && (ALL.equals(tokens[0]) || Regex.isMatchAllPattern(tokens[0]) || tokens[0].isEmpty()); - } - - static String [] tokenizeExpression(String expression) { - return Strings.tokenizeToStringArray(expression, ","); - } - - /** - * Generate the list of required matches from the expressions in {@code tokens} - * - * @param tokens List of expressions that may be wildcards or full Ids - * @param allowNoJobForWildcards If true then it is not required for wildcard - * expressions to match an Id meaning they are - * not returned in the list of required matches - * @return A list of required Id matchers - */ - static LinkedList<IdMatcher> requiredMatches(String [] tokens, boolean allowNoJobForWildcards) { - LinkedList<IdMatcher> matchers = new LinkedList<>(); - - if (isWildcardAll(tokens)) { - // if allowNoJobForWildcards == true then any number - // of jobs with any id is ok. Therefore no matches - // are required - - if (allowNoJobForWildcards == false) { - // require something, anything to match - matchers.add(new WildcardMatcher("*")); - } - return matchers; - } - - if (allowNoJobForWildcards) { - // matches are not required for wildcards but - // specific job Ids are - for (String token : tokens) { - if (Regex.isSimpleMatchPattern(token) == false) { - matchers.add(new EqualsIdMatcher(token)); - } - } - } else { - // Matches are required for wildcards - for (String token : tokens) { - if (Regex.isSimpleMatchPattern(token)) { - matchers.add(new WildcardMatcher(token)); - } else { - matchers.add(new EqualsIdMatcher(token)); - } - } - } - - return matchers; - } - - /** - * For each given {@code requiredMatchers} check there is an element - * present in {@code ids} that matches. Once a match is made the - * matcher is popped from {@code requiredMatchers}. - * - * If all matchers are satisfied the list {@code requiredMatchers} will - * be empty after the call otherwise only the unmatched remain. - * - * @param requiredMatchers This is modified by the function: all matched matchers - * are removed from the list. At the end of the call only - * the unmatched ones are in this list - * @param ids Ids required to be matched - */ - static void filterMatchedIds(LinkedList<IdMatcher> requiredMatchers, Collection<String> ids) { - for (String id: ids) { - Iterator<IdMatcher> itr = requiredMatchers.iterator(); - if (itr.hasNext() == false) { - break; - } - while (itr.hasNext()) { - if (itr.next().matches(id)) { - itr.remove(); - } - } - } - } - - abstract static class IdMatcher { - protected final String id; - - IdMatcher(String id) { - this.id = id; - } - - public String getId() { - return id; - } - - public abstract boolean matches(String jobId); - } - - static class EqualsIdMatcher extends IdMatcher { - EqualsIdMatcher(String id) { - super(id); - } - - @Override - public boolean matches(String id) { - return this.id.equals(id); - } - } - - static class WildcardMatcher extends IdMatcher { - WildcardMatcher(String id) { - super(id); - } - - @Override - public boolean matches(String id) { - return Regex.simpleMatch(this.id, id); - } - } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java index 0668b29c626ed..181636de13663 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlSingleNodeTestCase.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -15,6 +16,9 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField; import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; /** * An extension to {@link ESSingleNodeTestCase} that adds node settings specifically needed for ML test cases. @@ -51,4 +55,32 @@ protected void waitForMlTemplates() throws Exception { }); } + protected <T> void blockingCall(Consumer<ActionListener<T>> function, AtomicReference<T> response, + AtomicReference<Exception> error) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + ActionListener<T> listener = ActionListener.wrap( + r -> { + response.set(r); + latch.countDown(); + }, + e -> { + error.set(e); + latch.countDown(); + } + ); + + function.accept(listener); + latch.await(); + } + + protected <T> T blockingCall(Consumer<ActionListener<T>> function) throws Exception { + AtomicReference<Exception> exceptionHolder = new AtomicReference<>(); + AtomicReference<T> responseHolder = new AtomicReference<>(); + blockingCall(function, responseHolder, exceptionHolder); + if (exceptionHolder.get() != null) { + assertNotNull(exceptionHolder.get().getMessage(), exceptionHolder.get()); + } + return responseHolder.get(); + } + } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java new file mode 100644 index 0000000000000..8eeeb2908cf88 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java @@ -0,0 +1,253 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.hamcrest.core.IsInstanceOf; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class DatafeedConfigProviderIT extends MlSingleNodeTestCase { + + private DatafeedConfigProvider datafeedConfigProvider; + + @Before + public void createComponents() throws Exception { + datafeedConfigProvider = new DatafeedConfigProvider(client(), Settings.EMPTY, xContentRegistry()); + waitForMlTemplates(); + } + + public void testCrud() throws InterruptedException { + String datafeedId = "df1"; + + AtomicReference<IndexResponse> indexResponseHolder = new AtomicReference<>(); + AtomicReference<Exception> exceptionHolder = new AtomicReference<>(); + + // Create datafeed config + DatafeedConfig config = createDatafeedConfig(datafeedId, "j1"); + blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config, actionListener), + indexResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertEquals(RestStatus.CREATED, indexResponseHolder.get().status()); + + // Read datafeed config + AtomicReference<DatafeedConfig.Builder> configBuilderHolder = new AtomicReference<>(); + blockingCall(actionListener -> datafeedConfigProvider.getDatafeedConfig(datafeedId, actionListener), + configBuilderHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertEquals(config, configBuilderHolder.get().build()); + + // Update + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedId); + List<String> updateIndices = Collections.singletonList("a-different-index"); + update.setIndices(updateIndices); + Map<String, String> updateHeaders = new HashMap<>(); + // Only security headers are updated, grab the first one + String securityHeader = ClientHelper.SECURITY_HEADER_FILTERS.iterator().next(); + updateHeaders.put(securityHeader, "CHANGED"); + + AtomicReference<DatafeedConfig> configHolder = new AtomicReference<>(); + blockingCall(actionListener -> + datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), updateHeaders, actionListener), + configHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertThat(configHolder.get().getIndices(), equalTo(updateIndices)); + assertThat(configHolder.get().getHeaders().get(securityHeader), equalTo("CHANGED")); + + // Delete + AtomicReference<DeleteResponse> deleteResponseHolder = new AtomicReference<>(); + blockingCall(actionListener -> datafeedConfigProvider.deleteDatafeedConfig(datafeedId, actionListener), + deleteResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponseHolder.get().getResult()); + } + + public void testMultipleCreateAndDeletes() throws InterruptedException { + String datafeedId = "df2"; + + AtomicReference<IndexResponse> indexResponseHolder = new AtomicReference<>(); + AtomicReference<Exception> exceptionHolder = new AtomicReference<>(); + + // Create datafeed config + DatafeedConfig config = createDatafeedConfig(datafeedId, "j1"); + blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config, actionListener), + indexResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertEquals(RestStatus.CREATED, indexResponseHolder.get().status()); + + // cannot create another with the same id + indexResponseHolder.set(null); + blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config, actionListener), + indexResponseHolder, exceptionHolder); + assertNull(indexResponseHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(VersionConflictEngineException.class)); + + // delete + exceptionHolder.set(null); + AtomicReference<DeleteResponse> deleteResponseHolder = new AtomicReference<>(); + blockingCall(actionListener -> datafeedConfigProvider.deleteDatafeedConfig(datafeedId, actionListener), + deleteResponseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponseHolder.get().getResult()); + + // error deleting twice + deleteResponseHolder.set(null); + blockingCall(actionListener -> datafeedConfigProvider.deleteDatafeedConfig(datafeedId, actionListener), + deleteResponseHolder, exceptionHolder); + assertNull(deleteResponseHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); + } + + public void testUpdateWithAValidationError() throws Exception { + final String datafeedId = "df-bad-update"; + + DatafeedConfig config = createDatafeedConfig(datafeedId, "j2"); + putDatafeedConfig(config); + + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedId); + update.setId("wrong-datafeed-id"); + + AtomicReference<Exception> exceptionHolder = new AtomicReference<>(); + AtomicReference<DatafeedConfig> configHolder = new AtomicReference<>(); + blockingCall(actionListener -> + datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(), actionListener), + configHolder, exceptionHolder); + assertNull(configHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), IsInstanceOf.instanceOf(IllegalArgumentException.class)); + assertThat(exceptionHolder.get().getMessage(), containsString("Cannot apply update to datafeedConfig with different id")); + } + + public void testAllowNoDatafeeds() throws InterruptedException { + AtomicReference<Set<String>> datafeedIdsHolder = new AtomicReference<>(); + AtomicReference<Exception> exceptionHolder = new AtomicReference<>(); + + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", false, actionListener), + datafeedIdsHolder, exceptionHolder); + + assertNull(datafeedIdsHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), IsInstanceOf.instanceOf(ResourceNotFoundException.class)); + assertThat(exceptionHolder.get().getMessage(), containsString("No datafeed with id [*] exists")); + + exceptionHolder.set(null); + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", true, actionListener), + datafeedIdsHolder, exceptionHolder); + assertNotNull(datafeedIdsHolder.get()); + assertNull(exceptionHolder.get()); + + AtomicReference<List<DatafeedConfig.Builder>> datafeedsHolder = new AtomicReference<>(); + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", false, actionListener), + datafeedsHolder, exceptionHolder); + + assertNull(datafeedsHolder.get()); + assertNotNull(exceptionHolder.get()); + assertThat(exceptionHolder.get(), IsInstanceOf.instanceOf(ResourceNotFoundException.class)); + assertThat(exceptionHolder.get().getMessage(), containsString("No datafeed with id [*] exists")); + + exceptionHolder.set(null); + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener), + datafeedsHolder, exceptionHolder); + assertNotNull(datafeedsHolder.get()); + assertNull(exceptionHolder.get()); + } + + public void testExpandDatafeeds() throws Exception { + DatafeedConfig foo1 = putDatafeedConfig(createDatafeedConfig("foo-1", "j1")); + DatafeedConfig foo2 = putDatafeedConfig(createDatafeedConfig("foo-2", "j2")); + DatafeedConfig bar1 = putDatafeedConfig(createDatafeedConfig("bar-1", "j3")); + DatafeedConfig bar2 = putDatafeedConfig(createDatafeedConfig("bar-2", "j4")); + putDatafeedConfig(createDatafeedConfig("not-used", "j5")); + + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + // Test job IDs only + Set<String> expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds); + + expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("*-1", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1")), expandedIds); + + expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar*", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("bar-1", "bar-2")), expandedIds); + + expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("b*r-1", true, actionListener)); + assertEquals(new TreeSet<>(Collections.singletonList("bar-1")), expandedIds); + + expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar-1,foo*", true, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1", "foo-2")), expandedIds); + + // Test full job config + List<DatafeedConfig.Builder> expandedDatafeedBuilders = + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("foo*", true, actionListener)); + List<DatafeedConfig> expandedDatafeeds = + expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList()); + assertThat(expandedDatafeeds, containsInAnyOrder(foo1, foo2)); + + expandedDatafeedBuilders = + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*-1", true, actionListener)); + expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList()); + assertThat(expandedDatafeeds, containsInAnyOrder(foo1, bar1)); + + expandedDatafeedBuilders = + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("bar*", true, actionListener)); + expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList()); + assertThat(expandedDatafeeds, containsInAnyOrder(bar1, bar2)); + + expandedDatafeedBuilders = + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("b*r-1", true, actionListener)); + expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList()); + assertThat(expandedDatafeeds, containsInAnyOrder(bar1)); + + expandedDatafeedBuilders = + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("bar-1,foo*", true, actionListener)); + expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList()); + assertThat(expandedDatafeeds, containsInAnyOrder(bar1, foo1, foo2)); + } + + private DatafeedConfig createDatafeedConfig(String id, String jobId) { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder(id, jobId); + builder.setIndices(Collections.singletonList("beats*")); + + Map<String, String> headers = new HashMap<>(); + // Only security headers are updated, grab the first one + String securityHeader = ClientHelper.SECURITY_HEADER_FILTERS.iterator().next(); + headers.put(securityHeader, "SECURITY_"); + builder.setHeaders(headers); + return builder.build(); + } + + private DatafeedConfig putDatafeedConfig(DatafeedConfig config) throws Exception { + this.<IndexResponse>blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config, actionListener)); + return config; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index fb82b1c74d0eb..d85d8e1d8cbcd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -7,7 +7,6 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; @@ -32,9 +31,7 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -315,34 +312,6 @@ private Job.Builder createJob(String jobId, List<String> groups) { return builder; } - private <T> void blockingCall(Consumer<ActionListener<T>> function, AtomicReference<T> response, - AtomicReference<Exception> error) throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - ActionListener<T> listener = ActionListener.wrap( - r -> { - response.set(r); - latch.countDown(); - }, - e -> { - error.set(e); - latch.countDown(); - } - ); - - function.accept(listener); - latch.await(); - } - - private <T> T blockingCall(Consumer<ActionListener<T>> function) throws Exception { - AtomicReference<Exception> exceptionHolder = new AtomicReference<>(); - AtomicReference<T> responseHolder = new AtomicReference<>(); - blockingCall(function, responseHolder, exceptionHolder); - if (exceptionHolder.get() != null) { - assertNotNull(exceptionHolder.get().getMessage(), exceptionHolder.get()); - } - return responseHolder.get(); - } - private Job putJob(Job.Builder job) throws Exception { Job builtJob = job.build(new Date()); this.<IndexResponse>blockingCall(actionListener -> jobConfigProvider.putJob(builtJob, actionListener)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index 09651f554d848..1bf43a0dc72c0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; @@ -500,7 +501,7 @@ private void indexScheduledEvents(List<ScheduledEvent> events) throws IOExceptio for (ScheduledEvent event : events) { IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true")); + ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true")); indexRequest.source(event.toXContent(builder, params)); bulkRequest.add(indexRequest); } @@ -543,7 +544,7 @@ private void indexFilters(List<MlFilter> filters) throws IOException { for (MlFilter filter : filters) { IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true")); + ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true")); indexRequest.source(filter.toXContent(builder, params)); bulkRequest.add(indexRequest); } @@ -573,7 +574,7 @@ private void indexCalendars(List<Calendar> calendars) throws IOException { for (Calendar calendar: calendars) { IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, calendar.documentId()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true")); + ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true")); indexRequest.source(calendar.toXContent(builder, params)); bulkRequest.add(indexRequest); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcherTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcherTests.java new file mode 100644 index 0000000000000..4a9a696866e43 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcherTests.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence; + +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.Collections; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.isOneOf; + +public class ExpandedIdsMatcherTests extends ESTestCase { + + public void testMatchingJobIds() { + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(new String[] {"*"}, false); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + assertTrue(requiredMatches.hasUnmatchedIds()); + requiredMatches.filterMatchedIds(Collections.singletonList("foo")); + assertFalse(requiredMatches.hasUnmatchedIds()); + assertThat(requiredMatches.unmatchedIds(), empty()); + + requiredMatches = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression(""), false); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + requiredMatches.filterMatchedIds(Collections.singletonList("foo")); + assertThat(requiredMatches.unmatchedIds(), empty()); + + requiredMatches = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression(null), false); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + requiredMatches.filterMatchedIds(Collections.singletonList("foo")); + assertThat(requiredMatches.unmatchedIds(), empty()); + + requiredMatches = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression(null), false); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + requiredMatches.filterMatchedIds(Collections.emptyList()); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + assertThat(requiredMatches.unmatchedIds().get(0), equalTo("*")); + + requiredMatches = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression("_all"), false); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + requiredMatches.filterMatchedIds(Collections.singletonList("foo")); + assertThat(requiredMatches.unmatchedIds(), empty()); + + requiredMatches = new ExpandedIdsMatcher(new String[] {"foo*"}, false); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + requiredMatches.filterMatchedIds(Arrays.asList("foo1","foo2")); + assertThat(requiredMatches.unmatchedIds(), empty()); + + requiredMatches = new ExpandedIdsMatcher(new String[] {"foo*","bar"}, false); + assertThat(requiredMatches.unmatchedIds(), hasSize(2)); + requiredMatches.filterMatchedIds(Arrays.asList("foo1","foo2")); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + assertEquals("bar", requiredMatches.unmatchedIds().get(0)); + + requiredMatches = new ExpandedIdsMatcher(new String[] {"foo*","bar"}, false); + assertThat(requiredMatches.unmatchedIds(), hasSize(2)); + requiredMatches.filterMatchedIds(Arrays.asList("foo1","bar")); + assertFalse(requiredMatches.hasUnmatchedIds()); + + requiredMatches = new ExpandedIdsMatcher(new String[] {"foo*","bar"}, false); + assertThat(requiredMatches.unmatchedIds(), hasSize(2)); + requiredMatches.filterMatchedIds(Collections.singletonList("bar")); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + assertEquals("foo*", requiredMatches.unmatchedIds().get(0)); + + requiredMatches = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression("foo,bar,baz,wild*"), false); + assertThat(requiredMatches.unmatchedIds(), hasSize(4)); + requiredMatches.filterMatchedIds(Arrays.asList("foo","baz")); + assertThat(requiredMatches.unmatchedIds(), hasSize(2)); + assertThat(requiredMatches.unmatchedIds().get(0), isOneOf("bar", "wild*")); + assertThat(requiredMatches.unmatchedIds().get(1), isOneOf("bar", "wild*")); + } + + public void testMatchingJobIds_allowNoJobs() { + // wildcard all with allow no jobs + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(new String[] {"*"}, true); + assertThat(requiredMatches.unmatchedIds(), empty()); + assertFalse(requiredMatches.hasUnmatchedIds()); + requiredMatches.filterMatchedIds(Collections.emptyList()); + assertThat(requiredMatches.unmatchedIds(), empty()); + assertFalse(requiredMatches.hasUnmatchedIds()); + + requiredMatches = new ExpandedIdsMatcher(new String[] {"foo*","bar"}, true); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + assertTrue(requiredMatches.hasUnmatchedIds()); + requiredMatches.filterMatchedIds(Collections.singletonList("bar")); + assertThat(requiredMatches.unmatchedIds(), empty()); + assertFalse(requiredMatches.hasUnmatchedIds()); + + requiredMatches = new ExpandedIdsMatcher(new String[] {"foo*","bar"}, true); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + requiredMatches.filterMatchedIds(Collections.emptyList()); + assertThat(requiredMatches.unmatchedIds(), hasSize(1)); + assertEquals("bar", requiredMatches.unmatchedIds().get(0)); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProviderTests.java deleted file mode 100644 index 04bcd57e64fc0..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProviderTests.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.job.persistence; - -import org.elasticsearch.test.ESTestCase; - -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedList; - -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.isOneOf; - -public class JobConfigProviderTests extends ESTestCase { - - public void testMatchingJobIds() { - LinkedList<JobConfigProvider.IdMatcher> requiredMatches = JobConfigProvider.requiredMatches(new String[] {"*"}, false); - assertThat(requiredMatches, hasSize(1)); - JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("foo")); - assertThat(requiredMatches, empty()); - - requiredMatches = JobConfigProvider.requiredMatches(JobConfigProvider.tokenizeExpression(""), false); - assertThat(requiredMatches, hasSize(1)); - JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("foo")); - assertThat(requiredMatches, empty()); - - requiredMatches = JobConfigProvider.requiredMatches(JobConfigProvider.tokenizeExpression(null), false); - assertThat(requiredMatches, hasSize(1)); - JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("foo")); - assertThat(requiredMatches, empty()); - - requiredMatches = JobConfigProvider.requiredMatches(JobConfigProvider.tokenizeExpression(null), false); - assertThat(requiredMatches, hasSize(1)); - JobConfigProvider.filterMatchedIds(requiredMatches, Collections.emptyList()); - assertThat(requiredMatches, hasSize(1)); - assertThat(requiredMatches.get(0).getId(), equalTo("*")); - - requiredMatches = JobConfigProvider.requiredMatches(JobConfigProvider.tokenizeExpression("_all"), false); - assertThat(requiredMatches, hasSize(1)); - JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("foo")); - assertThat(requiredMatches, empty()); - - requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*"}, false); - assertThat(requiredMatches, hasSize(1)); - JobConfigProvider.filterMatchedIds(requiredMatches, Arrays.asList("foo1","foo2")); - assertThat(requiredMatches, empty()); - - requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*","bar"}, false); - assertThat(requiredMatches, hasSize(2)); - JobConfigProvider.filterMatchedIds(requiredMatches, Arrays.asList("foo1","foo2")); - assertThat(requiredMatches, hasSize(1)); - assertEquals("bar", requiredMatches.get(0).getId()); - - requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*","bar"}, false); - assertThat(requiredMatches, hasSize(2)); - JobConfigProvider.filterMatchedIds(requiredMatches, Arrays.asList("foo1","bar")); - assertThat(requiredMatches, empty()); - - requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*","bar"}, false); - assertThat(requiredMatches, hasSize(2)); - JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("bar")); - assertThat(requiredMatches, hasSize(1)); - assertEquals("foo*", requiredMatches.get(0).getId()); - - requiredMatches = JobConfigProvider.requiredMatches(JobConfigProvider.tokenizeExpression("foo,bar,baz,wild*"), false); - assertThat(requiredMatches, hasSize(4)); - JobConfigProvider.filterMatchedIds(requiredMatches, Arrays.asList("foo","baz")); - assertThat(requiredMatches, hasSize(2)); - assertThat(requiredMatches.get(0).getId(), isOneOf("bar", "wild*")); - assertThat(requiredMatches.get(1).getId(), isOneOf("bar", "wild*")); - } - - public void testMatchingJobIds_allowNoJobs() { - // wildcard all with allow no jobs - LinkedList<JobConfigProvider.IdMatcher> requiredMatches = JobConfigProvider.requiredMatches(new String[] {"*"}, true); - assertThat(requiredMatches, empty()); - JobConfigProvider.filterMatchedIds(requiredMatches, Collections.emptyList()); - assertThat(requiredMatches, empty()); - - requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*","bar"}, true); - assertThat(requiredMatches, hasSize(1)); - JobConfigProvider.filterMatchedIds(requiredMatches, Collections.singletonList("bar")); - assertThat(requiredMatches, empty()); - - requiredMatches = JobConfigProvider.requiredMatches(new String[] {"foo*","bar"}, true); - assertThat(requiredMatches, hasSize(1)); - JobConfigProvider.filterMatchedIds(requiredMatches, Collections.emptyList()); - assertThat(requiredMatches, hasSize(1)); - assertEquals("bar", requiredMatches.get(0).getId()); - } -}