[ML] Migrate unallocated jobs and datafeeds #37430
Pinging @elastic/ml-core
public static Set<String> unallocatedJobIds(@Nullable PersistentTasksCustomMetaData tasks,
                                            DiscoveryNodes nodes) {
    return unallocatedJobTasks(tasks, nodes).stream()
        .map(task ->task.getId().substring(JOB_TASK_ID_PREFIX.length()))
#nit
- .map(task ->task.getId().substring(JOB_TASK_ID_PREFIX.length()))
+ .map(task -> task.getId().substring(JOB_TASK_ID_PREFIX.length()))
That would be the MacBook butterfly keyboard. Thanks, Apple, I don't need the space bar much.
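For readers following the thread, the mapping under review recovers a job ID by stripping a fixed prefix from the persistent task ID. A minimal standalone sketch, assuming the prefix is "job-" (the value of JOB_TASK_ID_PREFIX is not shown in this diff) and using plain strings in place of persistent task objects; the class name and the prefix filter are illustrative additions, not part of the PR:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the helper in the diff; not the Elasticsearch class.
final class JobIdSketch {
    // Assumed value; the real constant is defined elsewhere in the ML code.
    static final String JOB_TASK_ID_PREFIX = "job-";

    // Strip the prefix from each task ID to recover the job ID.
    // The filter is an extra guard added for this sketch only.
    static Set<String> jobIds(List<String> taskIds) {
        return taskIds.stream()
            .filter(id -> id.startsWith(JOB_TASK_ID_PREFIX))
            .map(id -> id.substring(JOB_TASK_ID_PREFIX.length()))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(jobIds(List.of("job-farequote", "job-ecommerce")));
    }
}
```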
        @Nullable PersistentTasksCustomMetaData tasks,
        DiscoveryNodes nodes) {
    if (tasks == null) {
        return Collections.emptySet();
#nit so that the collection type is the same as the return below.
- return Collections.emptySet();
+ return Collections.emptyList();
@@ -215,13 +221,17 @@ public void writeConfigToIndex(Collection<DatafeedConfig> datafeedsToMigrate,
        );
    }

-    private void removeFromClusterState(List<String> jobsToRemoveIds, List<String> datafeedsToRemoveIds,
+    private void removeFromClusterState(List<Job> jobsToRemoveIds, List<DatafeedConfig> datafeedsToRemoveIds,
#nit naming change
- private void removeFromClusterState(List<Job> jobsToRemoveIds, List<DatafeedConfig> datafeedsToRemoveIds,
+ private void removeFromClusterState(List<Job> jobsToRemove, List<DatafeedConfig> datafeedsToRemove,
 * @param nodes The nodes in the cluster
 * @return The argument {@code currentTasks}
 */
public static PersistentTasksCustomMetaData rewritePersistentTaskParams(Map<String, Job> jobs, Map<String, DatafeedConfig>datafeeds,
- public static PersistentTasksCustomMetaData rewritePersistentTaskParams(Map<String, Job> jobs, Map<String, DatafeedConfig>datafeeds,
+ public static PersistentTasksCustomMetaData rewritePersistentTaskParams(Map<String, Job> jobs, Map<String, DatafeedConfig> datafeeds,
clusterstate configs may be null
Job job = jobs.get(params.getJobId());
if (job != null) {
    logger.debug("updating persistent task params for job [{}]", params.getJobId());
    params.setJob(job);
Are you sure it's acceptable to update the parameters on currentTasks? I'm not convinced that it is, because it's changing the current local cluster state. It means this node's opinion of the cluster state version N will be different from all the other nodes. And version N+1 may not even get broadcast to other nodes if the local node thinks it's identical to version N.
Unless I'm mistaken, I think you need to build a new PersistentTasksCustomMetaData object rather than updating the one from current state.
(Same problem for datafeeds immediately below. And we should look at making JobParams and DatafeedParams immutable like our other cluster state classes in a followup.)
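The concern above can be illustrated with a toy model of versioned state. This is only a sketch under simplifying assumptions: the State class, the withParam helper and the string-valued params are hypothetical and stand in for the real cluster state and PersistentTasksCustomMetaData types. It shows version N+1 being built as a new object while version N stays untouched:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of versioned state; none of these are Elasticsearch classes.
final class CopyOnUpdateSketch {

    // Immutable snapshot: a version number plus task params keyed by task ID.
    static final class State {
        final long version;
        final Map<String, String> taskParams;

        State(long version, Map<String, String> taskParams) {
            this.version = version;
            this.taskParams = Map.copyOf(taskParams); // unmodifiable defensive copy
        }
    }

    // Build version N+1 from version N without modifying N.
    static State withParam(State current, String taskId, String param) {
        Map<String, String> updated = new HashMap<>(current.taskParams);
        updated.put(taskId, param);
        return new State(current.version + 1, updated);
    }

    public static void main(String[] args) {
        State n = new State(7, Map.of("job-a", "params-without-job"));
        State next = withParam(n, "job-a", "params-with-job");
        // Version N still holds the old params, so a comparison of N and N+1 sees the change.
        System.out.println(n.version + " -> " + n.taskParams.get("job-a"));
        System.out.println(next.version + " -> " + next.taskParams.get("job-a"));
    }
}
```

Had withParam mutated the map held by the current state instead, the old and new snapshots would compare as identical, which is the situation the comment warns about.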
I had the same reservations, hence all the javadoc comments. There is an assertion in the full cluster restart test that the task params are actually updated, so it does appear to work.
Line 236 in a3d0c2c
Object jobParam = XContentMapValues.extractValue("task.xpack/ml/job.params.job", task);
However, on review, PersistentTasksCustomMetaData uses the default implementation of AbstractNamedDiffable.readDiffFrom, which returns the entire object, and the TODO here https://github.com/elastic/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java#L66 suggests that will not always be the case. Good catch. I can't remember why I didn't use the builder in the first place.
LGTM
I pushed a change to use the task builder and modify a copy of the task params. Making |
Migrate the ml job and datafeed configs of open jobs and update the parameters of the persistent tasks as they become unallocated during a rolling upgrade. Block allocation of ml persistent tasks until the configs are migrated.
This is the final step in the process of migrating ml configs in #32905. This change migrates the job and datafeed configs of open jobs once the persistent task becomes unallocated; the persistent task parameters must be updated for open jobs.
OpenJobPersistentTasksExecutor.getAssignment will not assign a task that has the missing parameters
MlConfigMigrator now also migrates the configs of unallocated jobs & datafeeds
I've made this a non issue for the docs as the backport in #37536 will document the change
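The "will not assign a task that has the missing parameters" rule could look roughly like the following. This is a hedged sketch, not the code in OpenJobPersistentTasksExecutor: the Assignment class, the getAssignment signature and the explanation text are all hypothetical, and the job config is modelled as a nullable string:

```java
// Hypothetical sketch of gating assignment on migrated params; not the Elasticsearch implementation.
final class AssignmentGateSketch {

    // Result of an assignment decision; a null node means "leave the task unassigned for now".
    static final class Assignment {
        final String node;
        final String explanation;

        Assignment(String node, String explanation) {
            this.node = node;
            this.explanation = explanation;
        }
    }

    // Refuse to pick a node until the job config has been written into the task params.
    static Assignment getAssignment(String jobConfigInParams, String candidateNode) {
        if (jobConfigInParams == null) {
            return new Assignment(null, "job config not yet migrated into task params");
        }
        return new Assignment(candidateNode, "");
    }

    public static void main(String[] args) {
        System.out.println(getAssignment(null, "node-1").explanation);
        System.out.println(getAssignment("{\"job_id\":\"a\"}", "node-1").node);
    }
}
```

Keeping the task unassigned, rather than failing it, lets a later assignment attempt succeed once the migration has filled in the params.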