Skip to content

Commit

Permalink
[ML Data Frame] Start directly data frame rather than via the schedul…
Browse files Browse the repository at this point in the history
…er (elastic#42067)

Trigger indexer start directly to put the indexer in INDEXING state immediately
  • Loading branch information
davidkyle authored and Gurkan Kaymak committed May 27, 2019
1 parent 10cd4cf commit 273a305
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;

public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {

Expand Down Expand Up @@ -264,7 +265,8 @@ public void testStartStop() throws IOException {
GetDataFrameTransformStatsResponse statsResponse = execute(new GetDataFrameTransformStatsRequest(id),
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());
IndexerState indexerState = statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState();
assertThat(indexerState, is(oneOf(IndexerState.STARTED, IndexerState.INDEXING)));

StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
StopDataFrameTransformResponse stopResponse =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat
protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
final String transformId = params.getId();
final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(DataFrameTransformTask.SCHEDULE_NAME + "_" + transformId,
next());
final DataFrameTransformState transformState = (DataFrameTransformState) state;

final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
Expand Down Expand Up @@ -137,15 +135,15 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
stats -> {
indexerBuilder.setInitialStats(stats);
buildTask.initializeIndexer(indexerBuilder);
scheduleAndStartTask(buildTask, schedulerJob, startTaskListener);
startTask(buildTask, startTaskListener);
},
error -> {
if (error instanceof ResourceNotFoundException == false) {
logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
}
indexerBuilder.setInitialStats(new DataFrameIndexerTransformStats(transformId));
buildTask.initializeIndexer(indexerBuilder);
scheduleAndStartTask(buildTask, schedulerJob, startTaskListener);
startTask(buildTask, startTaskListener);
}
);

Expand Down Expand Up @@ -218,30 +216,20 @@ private void markAsFailed(DataFrameTransformTask task, String reason) {
}
}

private void scheduleAndStartTask(DataFrameTransformTask buildTask,
SchedulerEngine.Job schedulerJob,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
// Note that while the task is added to the scheduler here, the internal state will prevent
// it from doing any work until the task is "started" via the StartTransform api
schedulerEngine.register(buildTask);
schedulerEngine.add(schedulerJob);
logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
private void startTask(DataFrameTransformTask buildTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
// If we are stopped, and it is an initial run, this means we have never been started,
// attempt to start the task
if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) {
logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
buildTask.start(listener);

} else {
logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState());
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
}
}

static SchedulerEngine.Schedule next() {
return (startTime, now) -> {
return now + 1000; // to be fixed, hardcode something
};
}

@Override
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> persistentTask, Map<String, String> headers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ public synchronized void start(ActionListener<Response> listener) {
persistStateToClusterState(state, ActionListener.wrap(
task -> {
auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
long now = System.currentTimeMillis();
// kick off the indexer
triggered(new Event(schedulerJobName(), now, now));
registerWithSchedulerJob();
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
},
exc -> {
Expand Down Expand Up @@ -238,7 +242,7 @@ public synchronized void triggered(Event event) {
return;
}
// for now no rerun, so only trigger if checkpoint == 0
if (currentCheckpoint.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) {
if (currentCheckpoint.get() == 0 && event.getJobName().equals(schedulerJobName())) {
logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState());
getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
}
Expand All @@ -249,13 +253,7 @@ public synchronized void triggered(Event event) {
* This tries to remove the job from the scheduler and completes the persistent task
*/
synchronized void shutdown() {
try {
schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId());
schedulerEngine.unregister(this);
} catch (Exception e) {
markAsFailed(e);
return;
}
deregisterSchedulerJob();
markAsCompleted();
}

Expand Down Expand Up @@ -311,6 +309,27 @@ public synchronized void onCancelled() {
}
}

private void registerWithSchedulerJob() {
schedulerEngine.register(this);
final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(schedulerJobName(), next());
schedulerEngine.add(schedulerJob);
}

private void deregisterSchedulerJob() {
schedulerEngine.remove(schedulerJobName());
schedulerEngine.unregister(this);
}

private String schedulerJobName() {
return DataFrameTransformTask.SCHEDULE_NAME + "_" + getTransformId();
}

private SchedulerEngine.Schedule next() {
return (startTime, now) -> {
return now + 1000; // to be fixed, hardcode something
};
}

synchronized void initializeIndexer(ClientDataFrameIndexerBuilder indexerBuilder) {
indexer.set(indexerBuilder.build(this));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }

- do:
Expand All @@ -127,7 +127,7 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }

---
Expand Down Expand Up @@ -168,7 +168,7 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }

- do:
Expand All @@ -194,7 +194,7 @@ teardown:
transform_id: "airline-transform-start-later"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-later" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }

- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ teardown:
transform_id: "airline-transform-stats"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }
- match: { transforms.0.state.checkpoint: 0 }
- match: { transforms.0.stats.pages_processed: 0 }
- match: { transforms.0.stats.documents_processed: 0 }
- match: { transforms.0.stats.documents_indexed: 0 }
- match: { transforms.0.stats.trigger_count: 0 }
- match: { transforms.0.stats.trigger_count: 1 }
- match: { transforms.0.stats.index_time_in_ms: 0 }
- match: { transforms.0.stats.index_total: 0 }
- match: { transforms.0.stats.index_failures: 0 }
Expand Down Expand Up @@ -172,7 +172,7 @@ teardown:
transform_id: "_all"
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.1.id: "airline-transform-stats-dos" }
- match: { transforms.1.state.indexer_state: "stopped" }

Expand Down

0 comments on commit 273a305

Please sign in to comment.