From eac38e9847ae3cc89b89acdffade306fe78dda71 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 27 Feb 2020 13:43:25 -0500 Subject: [PATCH] [ML] Add indices_options to datafeed config and update (#52793) (#52905) This adds a new configurable field called `indices_options`. This allows users to create or update the indices_options used when a datafeed reads from an index. This is necessary for the following use cases: - Reading from frozen indices - Allowing certain indices in multiple index patterns to not exist yet These index options are available on datafeed creation and update. Users may specify them as URL parameters or within the configuration object. closes https://github.com/elastic/elasticsearch/issues/48056 --- .../client/ml/datafeed/DatafeedConfig.java | 32 ++- .../client/ml/datafeed/DatafeedUpdate.java | 31 ++- .../ml/datafeed/DatafeedConfigTests.java | 8 + .../ml/datafeed/DatafeedUpdateTests.java | 8 + .../apis/put-datafeed.asciidoc | 5 + .../apis/update-datafeed.asciidoc | 3 + docs/reference/ml/ml-shared.asciidoc | 13 ++ .../core/ml/action/PutDatafeedAction.java | 6 +- .../core/ml/action/StartDatafeedAction.java | 31 ++- .../core/ml/action/UpdateDatafeedAction.java | 7 +- .../core/ml/datafeed/DatafeedConfig.java | 47 ++++- .../core/ml/datafeed/DatafeedUpdate.java | 52 ++++- .../ml/job/results/ReservedFieldNames.java | 1 + .../xpack/core/ml/config_index_mappings.json | 4 + .../action/PutDatafeedActionRequestTests.java | 8 +- .../UpdateDatafeedActionRequestTests.java | 2 +- .../core/ml/datafeed/DatafeedConfigTests.java | 31 ++- .../core/ml/datafeed/DatafeedUpdateTests.java | 40 +++- .../ml/integration/DatafeedJobsRestIT.java | 76 +++++++ .../action/TransportStartDatafeedAction.java | 10 +- .../ml/datafeed/DatafeedNodeSelector.java | 26 ++- .../DatafeedDelayedDataDetector.java | 8 +- .../DelayedDataDetectorFactory.java | 1 + .../extractor/DataExtractorFactory.java | 2 +- .../aggregation/AggregationDataExtractor.java | 1 + .../AggregationDataExtractorContext.java | 5 +- .../AggregationDataExtractorFactory.java | 3 +- .../aggregation/RollupDataExtractor.java | 4 +- .../RollupDataExtractorFactory.java | 3 +- .../chunked/ChunkedDataExtractor.java | 5 +- .../chunked/ChunkedDataExtractorContext.java | 5 +- .../chunked/ChunkedDataExtractorFactory.java | 3 +- .../extractor/scroll/ScrollDataExtractor.java | 1 + .../scroll/ScrollDataExtractorContext.java | 5 +- .../scroll/ScrollDataExtractorFactory.java | 13 +- .../rest/datafeeds/RestPutDatafeedAction.java | 5 +- .../datafeeds/RestUpdateDatafeedAction.java | 11 +- .../datafeed/DatafeedNodeSelectorTests.java | 189 ++++++++++++++---- .../AggregationDataExtractorTests.java | 3 +- .../chunked/ChunkedDataExtractorTests.java | 3 +- .../scroll/ScrollDataExtractorTests.java | 5 +- .../rest-api-spec/api/ml.put_datafeed.json | 25 +++ .../rest-api-spec/api/ml.update_datafeed.json | 25 +++ .../rest-api-spec/test/ml/datafeeds_crud.yml | 75 +++++++ .../test/old_cluster/40_ml_datafeed_crud.yml | 1 - .../upgraded_cluster/40_ml_datafeed_crud.yml | 1 - 46 files changed, 732 insertions(+), 111 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java index f192b420eba0e..0b53c2e185268 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.client.ml.datafeed; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.ml.job.config.Job; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesArray; @@ -63,6 +64,7 @@ public class DatafeedConfig implements ToXContentObject { public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config"); public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches"); + public static final ParseField INDICES_OPTIONS = new ParseField("indices_options"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "datafeed_config", true, a -> new Builder((String)a[0], (String)a[1])); @@ -90,6 +92,9 @@ public class DatafeedConfig implements ToXContentObject { PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG); PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG); PARSER.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES); + PARSER.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)), + INDICES_OPTIONS); } private static BytesReference parseBytes(XContentParser parser) throws IOException { @@ -110,11 +115,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, BytesReference query, BytesReference aggregations, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, - Integer maxEmptySearches) { + Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -127,6 +133,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = indicesOptions; } public String getId() { @@ -177,6 +184,10 @@ public Integer getMaxEmptySearches() { return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -216,6 +227,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (maxEmptySearches != null) { builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); } + if (indicesOptions != null) { + builder.startObject(INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + } builder.endObject(); return builder; @@ -257,7 +273,8 @@ public boolean equals(Object other) { && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } /** @@ -268,7 +285,7 @@ public boolean equals(Object other) { @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } public static Builder builder(String id, String jobId) { @@ -289,6 +306,7 @@ public static class Builder { private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder(String id, String jobId) { this.id = Objects.requireNonNull(id, ID.getPreferredName()); @@ -308,6 +326,7 @@ public Builder(DatafeedConfig config) { this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.getDelayedDataCheckConfig(); this.maxEmptySearches = config.getMaxEmptySearches(); + this.indicesOptions = config.indicesOptions; } public Builder setIndices(List indices) { @@ -395,9 +414,14 @@ public Builder setMaxEmptySearches(int maxEmptySearches) { return this; } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + public DatafeedConfig build() { return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java index e11e2e1d9b375..3acdda3761588 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.ml.datafeed; import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -80,6 +81,9 @@ public class DatafeedUpdate implements ToXContentObject { DelayedDataCheckConfig.PARSER, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG); PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES); + PARSER.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)), + DatafeedConfig.INDICES_OPTIONS); } private static BytesReference parseBytes(XContentParser parser) throws IOException { @@ -100,11 +104,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, BytesReference query, BytesReference aggregations, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, - Integer maxEmptySearches) { + Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -117,6 +122,7 @@ private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = indicesOptions; } /** @@ -157,6 +163,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize); addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches); + if (indicesOptions != null) { + builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + } builder.endObject(); return builder; } @@ -211,6 +222,10 @@ public Integer getMaxEmptySearches() { return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + private static Map asMap(BytesReference bytesReference) { return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2(); } @@ -247,7 +262,8 @@ public boolean equals(Object other) { && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } /** @@ -258,7 +274,7 @@ public boolean equals(Object other) { @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } public static Builder builder(String id) { @@ -279,6 +295,7 @@ public static class Builder { private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder(String id) { this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName()); @@ -297,6 +314,7 @@ public Builder(DatafeedUpdate config) { this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.delayedDataCheckConfig; this.maxEmptySearches = config.maxEmptySearches; + this.indicesOptions = config.indicesOptions; } @Deprecated @@ -381,9 +399,14 @@ public Builder setMaxEmptySearches(int maxEmptySearches) { return this; } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + public DatafeedUpdate build() { return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java index 7f7a03ab2e182..16bd25bb49298 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.ml.datafeed; import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -109,6 +110,13 @@ public static DatafeedConfig.Builder createRandomBuilder() { if (randomBoolean()) { builder.setMaxEmptySearches(randomIntBetween(10, 100)); } + if (randomBoolean()) { + builder.setIndicesOptions(IndicesOptions.fromOptions(randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean())); + } return builder; } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java index 59b1f4cc50ee3..68d4ef14a2092 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.client.ml.datafeed; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilders; @@ -86,6 +87,13 @@ public static DatafeedUpdate createRandom() { if (randomBoolean()) { builder.setMaxEmptySearches(randomIntBetween(10, 100)); } + if (randomBoolean()) { + builder.setIndicesOptions(IndicesOptions.fromOptions(randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean())); + } return builder.build(); } diff --git a/docs/reference/ml/anomaly-detection/apis/put-datafeed.asciidoc b/docs/reference/ml/anomaly-detection/apis/put-datafeed.asciidoc index c811b8f867334..82f8d82bbfac5 100644 --- a/docs/reference/ml/anomaly-detection/apis/put-datafeed.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/put-datafeed.asciidoc @@ -98,6 +98,11 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=script-fields] (Optional, unsigned integer) include::{docdir}/ml/ml-shared.asciidoc[tag=scroll-size] +`indices_options`:: +(Optional, object) +include::{docdir}/ml/ml-shared.asciidoc[tag=indices-options] + + [[ml-put-datafeed-example]] ==== {api-examples-title} diff --git a/docs/reference/ml/anomaly-detection/apis/update-datafeed.asciidoc b/docs/reference/ml/anomaly-detection/apis/update-datafeed.asciidoc index 80023f5c2256b..33c315890606e 100644 --- a/docs/reference/ml/anomaly-detection/apis/update-datafeed.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/update-datafeed.asciidoc @@ -101,6 +101,9 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=script-fields] (Optional, unsigned integer) include::{docdir}/ml/ml-shared.asciidoc[tag=scroll-size] +`indices_options`:: +(Optional, object) +include::{docdir}/ml/ml-shared.asciidoc[tag=indices-options] [[ml-update-datafeed-example]] ==== {api-examples-title} diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index 964e5c2f12b9e..c44c688f4c3f9 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -665,6 +665,19 @@ not be set to `false` on any {ml} nodes. -- end::indices[] +tag::indices-options[] +Object specifying index expansion options used during search. +For example: +``` +{ + "expand_wildcards": ["all"], + "ignore_unavailable": true, + "allow_no_indices": "false", + "ignore_throttled": true +} +``` +end::indices-options[] + tag::influencers[] A comma separated list of influencer field names. Typically these can be the by, over, or partition fields that are used in the detector configuration. You might diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java index 7680044f6603a..88f56941a6e87 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; @@ -33,8 +34,11 @@ private PutDatafeedAction() { public static class Request extends AcknowledgedRequest implements ToXContentObject { - public static Request parseRequest(String datafeedId, XContentParser parser) { + public static Request parseRequest(String datafeedId, IndicesOptions indicesOptions, XContentParser parser) { DatafeedConfig.Builder datafeed = DatafeedConfig.STRICT_PARSER.apply(parser, null); + if (datafeed.getIndicesOptions() == null) { + datafeed.setIndicesOptions(indicesOptions); + } datafeed.setId(datafeedId); return new Request(datafeed.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java index 33102cf93a9e9..d9676a526749b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.ElasticsearchClient; @@ -150,6 +152,9 @@ public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskPar params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); PARSER.declareString(DatafeedParams::setJobId, Job.ID); PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES); + PARSER.declareObject(DatafeedParams::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), + DatafeedConfig.INDICES_OPTIONS); } static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) { @@ -193,6 +198,11 @@ public DatafeedParams(StreamInput in) throws IOException { jobId = in.readOptionalString(); datafeedIndices = in.readStringList(); } + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions = IndicesOptions.readIndicesOptions(in); + } else { + indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + } } DatafeedParams() { @@ -204,6 +214,7 @@ public DatafeedParams(StreamInput in) throws IOException { private TimeValue timeout = TimeValue.timeValueSeconds(20); private List datafeedIndices = Collections.emptyList(); private String jobId; + private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; public String getDatafeedId() { @@ -250,6 +261,15 @@ public void setDatafeedIndices(List datafeedIndices) { this.datafeedIndices = datafeedIndices; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + + public DatafeedParams setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, DatafeedConfig.INDICES_OPTIONS); + return this; + } + @Override public String getWriteableName() { return MlTasks.DATAFEED_TASK_NAME; @@ -270,6 +290,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(jobId); out.writeStringCollection(datafeedIndices); } + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions.writeIndicesOptions(out); + } } @Override @@ -287,13 +310,18 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par if (datafeedIndices.isEmpty() == false) { builder.field(INDICES.getPreferredName(), datafeedIndices); } + + builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + builder.endObject(); return builder; } @Override public int hashCode() { - return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices); + return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices, indicesOptions); } @Override @@ -310,6 +338,7 @@ public boolean equals(Object obj) { Objects.equals(endTime, other.endTime) && Objects.equals(timeout, other.timeout) && Objects.equals(jobId, other.jobId) && + Objects.equals(indicesOptions, other.indicesOptions) && Objects.equals(datafeedIndices, other.datafeedIndices); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java index 5d82a21662648..a5aebc61df227 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedAction.java @@ -7,9 +7,11 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -31,8 +33,11 @@ private UpdateDatafeedAction() { public static class Request extends AcknowledgedRequest implements ToXContentObject { - public static Request parseRequest(String datafeedId, XContentParser parser) { + public static Request parseRequest(String datafeedId, @Nullable IndicesOptions indicesOptions, XContentParser parser) { DatafeedUpdate.Builder update = DatafeedUpdate.PARSER.apply(parser, null); + if (indicesOptions != null) { + update.setIndicesOptions(indicesOptions); + } update.setId(datafeedId); return new Request(update.build()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 9c0f46e0feaaf..b7621ba75815c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -92,6 +94,7 @@ public class DatafeedConfig extends AbstractDiffable implements public static final ParseField HEADERS = new ParseField("headers"); public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches"); + public static final ParseField INDICES_OPTIONS = new ParseField("indices_options"); // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly public static final ObjectParser LENIENT_PARSER = createParser(true); @@ -154,6 +157,9 @@ private static ObjectParser createParser(boolean ignoreUnknownFie ignoreUnknownFields ? DelayedDataCheckConfig.LENIENT_PARSER : DelayedDataCheckConfig.STRICT_PARSER, DELAYED_DATA_CHECK_CONFIG); parser.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES); + parser.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), + INDICES_OPTIONS); return parser; } @@ -179,11 +185,12 @@ private static ObjectParser createParser(boolean ignoreUnknownFie private final Map headers; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, QueryProvider queryProvider, AggProvider aggProvider, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, Map headers, - DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches) { + DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -197,6 +204,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue this.headers = Collections.unmodifiableMap(headers); this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, INDICES_OPTIONS); } public DatafeedConfig(StreamInput in) throws IOException { @@ -242,6 +250,11 @@ public DatafeedConfig(StreamInput in) throws IOException { } else { maxEmptySearches = null; } + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions = IndicesOptions.readIndicesOptions(in); + } else { + indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + } } /** @@ -414,6 +427,10 @@ public Integer getMaxEmptySearches() { return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); @@ -455,6 +472,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeOptionalVInt(maxEmptySearches); } + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions.writeIndicesOptions(out); + } } @Override @@ -494,6 +514,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (maxEmptySearches != null) { builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); } + builder.startObject(INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + builder.endObject(); return builder; } @@ -527,13 +551,14 @@ public boolean equals(Object other) { && Objects.equals(this.chunkingConfig, that.chunkingConfig) && Objects.equals(this.headers, that.headers) && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig, - headers, delayedDataCheckConfig, maxEmptySearches); + headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } @Override @@ -607,6 +632,7 @@ public static class Builder { private Map headers = Collections.emptyMap(); private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder() { } @@ -630,6 +656,7 @@ public Builder(DatafeedConfig config) { this.headers = new HashMap<>(config.headers); this.delayedDataCheckConfig = config.getDelayedDataCheckConfig(); this.maxEmptySearches = config.getMaxEmptySearches(); + this.indicesOptions = config.indicesOptions; } public void setId(String datafeedId) { @@ -735,6 +762,15 @@ public void setMaxEmptySearches(int maxEmptySearches) { } } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + public IndicesOptions getIndicesOptions() { + return this.indicesOptions; + } + public DatafeedConfig build() { ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); @@ -749,8 +785,11 @@ public DatafeedConfig build() { setDefaultChunkingConfig(); setDefaultQueryDelay(); + if (indicesOptions == null) { + indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + } return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize, - chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } void validateScriptFields() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index f9b590730cdd3..2fedc1f127ce0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -7,6 +7,8 @@ import org.apache.logging.log4j.LogManager; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -82,6 +84,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { DelayedDataCheckConfig.STRICT_PARSER, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG); PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES); + PARSER.declareObject(Builder::setIndicesOptions, + (p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS), + DatafeedConfig.INDICES_OPTIONS); } private final String id; @@ -96,12 +101,13 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; private final Integer maxEmptySearches; + private final IndicesOptions indicesOptions; private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, QueryProvider queryProvider, AggProvider aggProvider, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, - Integer maxEmptySearches) { + Integer maxEmptySearches, IndicesOptions indicesOptions) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -114,6 +120,7 @@ private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; this.maxEmptySearches = maxEmptySearches; + this.indicesOptions = indicesOptions; } public DatafeedUpdate(StreamInput in) throws IOException { @@ -156,6 +163,11 @@ public DatafeedUpdate(StreamInput in) throws IOException { } else { maxEmptySearches = null; } + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + indicesOptions = in.readBoolean() ? IndicesOptions.readIndicesOptions(in) : null; + } else { + indicesOptions = null; + } } /** @@ -204,6 +216,14 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeOptionalInt(maxEmptySearches); } + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + if (indicesOptions != null) { + out.writeBoolean(true); + indicesOptions.writeIndicesOptions(out); + } else { + out.writeBoolean(false); + } + } } @Override @@ -235,7 +255,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); addOptionalField(builder, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG, delayedDataCheckConfig); addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches); - + if (indicesOptions != null) { + builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName()); + indicesOptions.toXContent(builder, params); + builder.endObject(); + } builder.endObject(); return builder; } @@ -308,6 +332,10 @@ public Integer getMaxEmptySearches() { return maxEmptySearches; } + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + /** * Applies the update to the given {@link DatafeedConfig} * @return a new {@link DatafeedConfig} that contains the update @@ -355,6 +383,9 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map h if (maxEmptySearches != null) { builder.setMaxEmptySearches(maxEmptySearches); } + if (indicesOptions != null) { + builder.setIndicesOptions(indicesOptions); + } if (headers.isEmpty() == false) { // Adjust the request, adding security headers from the current thread context @@ -395,13 +426,14 @@ public boolean equals(Object other) { && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) - && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches) + && Objects.equals(this.indicesOptions, that.indicesOptions); } @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig, - delayedDataCheckConfig, maxEmptySearches); + delayedDataCheckConfig, maxEmptySearches, indicesOptions); } @Override @@ -420,7 +452,8 @@ boolean isNoop(DatafeedConfig datafeed) { && (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig())) && (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig())) && (maxEmptySearches == null || Objects.equals(maxEmptySearches, datafeed.getMaxEmptySearches()) - || (maxEmptySearches == -1 && datafeed.getMaxEmptySearches() == null)); + || (maxEmptySearches == -1 && datafeed.getMaxEmptySearches() == null)) + && (indicesOptions == null || Objects.equals(indicesOptions, datafeed.getIndicesOptions())); } public static class Builder { @@ -437,6 +470,7 @@ public static class Builder { private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; private Integer maxEmptySearches; + private IndicesOptions indicesOptions; public Builder() { } @@ -458,6 +492,7 @@ public Builder(DatafeedUpdate config) { this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.delayedDataCheckConfig; this.maxEmptySearches = config.maxEmptySearches; + this.indicesOptions = config.indicesOptions; } public Builder setId(String datafeedId) { @@ -535,9 +570,14 @@ public Builder setMaxEmptySearches(int maxEmptySearches) { return this; } + public Builder setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + public DatafeedUpdate build() { return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig, maxEmptySearches); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 890ecbb87e539..c0b915f8632f5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -287,6 +287,7 @@ public final class ReservedFieldNames { DatafeedConfig.CHUNKING_CONFIG.getPreferredName(), DatafeedConfig.HEADERS.getPreferredName(), DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(), + DatafeedConfig.INDICES_OPTIONS.getPreferredName(), DelayedDataCheckConfig.ENABLED.getPreferredName(), DelayedDataCheckConfig.CHECK_WINDOW.getPreferredName(), diff --git a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json index aa7211cf44836..ec1daf22e35f4 100644 --- a/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json +++ b/x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json @@ -290,6 +290,10 @@ "indices" : { "type" : "keyword" }, + "indices_options": { + "type" : "object", + "enabled" : false + }, "job_id" : { "type" : "keyword" }, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java index dfbc7e7a55a10..633e54a479fc1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDatafeedActionRequestTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.action; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; @@ -13,7 +14,6 @@ import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction.Request; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests; import org.junit.Before; @@ -30,9 +30,7 @@ public void setUpDatafeedId() { @Override protected Request createTestInstance() { - DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, randomAlphaOfLength(10)); - datafeedConfig.setIndices(Collections.singletonList(randomAlphaOfLength(10))); - return new Request(datafeedConfig.build()); + return new Request(DatafeedConfigTests.createRandomizedDatafeedConfig(randomAlphaOfLength(10), datafeedId, 3600)); } @Override @@ -47,7 +45,7 @@ protected boolean supportsUnknownFields() { @Override protected Request doParseInstance(XContentParser parser) { - return Request.parseRequest(datafeedId, parser); + return Request.parseRequest(datafeedId, SearchRequest.DEFAULT_INDICES_OPTIONS, parser); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedActionRequestTests.java index 63f695db00e32..512011fb3b064 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedActionRequestTests.java @@ -45,7 +45,7 @@ protected boolean supportsUnknownFields() { @Override protected Request doParseInstance(XContentParser parser) { - return Request.parseRequest(datafeedId, parser); + return Request.parseRequest(datafeedId, null, parser); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 694e6d6c718b7..6beb2f613859c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -9,6 +9,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; @@ -57,6 +59,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import static org.elasticsearch.xpack.core.ml.utils.QueryProviderTests.createRandomValidQueryProvider; @@ -99,11 +102,15 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId) { } public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long bucketSpanMillis) { - return createRandomizedDatafeedConfigBuilder(jobId, bucketSpanMillis).build(); + return createRandomizedDatafeedConfig(jobId, randomValidDatafeedId(), bucketSpanMillis); } - private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, long bucketSpanMillis) { - DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId); + public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, String datafeedId, long bucketSpanMillis) { + return createRandomizedDatafeedConfigBuilder(jobId, datafeedId, bucketSpanMillis).build(); + } + + private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, String datafeedId, long bucketSpanMillis) { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId); builder.setIndices(randomStringList(1, 10)); if (randomBoolean()) { builder.setQueryProvider(createRandomValidQueryProvider(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))); @@ -154,6 +161,12 @@ private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(Stri if (randomBoolean()) { builder.setMaxEmptySearches(randomIntBetween(10, 100)); } + builder.setIndicesOptions(IndicesOptions.fromParameters( + randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT), + Boolean.toString(randomBoolean()), + Boolean.toString(randomBoolean()), + Boolean.toString(randomBoolean()), + SearchRequest.DEFAULT_INDICES_OPTIONS)); return builder; } @@ -340,7 +353,7 @@ public void testMultipleDefinedAggParse() throws IOException { } public void testToXContentForInternalStorage() throws IOException { - DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", 300); + DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", randomValidDatafeedId(), 300); // headers are only persisted to cluster state Map headers = new HashMap<>(); @@ -843,7 +856,7 @@ private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggre @Override protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOException { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(instance); - switch (between(0, 10)) { + switch (between(0, 11)) { case 0: builder.setId(instance.getId() + randomValidDatafeedId()); break; @@ -912,6 +925,14 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept builder.setMaxEmptySearches(instance.getMaxEmptySearches() + 1); } break; + case 11: + builder.setIndicesOptions(IndicesOptions.fromParameters( + randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT), + Boolean.toString(instance.getIndicesOptions().ignoreUnavailable() == false), + Boolean.toString(instance.getIndicesOptions().allowNoIndices() == false), + Boolean.toString(instance.getIndicesOptions().ignoreThrottled() == false), + SearchRequest.DEFAULT_INDICES_OPTIONS)); + break; default: throw new AssertionError("Illegal randomisation branch"); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java index b2856cc8e836f..8f2d3d72227ea 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.core.ml.datafeed; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; @@ -46,6 +48,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Locale; import static org.elasticsearch.xpack.core.ml.datafeed.AggProviderTests.createRandomValidAggProvider; import static org.elasticsearch.xpack.core.ml.utils.QueryProviderTests.createRandomValidQueryProvider; @@ -124,6 +127,14 @@ public static DatafeedUpdate createRandomized(String datafeedId, @Nullable Dataf if (randomBoolean()) { builder.setMaxEmptySearches(randomBoolean() ? -1 : randomIntBetween(10, 100)); } + if (randomBoolean()) { + builder.setIndicesOptions(IndicesOptions.fromParameters( + randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT), + Boolean.toString(randomBoolean()), + Boolean.toString(randomBoolean()), + Boolean.toString(randomBoolean()), + SearchRequest.DEFAULT_INDICES_OPTIONS)); + } return builder.build(); } @@ -271,6 +282,16 @@ public void testApply_givenAggregations() throws IOException { assertThat(updatedDatafeed.getAggregations(), equalTo(aggProvider.getAggs())); } + public void testApply_givenIndicesOptions() { + DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo"); + DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN) + .build() + .apply(datafeed, Collections.emptyMap()); + assertThat(datafeed.getIndicesOptions(), is(not(equalTo(updatedDatafeed.getIndicesOptions())))); + assertThat(updatedDatafeed.getIndicesOptions(), equalTo(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN)); + } + public void testApply_GivenRandomUpdates_AssertImmutability() { for (int i = 0; i < 100; ++i) { DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig(JobTests.randomValidJobId()); @@ -342,7 +363,7 @@ public void testSerializationOfComplexAggsBetweenVersions() throws IOException { @Override protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOException { DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance); - switch (between(0, 10)) { + switch (between(0, 11)) { case 0: builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId()); break; @@ -423,6 +444,23 @@ protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOExcept builder.setMaxEmptySearches(instance.getMaxEmptySearches() + 100); } break; + case 11: + if (instance.getIndicesOptions() != null) { + builder.setIndicesOptions(IndicesOptions.fromParameters( + randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT), + Boolean.toString(instance.getIndicesOptions().ignoreUnavailable() == false), + Boolean.toString(instance.getIndicesOptions().allowNoIndices() == false), + Boolean.toString(instance.getIndicesOptions().ignoreThrottled() == false), + SearchRequest.DEFAULT_INDICES_OPTIONS)); + } else { + builder.setIndicesOptions(IndicesOptions.fromParameters( + randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT), + Boolean.toString(randomBoolean()), + Boolean.toString(randomBoolean()), + Boolean.toString(randomBoolean()), + SearchRequest.DEFAULT_INDICES_OPTIONS)); + } + break; default: throw new AssertionError("Illegal randomisation branch"); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java index f5e229d572bcd..b78cdfa953645 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java @@ -389,6 +389,75 @@ public void testLookbackWithGeo() throws Exception { assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); } + public void testLookbackWithIndicesOptions() throws Exception { + String jobId = "test-lookback-only-with-indices-options"; + Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); + createJobRequest.setJsonEntity("{\n" + + " \"description\": \"custom indices options\",\n" + + " \"analysis_config\": {\n" + + " \"bucket_span\": \"15m\",\n" + + " \"detectors\": [\n" + + " {\n" + + " \"function\": \"count\"\n" + + " }\n" + + " ]\n" + + " }," + + " \"data_description\": {\"time_field\": \"time\"}\n" + + "}"); + client().performRequest(createJobRequest); + String datafeedId = jobId + "-datafeed"; + new DatafeedBuilder(datafeedId, jobId, "*hidden-*") + .setIndicesOptions("{" + + "\"expand_wildcards\": [\"all\"]," + + "\"allow_no_indices\": true"+ + "}") + .build(); + + StringBuilder bulk = new StringBuilder(); + + Request createGeoData = new Request("PUT", "/.hidden-index"); + createGeoData.setJsonEntity("{" + + " \"mappings\": {" + + " \"properties\": {" + + " \"time\": { \"type\":\"date\"}," + + " \"value\": { \"type\":\"long\"}" + + " }" + + " }, \"settings\": {\"index.hidden\": true} " + + "}"); + client().performRequest(createGeoData); + + bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 1}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:00:00Z\",\"value\": 1000}\n"); + bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 2}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:05:00Z\",\"value\":1500}\n"); + bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 3}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:10:00Z\",\"value\":1600}\n"); + bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 4}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:15:00Z\",\"value\":100}\n"); + bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 5}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:20:00Z\",\"value\":1}\n"); + bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 6}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:25:00Z\",\"value\":1500}\n"); + bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 7}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:30:00Z\",\"value\":1500}\n"); + bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 8}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:40:00Z\",\"value\":2100}\n"); + bulk.append("{\"index\": {\"_index\": \".hidden-index\", \"_id\": 9}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:41:00Z\",\"value\":0}\n"); + bulkIndex(bulk.toString()); + + openJob(client(), jobId); + + startDatafeedAndWaitUntilStopped(datafeedId); + waitUntilJobIsClosed(jobId); + Response jobStatsResponse = client().performRequest( + new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")); + String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity()); + assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":9")); + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":9")); + assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); + } + public void testLookbackOnlyGivenEmptyIndex() throws Exception { new LookbackOnlyTestHelper("test-lookback-only-given-empty-index", "airline-data-empty") .setShouldSucceedInput(false).setShouldSucceedProcessing(false).execute(); @@ -1229,6 +1298,7 @@ private static class DatafeedBuilder { String aggregations; String authHeader = BASIC_AUTH_VALUE_SUPER_USER; String chunkingTimespan; + String indicesOptions; DatafeedBuilder(String datafeedId, String jobId, String index) { this.datafeedId = datafeedId; @@ -1261,6 +1331,11 @@ DatafeedBuilder setChunkingTimespan(String timespan) { return this; } + DatafeedBuilder setIndicesOptions(String indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + Response build() throws IOException { Request request = new Request("PUT", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId); request.setJsonEntity("{" @@ -1268,6 +1343,7 @@ Response build() throws IOException { + (source ? ",\"_source\":true" : "") + (scriptedFields == null ? "" : ",\"script_fields\":" + scriptedFields) + (aggregations == null ? "" : ",\"aggs\":" + aggregations) + + (indicesOptions == null ? "" : ",\"indices_options\":" + indicesOptions) + (chunkingTimespan == null ? "" : ",\"chunking_config\":{\"mode\":\"MANUAL\",\"time_span\":\"" + chunkingTimespan + "\"}") + "}"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 4e1da422d7c29..34db83aa1fecb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -242,6 +242,7 @@ public void onFailure(Exception e) { DatafeedConfig datafeedConfig = datafeedBuilder.build(); params.setDatafeedIndices(datafeedConfig.getIndices()); params.setJobId(datafeedConfig.getJobId()); + params.setIndicesOptions(datafeedConfig.getIndicesOptions()); datafeedConfigHolder.set(datafeedConfig); jobConfigProvider.getJob(datafeedConfig.getJobId(), jobListener); } catch (Exception e) { @@ -377,12 +378,17 @@ public StartDatafeedPersistentTasksExecutor(DatafeedManager datafeedManager, Ind public PersistentTasksCustomMetaData.Assignment getAssignment(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getJobId(), - params.getDatafeedIndices()).selectNode(); + params.getDatafeedIndices(), params.getIndicesOptions()).selectNode(); } @Override public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { - new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getJobId(), params.getDatafeedIndices()) + new DatafeedNodeSelector(clusterState, + resolver, + params.getDatafeedId(), + params.getJobId(), + params.getDatafeedIndices(), + params.getIndicesOptions()) .checkDatafeedTaskCanBeCreated(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index e29fc8ef5c326..03c8272a979d8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; @@ -37,9 +38,10 @@ public class DatafeedNodeSelector { private final PersistentTasksCustomMetaData.PersistentTask jobTask; private final ClusterState clusterState; private final IndexNameExpressionResolver resolver; + private final IndicesOptions indicesOptions; public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId, - String jobId, List datafeedIndices) { + String jobId, List datafeedIndices, IndicesOptions indicesOptions) { PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); this.datafeedId = datafeedId; this.jobId = jobId; @@ -47,6 +49,7 @@ public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolv this.jobTask = MlTasks.getJobTask(jobId, tasks); this.clusterState = Objects.requireNonNull(clusterState); this.resolver = Objects.requireNonNull(resolver); + this.indicesOptions = Objects.requireNonNull(indicesOptions); } public void checkDatafeedTaskCanBeCreated() { @@ -117,25 +120,28 @@ private AssignmentFailure verifyIndicesActive() { } String[] concreteIndices; - String reason = "cannot start datafeed [" + datafeedId + "] because index [" - + index + "] does not exist, is closed, or is still initializing."; try { - concreteIndices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), index); + concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, index); if (concreteIndices.length == 0) { - return new AssignmentFailure(reason, true); + return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" + + index + "] does not exist, is closed, or is still initializing.", true); } } catch (Exception e) { - LOGGER.debug(reason, e); - return new AssignmentFailure(reason, true); + String msg = new ParameterizedMessage("failed resolving indices given [{}] and indices_options [{}]", + index, + indicesOptions).getFormattedMessage(); + LOGGER.debug("[" + datafeedId + "] " + msg, e); + return new AssignmentFailure( + "cannot start datafeed [" + datafeedId + "] because it " + msg + " with exception [" + e.getMessage() + "]", + true); } for (String concreteIndex : concreteIndices) { IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex); if (routingTable == null || !routingTable.allPrimaryShardsActive()) { - reason = "cannot start datafeed [" + datafeedId + "] because index [" - + concreteIndex + "] does not have all primary shards active yet."; - return new AssignmentFailure(reason, false); + return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index [" + + concreteIndex + "] does not have all primary shards active yet.", false); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java index d2a933ffa466f..0fa4411d839e3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.query.QueryBuilder; @@ -27,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -46,15 +48,17 @@ public class DatafeedDelayedDataDetector implements DelayedDataDetector { private final String jobId; private final QueryBuilder datafeedQuery; private final String[] datafeedIndices; + private final IndicesOptions indicesOptions; DatafeedDelayedDataDetector(long bucketSpan, long window, String jobId, String timeField, QueryBuilder datafeedQuery, - String[] datafeedIndices, Client client) { + String[] datafeedIndices, IndicesOptions indicesOptions, Client client) { this.bucketSpan = bucketSpan; this.window = window; this.jobId = jobId; this.timeField = timeField; this.datafeedQuery = datafeedQuery; this.datafeedIndices = datafeedIndices; + this.indicesOptions = Objects.requireNonNull(indicesOptions); this.client = client; } @@ -115,7 +119,7 @@ private Map checkCurrentBucketEventCount(long start, long end) { .fixedInterval(new DateHistogramInterval(bucketSpan + "ms")).field(timeField)) .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedQuery, timeField, start, end)); - SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder); + SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder).indicesOptions(indicesOptions); try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); List buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java index 88f8e6caadf09..23a5de6baa0c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java @@ -51,6 +51,7 @@ public static DelayedDataDetector buildDetector(Job job, job.getDataDescription().getTimeField(), datafeedConfig.getParsedQuery(xContentRegistry), datafeedConfig.getIndices().toArray(new String[0]), + datafeedConfig.getIndicesOptions(), client); } else { return new NullDelayedDataDetector(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java index d43ede48d0578..5863a4e05fd27 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java @@ -79,7 +79,7 @@ static void create(Client client, client, ClientHelper.ML_ORIGIN, GetRollupIndexCapsAction.INSTANCE, - new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0])), + new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0]), datafeed.getIndicesOptions()), getRollupIndexCapsActionHandler); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index 031db35ebac15..aea9c9d7a7553 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -28,6 +28,7 @@ class AggregationDataExtractor extends AbstractAggregationDataExtractor headers; + final IndicesOptions indicesOptions; AggregationDataExtractorContext(String jobId, String timeField, Set fields, List indices, QueryBuilder query, AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount, - Map headers) { + Map headers, IndicesOptions indicesOptions) { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.fields = Objects.requireNonNull(fields); @@ -39,5 +41,6 @@ class AggregationDataExtractorContext { this.end = end; this.includeDocCount = includeDocCount; this.headers = headers; + this.indicesOptions = Objects.requireNonNull(indicesOptions); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java index 40d186542df04..93c5a12d82d55 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java @@ -50,7 +50,8 @@ public DataExtractor newExtractor(long start, long end) { Intervals.alignToCeil(start, histogramInterval), Intervals.alignToFloor(end, histogramInterval), job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT), - datafeedConfig.getHeaders()); + datafeedConfig.getHeaders(), + datafeedConfig.getIndicesOptions()); return new AggregationDataExtractor(client, dataExtractorContext, timingStatsReporter); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java index 9341161057717..75b215e34c54f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java @@ -26,7 +26,9 @@ class RollupDataExtractor extends AbstractAggregationDataExtractor headers; final boolean hasAggregations; final Long histogramInterval; + final IndicesOptions indicesOptions; ChunkedDataExtractorContext(String jobId, String timeField, List indices, QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan, TimeAligner timeAligner, Map headers, - boolean hasAggregations, @Nullable Long histogramInterval) { + boolean hasAggregations, @Nullable Long histogramInterval, IndicesOptions indicesOptions) { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.indices = indices.toArray(new String[indices.size()]); @@ -48,5 +50,6 @@ interface TimeAligner { this.headers = headers; this.hasAggregations = hasAggregations; this.histogramInterval = histogramInterval; + this.indicesOptions = Objects.requireNonNull(indicesOptions); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java index 028409f69398b..e153bafa096e1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java @@ -54,7 +54,8 @@ public DataExtractor newExtractor(long start, long end) { timeAligner, datafeedConfig.getHeaders(), datafeedConfig.hasAggregations(), - datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis(xContentRegistry) : null + datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis(xContentRegistry) : null, + datafeedConfig.getIndicesOptions() ); return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext, timingStatsReporter); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index e0bf14b1bb1b3..8745ee49a1166 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -125,6 +125,7 @@ private SearchRequestBuilder buildSearchRequest(long start) { .setScroll(SCROLL_TIMEOUT) .addSort(context.extractedFields.timeField(), SortOrder.ASC) .setIndices(context.indices) + .setIndicesOptions(context.indicesOptions) .setSize(context.scrollSize) .setQuery(ExtractorUtils.wrapInTimeRangeQuery( context.query, context.extractedFields.timeField(), start, context.end)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java index 19dbfa669b111..7ec6b3b65c381 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -23,10 +24,11 @@ class ScrollDataExtractorContext { final long start; final long end; final Map headers; + final IndicesOptions indicesOptions; ScrollDataExtractorContext(String jobId, TimeBasedExtractedFields extractedFields, List indices, QueryBuilder query, List scriptFields, int scrollSize, long start, long end, - Map headers) { + Map headers, IndicesOptions indicesOptions) { this.jobId = Objects.requireNonNull(jobId); this.extractedFields = Objects.requireNonNull(extractedFields); this.indices = indices.toArray(new String[indices.size()]); @@ -36,5 +38,6 @@ class ScrollDataExtractorContext { this.start = start; this.end = end; this.headers = headers; + this.indicesOptions = Objects.requireNonNull(indicesOptions); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java index 0c626ea7e22f7..7bc995df5b56c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java @@ -25,7 +25,6 @@ import java.util.Objects; public class ScrollDataExtractorFactory implements DataExtractorFactory { - private final Client client; private final DatafeedConfig datafeedConfig; private final Job job; @@ -54,7 +53,9 @@ public DataExtractor newExtractor(long start, long end) { datafeedConfig.getScrollSize(), start, end, - datafeedConfig.getHeaders()); + datafeedConfig.getHeaders(), + datafeedConfig.getIndicesOptions() + ); return new ScrollDataExtractor(client, dataExtractorContext, timingStatsReporter); } @@ -87,11 +88,13 @@ public static void create(Client client, // Step 1. Get field capabilities necessary to build the information of how to extract fields FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); - fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[datafeed.getIndices().size()])); + fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[0])).indicesOptions(datafeed.getIndicesOptions()); // We need capabilities for all fields matching the requested fields' parents so that we can work around // multi-fields that are not in source. - String[] requestFields = job.allInputFields().stream().map(f -> MlStrings.getParentField(f) + "*") - .toArray(size -> new String[size]); + String[] requestFields = job.allInputFields() + .stream() + .map(f -> MlStrings.getParentField(f) + "*") + .toArray(String[]::new); fieldCapabilitiesRequest.fields(requestFields); ClientHelper. executeWithHeaders(datafeed.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java index 8b71e137bea8f..4b79abdfef662 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPutDatafeedAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.rest.datafeeds; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.BaseRestHandler; @@ -44,8 +46,9 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); + IndicesOptions indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS); XContentParser parser = restRequest.contentParser(); - PutDatafeedAction.Request putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, parser); + PutDatafeedAction.Request putDatafeedRequest = PutDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser); putDatafeedRequest.timeout(restRequest.paramAsTime("timeout", putDatafeedRequest.timeout())); putDatafeedRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putDatafeedRequest.masterNodeTimeout())); return channel -> client.execute(PutDatafeedAction.INSTANCE, putDatafeedRequest, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java index 0375be2907b35..cf8046921336a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.rest.datafeeds; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.BaseRestHandler; @@ -44,8 +46,15 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); + IndicesOptions indicesOptions = null; + if (restRequest.hasParam("expand_wildcards") + || restRequest.hasParam("ignore_unavailable") + || restRequest.hasParam("allow_no_indices") + || restRequest.hasParam("ignore_throttled")) { + indicesOptions = IndicesOptions.fromRequest(restRequest, SearchRequest.DEFAULT_INDICES_OPTIONS); + } XContentParser parser = restRequest.contentParser(); - UpdateDatafeedAction.Request updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, parser); + UpdateDatafeedAction.Request updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, indicesOptions, parser); updateDatafeedRequest.timeout(restRequest.paramAsTime("timeout", updateDatafeedRequest.timeout())); updateDatafeedRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", updateDatafeedRequest.masterNodeTimeout())); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 39f3a3f4889c1..3c986b764a0aa 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -7,6 +7,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -75,10 +76,19 @@ public void testSelectNode_GivenJobIsOpened() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobIsOpening() { @@ -91,10 +101,19 @@ public void testSelectNode_GivenJobIsOpening() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertEquals("node_id", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testNoJobTask() { @@ -107,15 +126,23 @@ public void testNoJobTask() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id], because the job's [job_id] state is " + "[closed] while state [opened] is required")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [closed] while state [opened] is required]")); } @@ -131,15 +158,23 @@ public void testSelectNode_GivenJobFailedOrClosed() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is [" + jobState + "] while state [opened] is required]")); @@ -160,13 +195,22 @@ public void testShardUnassigned() { givenClusterState("foo", 1, 0, states); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testShardNotAllActive() { @@ -185,13 +229,22 @@ public void testShardNotAllActive() { givenClusterState("foo", 2, 0, states); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [foo] " + "does not have all primary shards active yet.")); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testIndexDoesntExist() { @@ -204,17 +257,32 @@ public void testIndexDoesntExist() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); - assertThat(result.getExplanation(), equalTo("cannot start datafeed [datafeed_id] because index [not_foo] " + - "does not exist, is closed, or is still initializing.")); + assertThat(result.getExplanation(), + equalTo("cannot start datafeed [datafeed_id] because it failed resolving indices given [not_foo] and " + + "indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, " + + "expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, " + + "forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] with exception [no such index [not_foo]]")); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); + + "[cannot start datafeed [datafeed_id] because it failed resolving " + + "indices given [not_foo] and indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, " + + "expand_wildcards_open=true, expand_wildcards_closed=false, expand_wildcards_hidden=false, " + + "allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] " + + "with exception [no such index [not_foo]]]")); } public void testRemoteIndex() { @@ -227,8 +295,12 @@ public void testRemoteIndex() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNotNull(result.getExecutorNode()); } @@ -245,15 +317,23 @@ public void testSelectNode_jobTaskStale() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertNull(result.getExecutorNode()); assertEquals("cannot start datafeed [datafeed_id], because the job's [job_id] state is stale", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " + "[cannot start datafeed [datafeed_id], because the job's [job_id] state is stale]")); @@ -261,9 +341,19 @@ public void testSelectNode_jobTaskStale() { addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); - result = new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertEquals("node_id1", result.getExecutorNode()); - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).checkDatafeedTaskCanBeCreated(); + new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated(); } public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { @@ -280,10 +370,17 @@ public void testSelectNode_GivenJobOpeningAndIndexDoesNotExist() { givenClusterState("foo", 1, 0); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]")); + + "[cannot start datafeed [datafeed_id] because it failed resolving indices given [not_foo] and " + + "indices_options [IndicesOptions[ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, " + + "expand_wildcards_closed=false, expand_wildcards_hidden=false, allow_aliases_to_multiple_indices=true, " + + "forbid_closed_indices=true, ignore_aliases=false, ignore_throttled=true]] with exception [no such index [not_foo]]]")); } public void testSelectNode_GivenMlUpgradeMode() { @@ -297,8 +394,12 @@ public void testSelectNode_GivenMlUpgradeMode() { givenClusterState("foo", 1, 0); - PersistentTasksCustomMetaData.Assignment result = - new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode(); + PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).selectNode(); assertThat(result, equalTo(MlTasks.AWAITING_UPGRADE)); } @@ -314,8 +415,12 @@ public void testCheckDatafeedTaskCanBeCreated_GivenMlUpgradeMode() { givenClusterState("foo", 1, 0); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()) - .checkDatafeedTaskCanBeCreated()); + () -> new DatafeedNodeSelector(clusterState, + resolver, + df.getId(), + df.getJobId(), + df.getIndices(), + SearchRequest.DEFAULT_INDICES_OPTIONS).checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), equalTo("Could not start datafeed [datafeed_id] as indices are being upgraded")); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java index 537a6b44d0bc6..72c3edb33cd21 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; @@ -272,7 +273,7 @@ public void testExtractionGivenInitSearchResponseEncounteredUnavailableShards() private AggregationDataExtractorContext createContext(long start, long end) { return new AggregationDataExtractorContext(jobId, timeField, fields, indices, query, aggs, start, end, true, - Collections.emptyMap()); + Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java index e7ac362543945..657545d5f3e8f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java @@ -565,7 +565,8 @@ private ChunkedDataExtractorContext createContext(long start, long end) { private ChunkedDataExtractorContext createContext(long start, long end, boolean hasAggregations, Long histogramInterval) { return new ChunkedDataExtractorContext(jobId, timeField, indices, query, scrollSize, start, end, chunkSpan, - ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap(), hasAggregations, histogramInterval); + ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap(), hasAggregations, histogramInterval, + SearchRequest.DEFAULT_INDICES_OPTIONS); } private static class StubSubExtractor implements DataExtractor { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java index 962bd9ee6d3df..76e86a9a82170 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; @@ -444,7 +445,7 @@ public void testDomainSplitScriptField() throws IOException { List sFields = Arrays.asList(withoutSplit, withSplit); ScrollDataExtractorContext context = new ScrollDataExtractorContext(jobId, extractedFields, indices, - query, sFields, scrollSize, 1000, 2000, Collections.emptyMap()); + query, sFields, scrollSize, 1000, 2000, Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS); TestDataExtractor extractor = new TestDataExtractor(context); @@ -490,7 +491,7 @@ public void testDomainSplitScriptField() throws IOException { private ScrollDataExtractorContext createContext(long start, long end) { return new ScrollDataExtractorContext(jobId, extractedFields, indices, query, scriptFields, scrollSize, start, end, - Collections.emptyMap()); + Collections.emptyMap(), SearchRequest.DEFAULT_INDICES_OPTIONS); } private SearchResponse createEmptySearchResponse() { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json index 3991c682ac2eb..30985f178005e 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.put_datafeed.json @@ -23,6 +23,31 @@ "body":{ "description":"The datafeed config", "required":true + }, + "params": { + "ignore_unavailable":{ + "type":"boolean", + "description":"Ignore unavailable indexes (default: false)" + }, + "allow_no_indices":{ + "type":"boolean", + "description":"Ignore if the source indices expressions resolves to no concrete indices (default: true)" + }, + "ignore_throttled":{ + "type":"boolean", + "description":"Ignore indices that are marked as throttled (default: true)" + }, + "expand_wildcards":{ + "type":"enum", + "options":[ + "open", + "closed", + "hidden", + "none", + "all" + ], + "description":"Whether source index expressions should get expanded to open or closed indices (default: open)" + } } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json index d4c4a1f63d78d..f820933c76955 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.update_datafeed.json @@ -23,6 +23,31 @@ "body":{ "description":"The datafeed update settings", "required":true + }, + "params": { + "ignore_unavailable":{ + "type":"boolean", + "description":"Ignore unavailable indexes (default: false)" + }, + "allow_no_indices":{ + "type":"boolean", + "description":"Ignore if the source indices expressions resolves to no concrete indices (default: true)" + }, + "ignore_throttled":{ + "type":"boolean", + "description":"Ignore indices that are marked as throttled (default: true)" + }, + "expand_wildcards":{ + "type":"enum", + "options":[ + "open", + "closed", + "hidden", + "none", + "all" + ], + "description":"Whether source index expressions should get expanded to open or closed indices (default: open)" + } } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml index da4dfe7ec23d4..bca3996f22756 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml @@ -415,3 +415,78 @@ setup: datafeed_id: test-datafeed-1 force: true - match: { acknowledged: true } +--- +"Test put and update datafeed with indices options": + - do: + ml.put_datafeed: + datafeed_id: test-datafeed-indices-options-1 + body: > + { + "job_id":"datafeeds-crud-1", + "indexes":["index-foo"], + "indices_options": { + "expand_wildcards": ["closed", "open"], + "ignore_throttled": false + } + } + - match: { datafeed_id: "test-datafeed-indices-options-1" } + + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: false } + - length: { datafeeds.0.indices_options.expand_wildcards: 2 } + - match: { datafeeds.0.indices_options.expand_wildcards.0: open } + - match: { datafeeds.0.indices_options.expand_wildcards.1: closed } + + - do: + ml.update_datafeed: + datafeed_id: test-datafeed-indices-options-1 + body: > + { + "indices_options": { + "ignore_throttled": true + } + } + - match: { datafeed_id: "test-datafeed-indices-options-1" } + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: true } +--- +"Test put and update datafeed with indices options in params": + - do: + ml.put_datafeed: + datafeed_id: test-datafeed-indices-options-params-1 + ignore_throttled: false + body: > + { + "job_id":"datafeeds-crud-1", + "indexes":["index-foo"] + } + - match: { datafeed_id: "test-datafeed-indices-options-params-1" } + + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-params-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: false } + + + - do: + ml.update_datafeed: + datafeed_id: test-datafeed-indices-options-params-1 + ignore_throttled: false + allow_no_indices: false + body: > + { + } + - match: { datafeed_id: "test-datafeed-indices-options-params-1" } + - do: + ml.get_datafeeds: + datafeed_id: test-datafeed-indices-options-params-1 + + - match: { datafeeds.0.indices_options.ignore_throttled: false } + - match: { datafeeds.0.indices_options.allow_no_indices: false } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml index c24318e0b1973..08ea8e32df973 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml @@ -2,7 +2,6 @@ setup: - skip: version: "all" reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/42258" - --- "Put job and datafeed without aggs in old cluster": diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml index 5d824714e0097..15e907db715de 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml @@ -2,7 +2,6 @@ setup: - skip: version: "all" reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/42258" - - do: cluster.health: wait_for_status: green