Skip to content

Commit

Permalink
[7.x][ML] Ensure data frame analytics task is only marked completed o…
Browse files Browse the repository at this point in the history
…nce (#47119) (#47157)

Closes #46907
  • Loading branch information
dimitris-athanasiou authored Sep 26, 2019
1 parent 73a09b3 commit 0765bd4
Showing 1 changed file with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
private volatile Long reindexingTaskId;
private volatile boolean isReindexingFinished;
private volatile boolean isStopping;
private volatile boolean isMarkAsCompletedCalled;
private final ProgressTracker progressTracker = new ProgressTracker();

public DataFrameAnalyticsTask(long id, String type, String action, TaskId parentTask, Map<String, String> headers,
Expand Down Expand Up @@ -102,10 +103,17 @@ protected void onCancelled() {
public void markAsCompleted() {
// It is possible that the stop API has been called in the meantime and that
// may also cause this method to be called. We check whether we have already
// been marked completed to avoid doing it twice.
if (isCompleted() == false) {
persistProgress(() -> super.markAsCompleted());
// been marked completed to avoid doing it twice. We need to capture that
// locally instead of relying to isCompleted() because of the asynchronous
// persistence of progress.
synchronized (this) {
if (isMarkAsCompletedCalled) {
return;
}
isMarkAsCompletedCalled = true;
}

persistProgress(() -> super.markAsCompleted());
}

@Override
Expand Down Expand Up @@ -224,6 +232,7 @@ private TaskId getReindexTaskId() {
}

private void persistProgress(Runnable runnable) {
LOGGER.debug("[{}] Persisting progress", taskParams.getId());
GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(taskParams.getId());
executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, ActionListener.wrap(
statsResponse -> {
Expand Down

0 comments on commit 0765bd4

Please sign in to comment.