Skip to content

Commit

Permalink
[ML Data Frame] Persist data frame after state changes (elastic#42347)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle authored and Gurkan Kaymak committed May 27, 2019
1 parent feac862 commit cd9b7b9
Showing 1 changed file with 10 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,6 @@ static class ClientDataFrameIndexer extends DataFrameIndexer {
private final DataFrameTransformsCheckpointService transformsCheckpointService;
private final String transformId;
private final DataFrameTransformTask transformTask;
private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null;
private final AtomicInteger failureCount;
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
private volatile String lastAuditedExceptionMessage = null;
Expand Down Expand Up @@ -552,25 +551,18 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
// only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
task -> {
// Only persist the stats if something has actually changed
if (previouslyPersistedStats == null || previouslyPersistedStats.equals(getStats()) == false) {
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
previouslyPersistedStats = getStats();
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
// The stats that we have previously written to the doc is the same as as it is now, no need to update it
} else {
next.run();
}
},
exc -> {
logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);
Expand Down

0 comments on commit cd9b7b9

Please sign in to comment.