From 75067e7e9e1a0c167ad71b1feb7dd8ee084e6d09 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 5 Sep 2019 15:05:30 -0500 Subject: [PATCH 1/2] [ML][Transforms] remove `force` flag from _start --- .../xpack/core/XPackClientPlugin.java | 2 - .../core/dataframe/DataFrameMessages.java | 2 +- .../action/StartDataFrameTransformAction.java | 17 +- .../StartDataFrameTransformTaskAction.java | 152 ------------------ ...tDataFrameTransformActionRequestTests.java | 2 +- ...aFrameTransformTaskActionRequestTests.java | 23 --- ...FrameTransformTaskActionResponseTests.java | 23 --- .../integration/DataFrameRestTestCase.java | 11 +- .../DataFrameTaskFailedStateIT.java | 23 +-- .../xpack/dataframe/DataFrame.java | 3 - ...ransportStartDataFrameTransformAction.java | 45 ++---- ...portStartDataFrameTransformTaskAction.java | 93 ----------- .../RestStartDataFrameTransformAction.java | 3 +- ...FrameTransformPersistentTasksExecutor.java | 17 +- .../transforms/DataFrameTransformTask.java | 39 ++--- 15 files changed, 58 insertions(+), 397 deletions(-) delete mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java delete mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java delete mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionResponseTests.java delete mode 100644 x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index fbb919490d0ae..6ffe5f29382c4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -43,7 +43,6 @@ import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; @@ -387,7 +386,6 @@ public List> getClientActions() { // Data Frame PutDataFrameTransformAction.INSTANCE, StartDataFrameTransformAction.INSTANCE, - StartDataFrameTransformTaskAction.INSTANCE, StopDataFrameTransformAction.INSTANCE, DeleteDataFrameTransformAction.INSTANCE, GetDataFrameTransformsAction.INSTANCE, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index 32f639a1fac21..1801b146101ab 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -36,7 +36,7 @@ public class DataFrameMessages { " Use force stop to stop the data frame transform."; public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM = "Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " + - "Use force start to restart data frame transform once error is resolved."; + "Use force stop and then restart the data frame transform once error is resolved."; public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]"; public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION = diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java index 7473193e4390e..02598027ec8fd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.dataframe.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.AcknowledgedRequest; @@ -33,32 +34,30 @@ private StartDataFrameTransformAction() { public static class Request extends AcknowledgedRequest { private final String id; - private final boolean force; - public Request(String id, boolean force) { + public Request(String id) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); - this.force = force; } public Request(StreamInput in) throws IOException { super(in); id = in.readString(); - force = in.readBoolean(); + if(in.getVersion().before(Version.V_8_0_0)) { + in.readBoolean(); + } } public String getId() { return id; } - public boolean isForce() { - return force; - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); - out.writeBoolean(force); + if(out.getVersion().before(Version.V_8_0_0)) { + out.writeBoolean(false); + } } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java deleted file mode 100644 index 4fe87d9727f93..0000000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.core.dataframe.action; - -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.tasks.BaseTasksRequest; -import org.elasticsearch.action.support.tasks.BaseTasksResponse; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.xpack.core.dataframe.DataFrameField; -import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper; - -import java.io.IOException; -import java.util.Collections; -import java.util.Objects; - -public class StartDataFrameTransformTaskAction extends ActionType { - - public static final StartDataFrameTransformTaskAction INSTANCE = new StartDataFrameTransformTaskAction(); - public static final String NAME = "cluster:admin/data_frame/start_task"; - - private StartDataFrameTransformTaskAction() { - super(NAME, StartDataFrameTransformTaskAction.Response::new); - } - - public static class Request extends BaseTasksRequest { - - private final String id; - private final boolean force; - - public Request(String id, boolean force) { - this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); - this.force = force; - } - - public Request(StreamInput in) throws IOException { - super(in); - id = in.readString(); - if (in.getVersion().onOrAfter(Version.V_7_4_0)) { - force = in.readBoolean(); - } else { - // The behavior before V_7_4_0 was that this flag did not exist, - // assuming previous checks allowed this task to be started. - force = true; - } - } - - public String getId() { - return id; - } - - public boolean isForce() { - return force; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(id); - if (out.getVersion().onOrAfter(Version.V_7_4_0)) { - out.writeBoolean(force); - } - } - - @Override - public boolean match(Task task) { - return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public int hashCode() { - return Objects.hash(id, force); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - Request other = (Request) obj; - return Objects.equals(id, other.id) && force == other.force; - } - } - - public static class Response extends BaseTasksResponse implements ToXContentObject { - private final boolean started; - - public Response(StreamInput in) throws IOException { - super(in); - started = in.readBoolean(); - } - - public Response(boolean started) { - super(Collections.emptyList(), Collections.emptyList()); - this.started = started; - } - - public boolean isStarted() { - return started; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(started); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - toXContentCommon(builder, params); - builder.field("started", started); - builder.endObject(); - return builder; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (obj == null || getClass() != obj.getClass()) { - return false; - } - Response response = (Response) obj; - return started == response.started; - } - - @Override - public int hashCode() { - return Objects.hash(started); - } - } -} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformActionRequestTests.java index 6220a08fb1084..976db70c45f49 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformActionRequestTests.java @@ -13,7 +13,7 @@ public class StartDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase { @Override protected Request createTestInstance() { - return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean()); + return new Request(randomAlphaOfLengthBetween(1, 20)); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java deleted file mode 100644 index b6fb6b94d5418..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.core.dataframe.action; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; - -public class StartDataFrameTransformTaskActionRequestTests extends - AbstractWireSerializingTestCase { - @Override - protected StartDataFrameTransformTaskAction.Request createTestInstance() { - return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4), randomBoolean()); - } - - @Override - protected Writeable.Reader instanceReader() { - return StartDataFrameTransformTaskAction.Request::new; - } -} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionResponseTests.java deleted file mode 100644 index 62165f87968e0..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionResponseTests.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.core.dataframe.action; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; - -public class StartDataFrameTransformTaskActionResponseTests extends - AbstractWireSerializingTestCase { - @Override - protected StartDataFrameTransformTaskAction.Response createTestInstance() { - return new StartDataFrameTransformTaskAction.Response(randomBoolean()); - } - - @Override - protected Writeable.Reader instanceReader() { - return StartDataFrameTransformTaskAction.Response::new; - } -} diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 455009b49691e..50561b8f4b1ae 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -224,14 +224,13 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } - protected void startDataframeTransform(String transformId, boolean force) throws IOException { - startDataframeTransform(transformId, force, null); + protected void startDataframeTransform(String transformId) throws IOException { + startDataframeTransform(transformId, null); } - protected void startDataframeTransform(String transformId, boolean force, String authHeader, String... warnings) throws IOException { + protected void startDataframeTransform(String transformId, String authHeader, String... warnings) throws IOException { // start the transform final Request startTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_start", authHeader); - startTransformRequest.addParameter(DataFrameField.FORCE.getPreferredName(), Boolean.toString(force)); if (warnings.length > 0) { startTransformRequest.setOptions(expectWarnings(warnings)); } @@ -259,7 +258,7 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader, String... warnings) throws Exception { // start the transform - startDataframeTransform(transformId, false, authHeader, warnings); + startDataframeTransform(transformId, authHeader, warnings); assertTrue(indexExists(dataFrameIndex)); // wait until the dataframe has been created and all data is available waitForDataFrameCheckpoint(transformId); @@ -279,7 +278,7 @@ protected void startAndWaitForContinuousTransform(String transformId, String authHeader, long checkpoint) throws Exception { // start the transform - startDataframeTransform(transformId, false, authHeader, new String[0]); + startDataframeTransform(transformId, authHeader, new String[0]); assertTrue(indexExists(dataFrameIndex)); // wait until the dataframe has been created and all data is available waitForDataFrameCheckpoint(transformId, checkpoint); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java index edd8eb44a9feb..e0dc97d2533dc 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -28,8 +28,6 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.oneOf; public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { @@ -65,7 +63,7 @@ public void testForceStopFailedTransform() throws Exception { createDestinationIndexWithBadMapping(dataFrameIndex); createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null); failureTransforms.add(transformId); - startDataframeTransform(transformId, false); + startDataframeTransform(transformId); awaitState(transformId, DataFrameTransformStats.State.FAILED); Map fullState = getDataFrameState(transformId); final String failureReason = "task encountered more than 0 failures; latest failure: " + @@ -89,14 +87,14 @@ public void testForceStopFailedTransform() throws Exception { assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue())); } - public void testForceStartFailedTransform() throws Exception { + public void testStartFailedTransform() throws Exception { String transformId = "test-force-start-failed-transform"; createReviewsIndex(REVIEWS_INDEX_NAME, 10); String dataFrameIndex = "failure_pivot_reviews"; createDestinationIndexWithBadMapping(dataFrameIndex); createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null); failureTransforms.add(transformId); - startDataframeTransform(transformId, false); + startDataframeTransform(transformId); awaitState(transformId, DataFrameTransformStats.State.FAILED); Map fullState = getDataFrameState(transformId); final String failureReason = "task encountered more than 0 failures; latest failure: " + @@ -106,26 +104,15 @@ public void testForceStartFailedTransform() throws Exception { final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " + "as it is in a failed state with failure: [" + failureReason + - "]. Use force start to restart data frame transform once error is resolved."; + "]. Use force stop and then restart the data frame transform once error is resolved."; // Verify that we cannot start the transform when the task is in a failed state assertBusy(() -> { - ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false)); + ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), equalTo(expectedFailure)); }, 60, TimeUnit.SECONDS); - // Correct the failure by deleting the destination index - deleteIndex(dataFrameIndex); - // Force start the data frame to indicate failure correction - startDataframeTransform(transformId, true); - - // Verify that we have started and that our reason is cleared - fullState = getDataFrameState(transformId); - assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue())); - assertThat(XContentMapValues.extractValue("state", fullState), oneOf("started", "indexing")); - assertThat((Integer)XContentMapValues.extractValue("stats.index_failures", fullState), greaterThanOrEqualTo(1)); - stopDataFrameTransform(transformId, true); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index 383c7b5e75c88..5bc4e7172c891 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -50,7 +50,6 @@ import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.UpdateDataFrameTransformAction; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; @@ -60,7 +59,6 @@ import org.elasticsearch.xpack.dataframe.action.TransportPreviewDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction; -import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction; import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportUpdateDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; @@ -143,7 +141,6 @@ public List getRestHandlers(final Settings settings, final RestCont return Arrays.asList( new ActionHandler<>(PutDataFrameTransformAction.INSTANCE, TransportPutDataFrameTransformAction.class), new ActionHandler<>(StartDataFrameTransformAction.INSTANCE, TransportStartDataFrameTransformAction.class), - new ActionHandler<>(StartDataFrameTransformTaskAction.INSTANCE, TransportStartDataFrameTransformTaskAction.class), new ActionHandler<>(StopDataFrameTransformAction.INSTANCE, TransportStopDataFrameTransformAction.class), new ActionHandler<>(DeleteDataFrameTransformAction.INSTANCE, TransportDeleteDataFrameTransformAction.class), new ActionHandler<>(GetDataFrameTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class), diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index 622a373b50216..d0505a6c28785 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -37,7 +37,6 @@ import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; @@ -56,6 +55,8 @@ import java.util.function.Consumer; import java.util.function.Predicate; +import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_START_FAILED_TRANSFORM; + public class TransportStartDataFrameTransformAction extends TransportMasterNodeAction { @@ -95,7 +96,7 @@ protected StartDataFrameTransformAction.Response read(StreamInput in) throws IOE @Override protected void masterOperation(Task ignoredTask, StartDataFrameTransformAction.Request request, ClusterState state, - ActionListener listener) throws Exception { + ActionListener listener) { if (!licenseState.isDataFrameAllowed()) { listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME)); return; @@ -133,38 +134,20 @@ protected void masterOperation(Task ignoredTask, StartDataFrameTransformAction.R newPersistentTaskActionListener); } else { DataFrameTransformState transformState = (DataFrameTransformState)existingTask.getState(); - if(transformState.getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { + if(transformState.getTaskState() == DataFrameTransformTaskState.FAILED) { listener.onFailure(new ElasticsearchStatusException( - "Unable to start data frame transform [" + request.getId() + - "] as it is in a failed state with failure: [" + transformState.getReason() + - "]. Use force start to restart data frame transform once error is resolved.", + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM, + request.getId(), + transformState.getReason()), RestStatus.CONFLICT)); - } else if (transformState.getTaskState() != DataFrameTransformTaskState.STOPPED && - transformState.getTaskState() != DataFrameTransformTaskState.FAILED) { - listener.onFailure(new ElasticsearchStatusException( - "Unable to start data frame transform [" + request.getId() + - "] as it is in state [" + transformState.getTaskState() + "]", RestStatus.CONFLICT)); } else { - // If the task already exists but is not assigned to a node, something is weird - // return a failure that includes the current assignment explanation (if one exists) - if (existingTask.isAssigned() == false) { - String assignmentExplanation = "unknown reason"; - if (existingTask.getAssignment() != null) { - assignmentExplanation = existingTask.getAssignment().getExplanation(); - } - listener.onFailure(new ElasticsearchStatusException("Unable to start data frame transform [" + - request.getId() + "] as it is not assigned to a node, explanation: " + assignmentExplanation, - RestStatus.CONFLICT)); - return; - } - // If the task already exists and is assigned to a node, simply attempt to set it to start - ClientHelper.executeAsyncWithOrigin(client, - ClientHelper.DATA_FRAME_ORIGIN, - StartDataFrameTransformTaskAction.INSTANCE, - new StartDataFrameTransformTaskAction.Request(request.getId(), request.isForce()), - ActionListener.wrap( - r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)), - listener::onFailure)); + // If the task already exists that means that it is either running or failed + // Since it is not failed, that means it is running, we return a conflict. + listener.onFailure(new ElasticsearchStatusException( + "Cannot start transform [{}] as it is already started.", + RestStatus.CONFLICT, + request.getId() + )); } } }, diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java deleted file mode 100644 index 17df98e0c2b33..0000000000000 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.dataframe.action; - -import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.TaskOperationFailure; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.tasks.TransportTasksAction; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.license.LicenseUtils; -import org.elasticsearch.license.XPackLicenseState; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.XPackField; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; -import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; - -import java.util.List; - -/** - * Internal only transport class to change an allocated persistent task's state to started - */ -public class TransportStartDataFrameTransformTaskAction extends - TransportTasksAction { - - private final XPackLicenseState licenseState; - - @Inject - public TransportStartDataFrameTransformTaskAction(TransportService transportService, ActionFilters actionFilters, - ClusterService clusterService, XPackLicenseState licenseState) { - super(StartDataFrameTransformTaskAction.NAME, clusterService, transportService, actionFilters, - StartDataFrameTransformTaskAction.Request::new, StartDataFrameTransformTaskAction.Response::new, - StartDataFrameTransformTaskAction.Response::new, ThreadPool.Names.SAME); - this.licenseState = licenseState; - } - - @Override - protected void doExecute(Task task, StartDataFrameTransformTaskAction.Request request, - ActionListener listener) { - - if (!licenseState.isDataFrameAllowed()) { - listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME)); - return; - } - - super.doExecute(task, request, listener); - } - - @Override - protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask, - ActionListener listener) { - if (transformTask.getTransformId().equals(request.getId())) { - transformTask.start(null, request.isForce(), listener); - } else { - listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId() - + "] does not match request's ID [" + request.getId() + "]")); - } - } - - @Override - protected StartDataFrameTransformTaskAction.Response newResponse(StartDataFrameTransformTaskAction.Request request, - List tasks, - List taskOperationFailures, - List failedNodeExceptions) { - - if (taskOperationFailures.isEmpty() == false) { - throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause()); - } else if (failedNodeExceptions.isEmpty() == false) { - throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0)); - } - - // Either the transform doesn't exist (the user didn't create it yet) or was deleted - // after the StartAPI executed. - // In either case, let the user know - if (tasks.size() == 0) { - throw new ResourceNotFoundException("Task for data frame transform [" + request.getId() + "] not found"); - } - - assert tasks.size() == 1; - - boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformTaskAction.Response::isStarted); - return new StartDataFrameTransformTaskAction.Response(allStarted); - } -} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java index 44c2c66fbb769..d276c0ceb1db0 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java @@ -24,8 +24,7 @@ public RestStartDataFrameTransformAction(RestController controller) { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { String id = restRequest.param(DataFrameField.ID.getPreferredName()); - boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false); - StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id, force); + StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id); request.timeout(restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT)); return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index b1d8645193681..bd1a445b962ea 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -30,7 +30,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; @@ -139,9 +139,13 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr final SetOnce stateHolder = new SetOnce<>(); - ActionListener startTaskListener = ActionListener.wrap( - response -> logger.info("Successfully completed and scheduled task in node operation"), - failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure) + ActionListener startTaskListener = ActionListener.wrap( + response -> logger.info("[{}] successfully completed and scheduled task in node operation", transformId), + failure -> { + auditor.error(transformId, "Failed to start data frame transform. " + + "Please stop and attempt to start again. Failure: " + failure.getMessage()); + logger.error("Failed to start task ["+ transformId +"] in node operation", failure); + } ); // <5> load next checkpoint @@ -302,11 +306,10 @@ private void markAsFailed(DataFrameTransformTask task, String reason) { private void startTask(DataFrameTransformTask buildTask, DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder, Long previousCheckpoint, - ActionListener listener) { + ActionListener listener) { buildTask.initializeIndexer(indexerBuilder); // DataFrameTransformTask#start will fail if the task state is FAILED - // Will continue to attempt to start the indexer, even if the state is STARTED - buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, false, listener); + buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener); } private void setNumFailureRetries(int numFailureRetries) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index a31c148a1ed4d..f06b37f8d23fd 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -35,8 +35,7 @@ import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction.Response; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; @@ -219,10 +218,17 @@ public void getCheckpointingInfo(DataFrameTransformsCheckpointService transforms )); } - // Here `failOnConflict` is usually true, except when the initial start is called when the task is assigned to the node - synchronized void start(Long startingCheckpoint, boolean force, boolean failOnConflict, ActionListener listener) { - logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState()); - if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) { + /** + * Starts the dataframe transform and schedules it to be triggered in the future. + * + * NOTE: This should ONLY be called via {@link DataFrameTransformPersistentTasksExecutor} + * + * @param startingCheckpoint The starting checkpoint, could null. Null indicates that there is no starting checkpoint + * @param listener The listener to alert once started + */ + synchronized void start(Long startingCheckpoint, ActionListener listener) { + logger.debug("[{}] start called with state [{}].", getTransformId(), getState()); + if (taskState.get() == DataFrameTransformTaskState.FAILED) { listener.onFailure(new ElasticsearchStatusException( DataFrameMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM, getTransformId(), @@ -243,15 +249,6 @@ synchronized void start(Long startingCheckpoint, boolean force, boolean failOnCo msg)); return; } - // If we are already in a `STARTED` state, we should not attempt to call `.start` on the indexer again. - if (taskState.get() == DataFrameTransformTaskState.STARTED && failOnConflict) { - listener.onFailure(new ElasticsearchStatusException( - "Cannot start transform [{}] as it is already STARTED.", - RestStatus.CONFLICT, - getTransformId() - )); - return; - } final IndexerState newState = getIndexer().start(); if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) { listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]", @@ -285,7 +282,7 @@ synchronized void start(Long startingCheckpoint, boolean force, boolean failOnCo // kick off the indexer triggered(new Event(schedulerJobName(), now, now)); registerWithSchedulerJob(); - listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); + listener.onResponse(new Response(true)); }, exc -> { auditor.warning(transform.getId(), @@ -297,16 +294,6 @@ synchronized void start(Long startingCheckpoint, boolean force, boolean failOnCo } )); } - /** - * Start the background indexer and set the task's state to started - * @param startingCheckpoint Set the current checkpoint to this value. If null the - * current checkpoint is not set - * @param force Whether to force start a failed task or not - * @param listener Started listener - */ - public synchronized void start(Long startingCheckpoint, boolean force, ActionListener listener) { - start(startingCheckpoint, force, true, listener); - } public synchronized void stop(boolean force) { logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState()); From e99e5529f594b27be68e1cf7bd12e955f8f75995 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 5 Sep 2019 15:57:28 -0500 Subject: [PATCH 2/2] fixing expected error message --- .../rest-api-spec/test/data_frame/transforms_start_stop.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 044f5212a993c..d35e4b989c18a 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -59,7 +59,7 @@ teardown: - match: { acknowledged: true } - do: - catch: /Unable to start data frame transform \[airline-transform-start-stop\] as it is in state \[STARTED\]/ + catch: /Cannot start transform \[airline-transform-start-stop\] as it is already started/ data_frame.start_data_frame_transform: transform_id: "airline-transform-start-stop"