From 24cb417e8611496212ac88a9c0df3f10f43abd46 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 4 May 2020 20:23:47 +0300 Subject: [PATCH] [ML] Fix race condition updating reindexing progress In #55763 I thought I could remove the flag that marks reindexing was finished on a data frame analytics task. However, that exposed a race condition. It is possible that between updating reindexing progress to 100 because we have called `DataFrameAnalyticsManager.startAnalytics()` and a call to the _stats API which updates reindexing progress via the method `DataFrameAnalyticsTask.updateReindexTaskProgress()` we end up overwriting the 100 with a lower progress value. This commit fixes this issue by bringing back the help of a `isReindexingFinished` flag as it was prior to #55763. Closes #56128 --- .../xpack/ml/dataframe/DataFrameAnalyticsManager.java | 2 +- .../xpack/ml/dataframe/DataFrameAnalyticsTask.java | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index ec9f0755492f3..9c0f81461b4d1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -338,7 +338,7 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi ActionListener refreshListener = ActionListener.wrap( refreshResponse -> { // Now we can ensure reindexing progress is complete - task.getStatsHolder().getProgressTracker().updateReindexingProgress(100); + task.setReindexingFinished(); // TODO This could fail with errors. In that case we get stuck with the copied index. // We could delete the index in case of failure or we could try building the factory before reindexing diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index 35ff2f0eb6f77..76480a60379c1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -67,6 +67,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S private final StartDataFrameAnalyticsAction.TaskParams taskParams; @Nullable private volatile Long reindexingTaskId; + private volatile boolean isReindexingFinished; private volatile boolean isStopping; private volatile boolean isMarkAsCompletedCalled; private final StatsHolder statsHolder; @@ -92,6 +93,10 @@ public void setReindexingTaskId(Long reindexingTaskId) { this.reindexingTaskId = reindexingTaskId; } + public void setReindexingFinished() { + isReindexingFinished = true; + } + public boolean isStopping() { return isStopping; } @@ -228,7 +233,7 @@ public void updateReindexTaskProgress(ActionListener listener) { private void getReindexTaskProgress(ActionListener listener) { TaskId reindexTaskId = getReindexTaskId(); if (reindexTaskId == null) { - listener.onResponse(statsHolder.getProgressTracker().getReindexingProgressPercent()); + listener.onResponse(isReindexingFinished ? 100 : 0); return; } @@ -244,7 +249,7 @@ private void getReindexTaskProgress(ActionListener listener) { error -> { if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) { // The task is not present which means either it has not started yet or it finished. - listener.onResponse(statsHolder.getProgressTracker().getReindexingProgressPercent()); + listener.onResponse(isReindexingFinished ? 100 : 0); } else { listener.onFailure(error); }