Skip to content

Commit

Permalink
addressing PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
benwtrent committed Feb 26, 2020
1 parent f6db6dc commit 2070eb8
Show file tree
Hide file tree
Showing 19 changed files with 228 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -242,11 +241,6 @@ private static Map<String, Object> asMap(BytesReference bytesReference) {
return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
}

@Override
public String toString() {
return Strings.toString(this, true, true);
}

/**
* The lists of indices and types are compared for equality but they are not
* sorted first so this test could fail simply because the indices and types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
Expand All @@ -34,9 +33,9 @@ private PutDatafeedAction() {

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

public static Request parseRequest(String datafeedId, @Nullable IndicesOptions indicesOptions, XContentParser parser) {
public static Request parseRequest(String datafeedId, IndicesOptions indicesOptions, XContentParser parser) {
DatafeedConfig.Builder datafeed = DatafeedConfig.STRICT_PARSER.apply(parser, null);
if (indicesOptions != null) {
if (datafeed.getIndicesOptions() == null) {
datafeed.setIndicesOptions(indicesOptions);
}
datafeed.setId(datafeedId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
Expand Down Expand Up @@ -152,7 +153,7 @@ public static class DatafeedParams implements PersistentTaskParams {
PARSER.declareString(DatafeedParams::setJobId, Job.ID);
PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES);
PARSER.declareObject(DatafeedParams::setIndicesOptions,
(p, c) -> IndicesOptions.fromMap(p.map(), IndicesOptions.lenientExpandOpen()),
(p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS),
DatafeedConfig.INDICES_OPTIONS);
}

Expand Down Expand Up @@ -196,9 +197,9 @@ public DatafeedParams(StreamInput in) throws IOException {
jobId = in.readOptionalString();
datafeedIndices = in.readStringList();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
indicesOptions = in.readBoolean() ? IndicesOptions.readIndicesOptions(in) : null;
indicesOptions = IndicesOptions.readIndicesOptions(in);
} else {
indicesOptions = null;
indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
}
}

Expand All @@ -211,7 +212,7 @@ public DatafeedParams(StreamInput in) throws IOException {
private TimeValue timeout = TimeValue.timeValueSeconds(20);
private List<String> datafeedIndices = Collections.emptyList();
private String jobId;
private IndicesOptions indicesOptions;
private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;


public String getDatafeedId() {
Expand Down Expand Up @@ -263,7 +264,7 @@ public IndicesOptions getIndicesOptions() {
}

public DatafeedParams setIndicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, DatafeedConfig.INDICES_OPTIONS);
return this;
}

Expand All @@ -286,12 +287,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(jobId);
out.writeStringCollection(datafeedIndices);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
if (indicesOptions != null) {
out.writeBoolean(true);
indicesOptions.writeIndicesOptions(out);
} else {
out.writeBoolean(false);
}
indicesOptions.writeIndicesOptions(out);
}
}

Expand All @@ -310,11 +306,11 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
if (datafeedIndices.isEmpty() == false) {
builder.field(INDICES.getPreferredName(), datafeedIndices);
}
if (indicesOptions != null) {
builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();
}

builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();

builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -203,7 +204,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
this.headers = Collections.unmodifiableMap(headers);
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
this.indicesOptions = indicesOptions;
this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, INDICES_OPTIONS);
}

public DatafeedConfig(StreamInput in) throws IOException {
Expand Down Expand Up @@ -236,9 +237,9 @@ public DatafeedConfig(StreamInput in) throws IOException {
maxEmptySearches = null;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
indicesOptions = in.readBoolean() ? IndicesOptions.readIndicesOptions(in) : null;
indicesOptions = IndicesOptions.readIndicesOptions(in);
} else {
indicesOptions = null;
indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
}
}

Expand Down Expand Up @@ -448,12 +449,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalVInt(maxEmptySearches);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
if (indicesOptions != null) {
out.writeBoolean(true);
indicesOptions.writeIndicesOptions(out);
} else {
out.writeBoolean(false);
}
indicesOptions.writeIndicesOptions(out);
}
}

Expand Down Expand Up @@ -494,11 +490,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (maxEmptySearches != null) {
builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
}
if (indicesOptions != null) {
builder.startObject(INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();
}
builder.startObject(INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();

builder.endObject();
return builder;
}
Expand Down Expand Up @@ -748,6 +743,10 @@ public Builder setIndicesOptions(IndicesOptions indicesOptions) {
return this;
}

public IndicesOptions getIndicesOptions() {
return this.indicesOptions;
}

public DatafeedConfig build() {
ExceptionsHelper.requireNonNull(id, ID.getPreferredName());
ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
Expand All @@ -762,6 +761,9 @@ public DatafeedConfig build() {
setDefaultChunkingConfig();

setDefaultQueryDelay();
if (indicesOptions == null) {
indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
}
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize,
chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -13,7 +14,6 @@
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction.Request;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests;
import org.junit.Before;

Expand All @@ -30,9 +30,7 @@ public void setUpDatafeedId() {

@Override
protected Request createTestInstance() {
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, randomAlphaOfLength(10));
datafeedConfig.setIndices(Collections.singletonList(randomAlphaOfLength(10)));
return new Request(datafeedConfig.build());
return new Request(DatafeedConfigTests.createRandomizedDatafeedConfig(randomAlphaOfLength(10), datafeedId, 3600));
}

@Override
Expand All @@ -47,7 +45,7 @@ protected boolean supportsUnknownFields() {

@Override
protected Request doParseInstance(XContentParser parser) {
return Request.parseRequest(datafeedId, null, parser);
return Request.parseRequest(datafeedId, SearchRequest.DEFAULT_INDICES_OPTIONS, parser);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,15 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId) {
}

public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long bucketSpanMillis) {
return createRandomizedDatafeedConfigBuilder(jobId, bucketSpanMillis).build();
return createRandomizedDatafeedConfig(jobId, randomValidDatafeedId(), bucketSpanMillis);
}

private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, long bucketSpanMillis) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId);
public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, String datafeedId, long bucketSpanMillis) {
return createRandomizedDatafeedConfigBuilder(jobId, datafeedId, bucketSpanMillis).build();
}

private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, String datafeedId, long bucketSpanMillis) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedId, jobId);
builder.setIndices(randomStringList(1, 10));
if (randomBoolean()) {
builder.setQueryProvider(createRandomValidQueryProvider(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
Expand Down Expand Up @@ -139,13 +143,11 @@ private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(Stri
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
if (randomBoolean()) {
builder.setIndicesOptions(IndicesOptions.fromOptions(randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean()));
}
builder.setIndicesOptions(IndicesOptions.fromOptions(randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean()));
return builder;
}

Expand Down Expand Up @@ -372,7 +374,7 @@ public void testMultipleDefinedAggParse() throws IOException {
}

public void testToXContentForInternalStorage() throws IOException {
DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", 300);
DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", randomValidDatafeedId(), 300);

// headers are only persisted to cluster state
Map<String, String> headers = new HashMap<>();
Expand Down Expand Up @@ -757,7 +759,7 @@ public void testSerializationOfComplexAggs() throws IOException {
xContentType);

DatafeedConfig parsedDatafeedConfig = doParseInstance(parser);
assertEquals(datafeedConfig, parsedDatafeedConfig);
assertEquals(datafeedConfig.getAggregations(), parsedDatafeedConfig.getAggregations());

// Assert that the parsed versions of our aggs and queries work as well
assertEquals(aggBuilder, parsedDatafeedConfig.getParsedAggregations(xContentRegistry()));
Expand All @@ -769,7 +771,7 @@ public void testSerializationOfComplexAggs() throws IOException {
datafeedConfig.writeTo(output);
try(StreamInput streamInput = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
DatafeedConfig streamedDatafeedConfig = new DatafeedConfig(streamInput);
assertEquals(datafeedConfig, streamedDatafeedConfig);
assertEquals(datafeedConfig.getAggregations(), streamedDatafeedConfig.getAggregations());

// Assert that the parsed versions of our aggs and queries work as well
assertEquals(aggBuilder, streamedDatafeedConfig.getParsedAggregations(xContentRegistry()));
Expand Down Expand Up @@ -947,10 +949,6 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept
break;
case 11:
builder.setIndicesOptions(IndicesOptions.fromOptions(randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,6 @@ protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOExcept
break;
case 11:
builder.setIndicesOptions(IndicesOptions.fromOptions(randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -48,7 +49,7 @@ public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolv
this.jobTask = MlTasks.getJobTask(jobId, tasks);
this.clusterState = Objects.requireNonNull(clusterState);
this.resolver = Objects.requireNonNull(resolver);
this.indicesOptions = indicesOptions == null ? IndicesOptions.lenientExpandOpen() : indicesOptions;
this.indicesOptions = Objects.requireNonNull(indicesOptions);
}

public void checkDatafeedTaskCanBeCreated() {
Expand Down Expand Up @@ -119,25 +120,28 @@ private AssignmentFailure verifyIndicesActive() {
}

String[] concreteIndices;
String reason = "cannot start datafeed [" + datafeedId + "] because index ["
+ index + "] does not exist, is closed, or is still initializing.";

try {
concreteIndices = resolver.concreteIndexNames(clusterState, indicesOptions, index);
if (concreteIndices.length == 0) {
return new AssignmentFailure(reason, true);
return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index ["
+ index + "] does not exist, is closed, or is still initializing.", true);
}
} catch (Exception e) {
LOGGER.debug(reason, e);
return new AssignmentFailure(reason, true);
String msg = new ParameterizedMessage("failed resolving indices given [{}] and indices_options [{}]",
index,
indicesOptions).getFormattedMessage();
LOGGER.debug("[" + datafeedId + "] " + msg, e);
return new AssignmentFailure(
"cannot start datafeed [" + datafeedId + "] because it " + msg + " with exception [" + e.getMessage() + "]",
true);
}

for (String concreteIndex : concreteIndices) {
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(concreteIndex);
if (routingTable == null || !routingTable.allPrimaryShardsActive()) {
reason = "cannot start datafeed [" + datafeedId + "] because index ["
+ concreteIndex + "] does not have all primary shards active yet.";
return new AssignmentFailure(reason, false);
return new AssignmentFailure("cannot start datafeed [" + datafeedId + "] because index ["
+ concreteIndex + "] does not have all primary shards active yet.", false);
}
}
}
Expand Down
Loading

0 comments on commit 2070eb8

Please sign in to comment.