Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] clear job size estimate cache when feature is reset #74494

Original file line number Diff line number Diff line change
Expand Up @@ -1312,9 +1312,12 @@ public String getFeatureDescription() {
public void cleanUpFeature(
ClusterService clusterService,
Client client,
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> finalListener) {
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> finalListener
) {
logger.info("Starting machine learning feature reset");

final Map<String, Boolean> results = new ConcurrentHashMap<>();

ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> unsetResetModeListener = ActionListener.wrap(
success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap(
resetSuccess -> finalListener.onResponse(success),
Expand All @@ -1337,25 +1340,43 @@ public void cleanUpFeature(
)
);

Map<String, Boolean> results = new ConcurrentHashMap<>();
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> cleanedUpIndicesListener = ActionListener.wrap(
success -> {
if (memoryTracker.get() != null) {
memoryTracker.get().awaitAndClear(ActionListener.wrap(
cacheCleared -> unsetResetModeListener.onResponse(success),
clearFailed -> {
logger.error("failed to clear memory tracker cache via machine learning reset feature API", clearFailed);
unsetResetModeListener.onResponse(success);
}
));
return;
}
unsetResetModeListener.onResponse(success);
},
failure -> {
logger.error("failed to clear .ml-* indices via reset feature API", failure);
unsetResetModeListener.onFailure(failure);
}
);

ActionListener<ListTasksResponse> afterWaitingForTasks = ActionListener.wrap(
listTasksResponse -> {
listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices");
if (results.values().stream().allMatch(b -> b)) {
// Call into the original listener to clean up the indices
SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener);
// Call into the original listener to clean up the indices and then clear ml memory cache
SystemIndexPlugin.super.cleanUpFeature(clusterService, client, cleanedUpIndicesListener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the memory tracker cleanup waits for refreshes to finish, I would do this index cleanup after clearing the memory tracker. It should avoid logging of spurious errors from refreshes that fail because the indices they're accessing get deleted.

Or was there a good reason for clearing the memory tracker last?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@droberts195, I figured if the jobs potentially still existed, it would be good to keep around their estimates. But, since all jobs should be closed by this point, clearing the tracker earlier is probably ok.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the jobs should be closed, and banned from reopening by the reset-in-progress cluster setting. I think flipping the order may avoid log spam.

} else {
final List<String> failedComponents = results.entrySet().stream()
.filter(result -> result.getValue() == false)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
unsetResetModeListener.onFailure(
cleanedUpIndicesListener.onFailure(
new RuntimeException("Some machine learning components failed to reset: " + failedComponents)
);
}
},
unsetResetModeListener::onFailure
cleanedUpIndicesListener::onFailure
);

ActionListener<StopDataFrameAnalyticsAction.Response> afterDataframesStopped = ActionListener.wrap(dataFrameStopResponse -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -72,6 +73,7 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
private final JobResultsProvider jobResultsProvider;
private final DataFrameAnalyticsConfigProvider configProvider;
private final Phaser stopPhaser;
private volatile AtomicInteger phase = new AtomicInteger(0);
private volatile boolean isMaster;
private volatile Instant lastUpdateTime;
private volatile Duration reassignmentRecheckInterval;
Expand Down Expand Up @@ -115,6 +117,37 @@ public void onMaster() {
/**
 * Callback invoked when this node stops being the elected master.
 * Flips the {@code isMaster} flag off and wipes the cached contents via
 * {@link #clear()} — presumably because the cache is only maintained while
 * this node is master and would otherwise go stale (TODO confirm against
 * the onMaster/refresh logic).
 */
public void offMaster() {
    isMaster = false;
    logger.trace("ML memory tracker off master");
    clear();
}

/**
 * Waits for all currently in-flight memory refreshes to complete and then
 * clears the tracker's cached contents. Used when the ML feature is reset so
 * that stale job size estimates do not survive the reset.
 * <p>
 * The wait happens on the ML utility thread pool, so the calling thread is
 * never blocked; the listener is completed from that pool.
 *
 * @param listener notified with {@code null} once the cache has been cleared,
 *                 or with the failure if waiting for refreshes to finish threw
 */
public void awaitAndClear(ActionListener<Void> listener) {
    // We never terminate the phaser
    assert stopPhaser.isTerminated() == false;
    // If there are no registered parties or no unarrived parties then there is a flaw
    // in the register/arrive/unregister logic in another method that uses the phaser
    assert stopPhaser.getRegisteredParties() > 0;
    assert stopPhaser.getUnarrivedParties() > 0;
    threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
        () -> {
            try {
                // We await all current refreshes to complete, this increments the "current phase" and prevents
                // further interaction while we clear contents
                int newPhase = stopPhaser.arriveAndAwaitAdvance();
                assert newPhase > 0;
                clear();
                // Keep our `phase` counter in step with the phaser's advanced phase so that
                // in-flight registration checks elsewhere compare against the new phase
                // rather than treating the advance as a permanent stop
                // NOTE(review): assumes `phase` mirrors the Phaser's internal phase number
                // one-for-one — confirm against refreshAnomalyDetectorJobMemory
                phase.incrementAndGet();
                listener.onResponse(null);
            } catch (Exception e) {
                // arriveAndAwaitAdvance can throw if the phaser is misused; surface it
                // rather than leaving the caller hanging
                logger.warn("failed to wait for all refresh requests to complete", e);
                listener.onFailure(e);
            }
        }
    );

}

private void clear() {
logger.trace("clearing ML Memory tracker contents");
for (Map<String, Long> memoryRequirementByJob : memoryRequirementByTaskName.values()) {
memoryRequirementByJob.clear();
}
Expand Down Expand Up @@ -401,8 +434,9 @@ public void refreshAnomalyDetectorJobMemory(String jobId, ActionListener<Long> l
}

// The phaser prevents searches being started after the memory tracker's stop() method has returned
if (stopPhaser.register() != 0) {
// Phases above 0 mean we've been stopped, so don't do any operations that involve external interaction
// Note: `phase` is incremented if cache is reset via the feature reset API
if (stopPhaser.register() != phase.get()) {
// Phases not equal to `phase` mean we've been stopped, so don't do any operations that involve external interaction
stopPhaser.arriveAndDeregister();
listener.onFailure(new EsRejectedExecutionException("Couldn't run ML memory update - node is shutting down"));
return;
Expand Down