diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index 1bd49154ee548..40cd6f454cdab 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -72,6 +72,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.oneOf; public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { @@ -264,7 +265,8 @@ public void testStartStop() throws IOException { GetDataFrameTransformStatsResponse statsResponse = execute(new GetDataFrameTransformStatsRequest(id), client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync); assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1)); - assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState()); + IndexerState indexerState = statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState(); + assertThat(indexerState, is(oneOf(IndexerState.STARTED, IndexerState.INDEXING))); StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null); StopDataFrameTransformResponse stopResponse = diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index d0f15197c3cca..5b0c0e7dfc19b 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -106,8 +106,6 @@ static List verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) { final String transformId = params.getId(); final DataFrameTransformTask buildTask = (DataFrameTransformTask) task; - final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(DataFrameTransformTask.SCHEDULE_NAME + "_" + transformId, - next()); final DataFrameTransformState transformState = (DataFrameTransformState) state; final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder = @@ -137,7 +135,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr stats -> { indexerBuilder.setInitialStats(stats); buildTask.initializeIndexer(indexerBuilder); - scheduleAndStartTask(buildTask, schedulerJob, startTaskListener); + startTask(buildTask, startTaskListener); }, error -> { if (error instanceof ResourceNotFoundException == false) { @@ -145,7 +143,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr } indexerBuilder.setInitialStats(new DataFrameIndexerTransformStats(transformId)); buildTask.initializeIndexer(indexerBuilder); - scheduleAndStartTask(buildTask, schedulerJob, startTaskListener); + startTask(buildTask, startTaskListener); } ); @@ -218,30 +216,20 @@ private void markAsFailed(DataFrameTransformTask task, String reason) { } } - private void scheduleAndStartTask(DataFrameTransformTask buildTask, - SchedulerEngine.Job schedulerJob, - ActionListener listener) { - // Note that while the task is added to the scheduler here, the internal state will prevent - // it from doing any work until the task is "started" via the StartTransform api - schedulerEngine.register(buildTask); - schedulerEngine.add(schedulerJob); - logger.info("Data frame transform [{}] created.", buildTask.getTransformId()); + private void startTask(DataFrameTransformTask buildTask, + ActionListener listener) { // If we are stopped, and it is an initial run, this means we have never been started, // attempt to start the task if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) { + logger.info("Data frame transform [{}] created.", buildTask.getTransformId()); buildTask.start(listener); + } else { logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState()); listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); } } - static SchedulerEngine.Schedule next() { - return (startTime, now) -> { - return now + 1000; // to be fixed, hardcode something - }; - } - @Override protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetaData.PersistentTask persistentTask, Map headers) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index bfe0e4f4d77b1..ee8767e2235df 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -208,6 +208,10 @@ public synchronized void start(ActionListener listener) { persistStateToClusterState(state, ActionListener.wrap( task -> { auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]"); + long now = System.currentTimeMillis(); + // kick off the indexer + triggered(new Event(schedulerJobName(), now, now)); + registerWithSchedulerJob(); listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); }, exc -> { @@ -238,7 +242,7 @@ public synchronized void triggered(Event event) { return; } // for now no rerun, so only trigger if checkpoint == 0 - if (currentCheckpoint.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) { + if (currentCheckpoint.get() == 0 && event.getJobName().equals(schedulerJobName())) { logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState()); getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); } @@ -249,13 +253,7 @@ public synchronized void triggered(Event event) { * This tries to remove the job from the scheduler and completes the persistent task */ synchronized void shutdown() { - try { - schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId()); - schedulerEngine.unregister(this); - } catch (Exception e) { - markAsFailed(e); - return; - } + deregisterSchedulerJob(); markAsCompleted(); } @@ -311,6 +309,27 @@ public synchronized void onCancelled() { } } + private void registerWithSchedulerJob() { + schedulerEngine.register(this); + final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(schedulerJobName(), next()); + schedulerEngine.add(schedulerJob); + } + + private void deregisterSchedulerJob() { + schedulerEngine.remove(schedulerJobName()); + schedulerEngine.unregister(this); + } + + private String schedulerJobName() { + return DataFrameTransformTask.SCHEDULE_NAME + "_" + getTransformId(); + } + + private SchedulerEngine.Schedule next() { + return (startTime, now) -> { + return now + 1000; // to be fixed, hardcode something + }; + } + synchronized void initializeIndexer(ClientDataFrameIndexerBuilder indexerBuilder) { indexer.set(indexerBuilder.build(this)); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 1e9223b79f201..8b30fd1186b5b 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -100,7 +100,7 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - - match: { transforms.0.state.indexer_state: "started" } + - match: { transforms.0.state.indexer_state: "/started|indexing/" } - match: { transforms.0.state.task_state: "started" } - do: @@ -127,7 +127,7 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - - match: { transforms.0.state.indexer_state: "started" } + - match: { transforms.0.state.indexer_state: "/started|indexing/" } - match: { transforms.0.state.task_state: "started" } --- @@ -168,7 +168,7 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - - match: { transforms.0.state.indexer_state: "started" } + - match: { transforms.0.state.indexer_state: "/started|indexing/" } - match: { transforms.0.state.task_state: "started" } - do: @@ -194,7 +194,7 @@ teardown: transform_id: "airline-transform-start-later" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-later" } - - match: { transforms.0.state.indexer_state: "started" } + - match: { transforms.0.state.indexer_state: "/started|indexing/" } - match: { transforms.0.state.task_state: "started" } - do: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index 33b0f40863a79..bedeea18a1545 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -47,13 +47,13 @@ teardown: transform_id: "airline-transform-stats" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-stats" } - - match: { transforms.0.state.indexer_state: "started" } + - match: { transforms.0.state.indexer_state: "/started|indexing/" } - match: { transforms.0.state.task_state: "started" } - match: { transforms.0.state.checkpoint: 0 } - match: { transforms.0.stats.pages_processed: 0 } - match: { transforms.0.stats.documents_processed: 0 } - match: { transforms.0.stats.documents_indexed: 0 } - - match: { transforms.0.stats.trigger_count: 0 } + - match: { transforms.0.stats.trigger_count: 1 } - match: { transforms.0.stats.index_time_in_ms: 0 } - match: { transforms.0.stats.index_total: 0 } - match: { transforms.0.stats.index_failures: 0 } @@ -172,7 +172,7 @@ teardown: transform_id: "_all" - match: { count: 2 } - match: { transforms.0.id: "airline-transform-stats" } - - match: { transforms.0.state.indexer_state: "started" } + - match: { transforms.0.state.indexer_state: "/started|indexing/" } - match: { transforms.1.id: "airline-transform-stats-dos" } - match: { transforms.1.state.indexer_state: "stopped" }