diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 9ed8da61d8feb..443d499dfefd1 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -30,7 +30,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.DataFrame; @@ -223,18 +222,8 @@ private void startTask(DataFrameTransformTask buildTask, DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder, Long previousCheckpoint, ActionListener listener) { - // If we are stopped, and it is an initial run, this means we have never been started, - // attempt to start the task - buildTask.initializeIndexer(indexerBuilder); - // TODO isInitialRun is false after relocation?? - if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) { - logger.info("Data frame transform [{}] created.", buildTask.getTransformId()); - buildTask.start(previousCheckpoint, listener); - } else { - logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState()); - listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); - } + buildTask.start(previousCheckpoint, listener); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 926f233c454d1..13deab6748c94 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -174,13 +174,8 @@ public long getInProgressCheckpoint() { } } - public boolean isStopped() { - IndexerState currentState = getIndexer() == null ? initialIndexerState : getIndexer().getState(); - return currentState.equals(IndexerState.STOPPED); - } - - boolean isInitialRun() { - return getIndexer() != null && getIndexer().initialRun(); + public void setTaskStateStopped() { + taskState.set(DataFrameTransformTaskState.STOPPED); } /** @@ -235,11 +230,9 @@ public synchronized void start(Long startingCheckpoint, ActionListener public synchronized void stop() { if (getIndexer() == null) { - return; - } - // taskState is initialized as STOPPED and is updated in tandem with the indexerState - // Consequently, if it is STOPPED, we consider the whole task STOPPED. - if (taskState.get() == DataFrameTransformTaskState.STOPPED) { + // If there is no indexer the task has not been triggered + // but it still needs to be stopped and removed + shutdown(); return; } @@ -609,6 +602,8 @@ protected void onFinish(ActionListener listener) { protected void onStop() { auditor.info(transformConfig.getId(), "Indexer has stopped"); logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId()); + + transformTask.setTaskStateStopped(); transformsConfigManager.putOrUpdateTransformStats( new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(), DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 7a14359422b33..31f80033e7bdb 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -90,6 +90,9 @@ teardown: - match: { airline-data-by-airline-start-stop.mappings: {} } --- "Test start/stop/start transform": + - skip: + reason: "https://github.com/elastic/elasticsearch/issues/42650" + version: "all" - do: data_frame.start_data_frame_transform: transform_id: "airline-transform-start-stop" @@ -114,8 +117,8 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } -# - match: { transforms.0.state.indexer_state: "stopped" } -# - match: { transforms.0.state.task_state: "stopped" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } - do: data_frame.start_data_frame_transform: