From 2679f4c3e54faa1f9a8e728c64c1d24a020a8561 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 22 May 2019 12:49:58 +0100 Subject: [PATCH] Always persist state --- .../transforms/DataFrameTransformTask.java | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 9df6b5e3ab337..926f233c454d1 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -444,7 +444,6 @@ static class ClientDataFrameIndexer extends DataFrameIndexer { private final DataFrameTransformsCheckpointService transformsCheckpointService; private final String transformId; private final DataFrameTransformTask transformTask; - private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null; private final AtomicInteger failureCount; // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index private volatile String lastAuditedExceptionMessage = null; @@ -552,25 +551,18 @@ protected void doSaveState(IndexerState indexerState, Map positi // only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity ActionListener> updateClusterStateListener = ActionListener.wrap( task -> { - // Only persist the stats if something has actually changed - if (previouslyPersistedStats == null || previouslyPersistedStats.equals(getStats()) == false) { - transformsConfigManager.putOrUpdateTransformStats( - new DataFrameTransformStateAndStats(transformId, state, getStats(), - DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null + transformsConfigManager.putOrUpdateTransformStats( + new DataFrameTransformStateAndStats(transformId, state, getStats(), + DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null ActionListener.wrap( - r -> { - previouslyPersistedStats = getStats(); - next.run(); - }, - statsExc -> { - logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); - next.run(); - } + r -> { + next.run(); + }, + statsExc -> { + logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); + next.run(); + } )); - // The stats that we have previously written to the doc is the same as as it is now, no need to update it - } else { - next.run(); - } }, exc -> { logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);