Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML Data Frame] Set DF task state to stopped when stopping #42516

Merged
merged 1 commit into from
May 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.DataFrame;
Expand Down Expand Up @@ -223,18 +222,8 @@ private void startTask(DataFrameTransformTask buildTask,
DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder,
Long previousCheckpoint,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
// If we are stopped, and it is an initial run, this means we have never been started,
// attempt to start the task

buildTask.initializeIndexer(indexerBuilder);
// TODO isInitialRun is false after relocation??
if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) {
Copy link
Member Author

@davidkyle davidkyle May 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the change to start the DF directly rather than waiting for the scheduler, performing this check means the task would never start if state == STARTED i.e. after relocating from a failed node.

The task will always need to be started even if it has run before

logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
buildTask.start(previousCheckpoint, listener);
} else {
logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState());
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
}
buildTask.start(previousCheckpoint, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,8 @@ public long getInProgressCheckpoint() {
}
}

public boolean isStopped() {
IndexerState currentState = getIndexer() == null ? initialIndexerState : getIndexer().getState();
return currentState.equals(IndexerState.STOPPED);
}

boolean isInitialRun() {
return getIndexer() != null && getIndexer().initialRun();
public void setTaskStateStopped() {
taskState.set(DataFrameTransformTaskState.STOPPED);
}

/**
Expand Down Expand Up @@ -235,11 +230,9 @@ public synchronized void start(Long startingCheckpoint, ActionListener<Response>

public synchronized void stop() {
if (getIndexer() == null) {
return;
}
// taskState is initialized as STOPPED and is updated in tandem with the indexerState
// Consequently, if it is STOPPED, we consider the whole task STOPPED.
if (taskState.get() == DataFrameTransformTaskState.STOPPED) {
// If there is no indexer the task has not been triggered
// but it still needs to be stopped and removed
shutdown();
return;
}

Expand Down Expand Up @@ -609,6 +602,8 @@ protected void onFinish(ActionListener<Void> listener) {
protected void onStop() {
auditor.info(transformConfig.getId(), "Indexer has stopped");
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());

transformTask.setTaskStateStopped();
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
# - match: { transforms.0.state.indexer_state: "stopped" }
# - match: { transforms.0.state.task_state: "stopped" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }

- do:
data_frame.start_data_frame_transform:
Expand Down