[ML] complete machine learning plugin feature state clean up integration #71011
Conversation
Pinging @elastic/ml-core (Team:ML)

```java
Map<String, Boolean> results = new ConcurrentHashMap<>();

ActionListener<ListTasksResponse> afterWaitingForTasks = ActionListener.wrap(
    listTasksResponse -> {
        listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices");
```
If this throws, `unsetResetModeListener` will not be called. Is that intentional? All other paths call `unsetResetModeListener`.
elasticsearch/server/src/main/java/org/elasticsearch/action/ActionListener.java, lines 128 to 150 in 318ae89:

```java
static <Response> ActionListener<Response> wrap(CheckedConsumer<Response, ? extends Exception> onResponse,
                                                Consumer<Exception> onFailure) {
    return new ActionListener<Response>() {
        @Override
        public void onResponse(Response response) {
            try {
                onResponse.accept(response);
            } catch (Exception e) {
                onFailure(e);
            }
        }

        @Override
        public void onFailure(Exception e) {
            onFailure.accept(e);
        }

        @Override
        public String toString() {
            return "WrappedActionListener{" + onResponse + "}{" + onFailure + "}";
        }
    };
}
```

is the definition of the action listener created here. The contents of `onResponse` are all wrapped in a try/catch, so an exception thrown by the handler is routed to `onFailure`, which does call `unsetResetModeListener`.
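To make the exception-routing concrete, here is a minimal standalone sketch that mirrors the `wrap` semantics quoted above. This is not the real Elasticsearch class: `Listener` and `wrap` are simplified stand-ins, and the `Consumer`-based signature drops the `CheckedConsumer` type the real code uses.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Simplified stand-in for ActionListener.wrap (NOT the real Elasticsearch
// class): onResponse runs inside a try/catch, so an exception thrown by the
// response handler is delivered to the failure handler instead of escaping.
public class WrapSketch {
    interface Listener<R> {
        void onResponse(R response);
        void onFailure(Exception e);
    }

    static <R> Listener<R> wrap(Consumer<R> onResponse, Consumer<Exception> onFailure) {
        return new Listener<R>() {
            @Override
            public void onResponse(R response) {
                try {
                    onResponse.accept(response);
                } catch (Exception e) {
                    onFailure(e); // an exception from the handler becomes a failure
                }
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }

    public static void main(String[] args) {
        AtomicReference<String> outcome = new AtomicReference<>();
        Listener<String> listener = wrap(
            response -> { throw new IllegalStateException("rethrowFailures blew up"); },
            e -> outcome.set("failed: " + e.getMessage()));
        // The response handler throws, so the failure path runs instead.
        listener.onResponse("ok");
        System.out.println(outcome.get());
    }
}
```

So a throw from `rethrowFailures` inside the response lambda still ends up on the failure path of the wrapped listener.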
```java
    .setWaitForCompletion(true)
    .execute(ActionListener.wrap(
        listMlTasks -> {
            listMlTasks.rethrowFailures("Waiting for machine learning tasks");
```
Is the idea to leave the plugin in reset mode after a timeout or something so it can be tried again?
No, the idea is for reset mode to be unset. See elasticsearch/server/src/main/java/org/elasticsearch/action/ActionListener.java, lines 128 to 150 in 318ae89 (the `wrap` definition quoted above): an exception thrown from the response handler is caught and routed to `onFailure`.
```diff
@@ -151,6 +151,10 @@ private void triggerTasks() {
         LOGGER.warn("skipping scheduled [ML] maintenance tasks because upgrade mode is enabled");
         return;
     }
+    if (MlMetadata.getMlMetadata(clusterService.state()).isResetMode()) {
+        LOGGER.warn("skipping scheduled [ML] maintenance tasks because reset mode is enabled");
```
++ good catch. This means we have to be careful not to leave reset mode enabled, and should try to ensure `cleanUpFeature` does not leave it set.
```java
@Override
protected AcknowledgedResponse newResponse(boolean acknowledged) {
    logger.trace("Cluster update response built: " + acknowledged);
```
Suggested change:

```diff
-logger.trace("Cluster update response built: " + acknowledged);
+logger.trace(() -> "Cluster update response built: " + acknowledged);
```
Or alternatively:

```diff
-logger.trace("Cluster update response built: " + acknowledged);
+logger.trace("Cluster update response built: {}", acknowledged);
```
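Both suggestions avoid building the message string eagerly when trace logging is disabled. A minimal plain-Java sketch of why that matters is below; `traceEager`/`traceLazy` are hypothetical stand-ins for a logger, not the real Log4j API.

```java
import java.util.function.Supplier;

// Hypothetical mini-logger (NOT the real Log4j API) illustrating why a
// Supplier-based or parameterized trace call skips message construction
// when the trace level is disabled.
public class LazyTraceSketch {
    static int concatenations = 0;

    static String expensiveMessage(boolean acknowledged) {
        concatenations++; // count every eager string build
        return "Cluster update response built: " + acknowledged;
    }

    // Eager form: the argument is evaluated even when trace is off.
    static void traceEager(boolean enabled, String message) {
        if (enabled) System.out.println(message);
    }

    // Lazy form: the Supplier is only invoked when trace is on.
    static void traceLazy(boolean enabled, Supplier<String> message) {
        if (enabled) System.out.println(message.get());
    }

    public static void main(String[] args) {
        boolean traceEnabled = false; // typical production setting
        traceEager(traceEnabled, expensiveMessage(true)); // string built anyway
        int afterEager = concatenations;
        traceLazy(traceEnabled, () -> expensiveMessage(true)); // nothing built
        System.out.println("eager=" + afterEager + " lazy=" + (concatenations - afterEager));
    }
}
```

With trace disabled, the eager call still pays for one concatenation while the lazy call pays for none, which is the whole point of both suggestions.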
Looks good. I just left some comments on a few details.
```java
public class SetResetModeAction extends ActionType<AcknowledgedResponse> {

    public static final SetResetModeAction INSTANCE = new SetResetModeAction();
    public static final String NAME = "cluster:admin/xpack/ml/reset_mode";
```
I think we should use `internal` instead of `admin` here, to make absolutely clear that this is not intended to be called from outside the cluster. This is what our other actions that would be dangerous to call from outside the cluster have, e.g. line 21 in a92a647:

```java
public static final String NAME = "cluster:internal/xpack/ml/job/kill/process";
```
```java
    )
);
client().execute(DeletePipelineAction.INSTANCE, new DeletePipelineRequest("feature_reset_failure_inference_pipeline")).actionGet();
```
Building on what Dave K said, all the tests in this class could have a final assertion that the reset flag is false in the ML custom cluster state. You could have a helper method that gets the cluster state using ClusterStateAction and returns the value of the ML reset flag. Then every test can assert that the return value of that method is false.
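The suggested helper might look roughly like the sketch below. `MlMetadataStub` and `ClusterStateStub` are hypothetical stand-ins for the real Elasticsearch classes; a real implementation would fetch the state via a ClusterStateAction call against the cluster and read the ML custom metadata from it.

```java
// Sketch of the proposed test helper, with stub types standing in for the
// real Elasticsearch ClusterState/MlMetadata classes. In an actual
// integration test the state would come from a ClusterStateAction call.
public class ResetFlagAssertionSketch {
    // Stand-in for the ML custom metadata held in cluster state.
    static class MlMetadataStub {
        final boolean resetMode;
        MlMetadataStub(boolean resetMode) { this.resetMode = resetMode; }
        boolean isResetMode() { return resetMode; }
    }

    // Stand-in for the cluster state a ClusterStateAction call returns.
    static class ClusterStateStub {
        final MlMetadataStub mlMetadata;
        ClusterStateStub(MlMetadataStub m) { this.mlMetadata = m; }
    }

    // The helper every test would call in its final assertion.
    static boolean isMlResetMode(ClusterStateStub state) {
        return state.mlMetadata.isResetMode();
    }

    public static void main(String[] args) {
        // After a successful feature reset the flag should be cleared.
        ClusterStateStub afterReset = new ClusterStateStub(new MlMetadataStub(false));
        System.out.println("resetMode=" + isMlResetMode(afterReset));
    }
}
```

Each test would end with an assertion that the helper returns false, so a reset that leaves the flag set fails every test rather than silently poisoning later ones.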
```diff
     ClusterService clusterService,
     Client client,
-    ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> listener) {
+    ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> finalListener) {
     logger.info("Starting machine learning cleanup");
```
Suggested change:

```diff
-logger.info("Starting machine learning cleanup");
+logger.info("Starting machine learning feature reset");
```

(Just to make it crystal clear to support that this was triggered by a user calling the reset API.)
```java
if (numberInferenceProcessors > 0) {
    unsetResetModeListener.onFailure(
        new RuntimeException(
            "Unable to reset component as there are ingest pipelines still referencing trained machine learning models"
```
Does this mean we're still going to need custom cleanup in between integration tests? If so it devalues this API a lot.
I would say that if we blow away ML entirely we blow away the ingest pipelines that are referencing ML. To paraphrase Tony Blair: tough on ML, tough on the users of ML.
@droberts195 that feels very rough to me.

All other ML interactions are sort of their own thing. If we go and delete ingest pipelines, that could blow up a user's cluster. Maybe they don't honestly know if a pipeline is in use.

I think safety here is better than causing a user to potentially lose data.

Also, we already have this custom cleanup code in the tests that use pipelines (we have this outside of the normal cleanup code).
> Also, we already have this custom cleanup code in the tests that use pipelines (we have this outside of the normal clean up code).
OK, in that case this PR can stay as-is and we can decide what to do about ingest pipelines separately.
Once this PR is merged we should be able to dramatically simplify our test cleanup and see if anything breaks as a result (which would reveal more things our reset needs to do).
```diff
 } else {
     final List<String> failedComponents = results.entrySet().stream()
         .filter(result -> result.getValue() == false)
         .map(Map.Entry::getKey)
         .collect(Collectors.toList());
-    listener.onFailure(new RuntimeException("Some components failed to reset: " + failedComponents));
+    unsetResetModeListener.onFailure(new RuntimeException("Some components failed to reset: " + failedComponents));
```
Suggested change:

```diff
-unsetResetModeListener.onFailure(new RuntimeException("Some components failed to reset: " + failedComponents));
+unsetResetModeListener.onFailure(new RuntimeException("Some machine learning components failed to reset: " + failedComponents));
```

Since the user could be resetting multiple features simultaneously, I think all the error responses should make clear they relate to ML.
```java
failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.disabled(), ActionListener.wrap(
    resetSuccess -> finalListener.onFailure(failure),
    resetFailure -> {
        logger.warn("failed to disable reset mode after state clean up failure", resetFailure);
```
Suggested change:

```diff
-logger.warn("failed to disable reset mode after state clean up failure", resetFailure);
+logger.error("failed to disable reset mode after machine learning reset failure", resetFailure);
```
```java
success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeAction.Request.disabled(), ActionListener.wrap(
    resetSuccess -> finalListener.onResponse(success),
    resetFailure -> {
        logger.warn("failed to disable reset mode after state clean up success", resetFailure);
```
Suggested change:

```diff
-logger.warn("failed to disable reset mode after state clean up success", resetFailure);
+logger.error("failed to disable reset mode after otherwise successful machine learning reset", resetFailure);
```
```java
    resetSuccess -> finalListener.onResponse(success),
    resetFailure -> {
        logger.warn("failed to disable reset mode after state clean up success", resetFailure);
        finalListener.onResponse(success);
```
I think this should count as a failure, as it leaves ML in a sub-optimal state - there will be no notifications and no nightly cleanup. So we'd want the user to try the reset again to have another go at removing the reset-in-progress flag.
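The pattern under discussion (always disable reset mode first, then propagate the original cleanup outcome) can be sketched standalone. `finishCleanup` and `disableResetMode` below are simplified stand-ins, not the real Elasticsearch listener plumbing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified sketch of the "disable reset mode, then propagate the original
// outcome" wrapping used in cleanUpFeature. NOT the real Elasticsearch types.
public class UnsetResetModeSketch {
    static List<String> log = new ArrayList<>();
    static boolean resetModeEnabled = true;

    // Stand-in for the SetResetModeAction.Request.disabled() call.
    static void disableResetMode() {
        resetModeEnabled = false;
        log.add("reset mode disabled");
    }

    // Wraps the final callbacks so the cleanup of the flag runs on BOTH paths.
    static void finishCleanup(boolean cleanupSucceeded,
                              Consumer<String> onSuccess,
                              Consumer<String> onFailure) {
        disableResetMode(); // runs whether cleanup succeeded or failed
        if (cleanupSucceeded) {
            onSuccess.accept("reset complete");
        } else {
            onFailure.accept("some components failed to reset");
        }
    }

    public static void main(String[] args) {
        finishCleanup(false,
            msg -> log.add("success: " + msg),
            msg -> log.add("failure: " + msg));
        System.out.println(String.join("; ", log) + "; resetModeEnabled=" + resetModeEnabled);
    }
}
```

Even on the failure path the flag is cleared before the caller hears the result, which is why the remaining question is only what to report when clearing the flag itself fails.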
```diff
@@ -151,6 +151,10 @@ private void triggerTasks() {
         LOGGER.warn("skipping scheduled [ML] maintenance tasks because upgrade mode is enabled");
         return;
     }
+    if (MlMetadata.getMlMetadata(clusterService.state()).isResetMode()) {
+        LOGGER.warn("skipping scheduled [ML] maintenance tasks because reset mode is enabled");
```
Suggested change:

```diff
-LOGGER.warn("skipping scheduled [ML] maintenance tasks because reset mode is enabled");
+LOGGER.warn("skipping scheduled [ML] maintenance tasks because machine learning feature reset is in progress");
```

(Because "reset mode" won't be a documented thing, but the "feature reset API" will be.)
```java
case 2:
    metadataBuilder.isUpgradeMode(isUpgrade == false);
    break;
case 3:
    metadataBuilder.isResetMode(isReset == false);
    break;
```
🚀
LGTM
LGTM
…ion (elastic#71011)

This completes the machine learning feature state cleanup integration. This commit handles waiting for machine learning tasks to complete and adds a new field to the ML Metadata cluster state to indicate when a reset is in progress for machine learning.

relates: elastic#70008
…tegration (#71011) (#71071)

* [ML] complete machine learning plugin feature state clean up integration (#71011)

  This completes the machine learning feature state cleanup integration. This commit handles waiting for machine learning tasks to complete and adds a new field to the ML Metadata cluster state to indicate when a reset is in progress for machine learning.

  relates: #70008

* [ML] fixing feature reset integration tests (#71081)

  Previously created pipelines referencing ML models were not being appropriately deleted in upstream tests. This commit ensures that machine learning removes relevant pipelines from cluster state after tests complete.

  closes #71072
This completes the machine learning feature state cleanup integration.
This commit handles waiting for machine learning tasks to complete and adds a new
field to the ML Metadata cluster state to indicate when a reset is in progress for machine
learning.
relates: #70008