Skip to content

Commit

Permalink
[ML][Transforms] allow executor to call start on started task (#46347)
Browse files Browse the repository at this point in the history
  • Loading branch information
benwtrent authored Sep 5, 2019
1 parent 2648ece commit 8cdce05
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ private void startTask(DataFrameTransformTask buildTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
buildTask.initializeIndexer(indexerBuilder);
// DataFrameTransformTask#start will fail if the task state is FAILED
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener);
// Will continue to attempt to start the indexer, even if the state is STARTED
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, false, listener);
}

private void setNumFailureRetries(int numFailureRetries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,8 @@ public void getCheckpointingInfo(DataFrameTransformsCheckpointService transforms
));
}

/**
* Start the background indexer and set the task's state to started
* @param startingCheckpoint Set the current checkpoint to this value. If null the
* current checkpoint is not set
* @param listener Started listener
*/
public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
// Here `failOnConflict` is usually true, except when the initial start is called when the task is assigned to the node
synchronized void start(Long startingCheckpoint, boolean force, boolean failOnConflict, ActionListener<Response> listener) {
logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState());
if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) {
listener.onFailure(new ElasticsearchStatusException(
Expand All @@ -249,7 +244,7 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
return;
}
// If we are already in a `STARTED` state, we should not attempt to call `.start` on the indexer again.
if (taskState.get() == DataFrameTransformTaskState.STARTED) {
if (taskState.get() == DataFrameTransformTaskState.STARTED && failOnConflict) {
listener.onFailure(new ElasticsearchStatusException(
"Cannot start transform [{}] as it is already STARTED.",
RestStatus.CONFLICT,
Expand All @@ -260,7 +255,7 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
final IndexerState newState = getIndexer().start();
if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
transform.getId(), newState));
transform.getId(), newState));
return;
}
stateReason.set(null);
Expand Down Expand Up @@ -298,10 +293,20 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
getIndexer().stop();
listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
+ transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
+ transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
}
));
}
/**
* Start the background indexer and set the task's state to started
* @param startingCheckpoint Set the current checkpoint to this value. If null the
* current checkpoint is not set
* @param force Whether to force start a failed task or not
* @param listener Started listener
*/
public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
start(startingCheckpoint, force, true, listener);
}

public synchronized void stop(boolean force) {
logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand Down Expand Up @@ -52,7 +51,6 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.oneOf;

@LuceneTestCase.AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/46341")
public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {

private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
Expand Down

0 comments on commit 8cdce05

Please sign in to comment.