diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 7a057f67eb390..e4b0940803271 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -85,13 +85,14 @@ public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer datafeedJobHandler = ActionListener.wrap( datafeedJob -> { + String jobId = datafeedJob.getJobId(); Holder holder = new Holder(task, datafeedId, datafeedJob, - new ProblemTracker(auditor, datafeedJob.getJobId()), finishHandler); + new ProblemTracker(auditor, jobId), finishHandler); runningDatafeedsOnThisNode.put(task.getAllocationId(), holder); task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { - taskRunner.runWhenJobIsOpened(task); + taskRunner.runWhenJobIsOpened(task, jobId); } @Override @@ -267,17 +268,23 @@ protected void doRun() { } } - private String getJobId(TransportStartDatafeedAction.DatafeedTask task) { - return runningDatafeedsOnThisNode.get(task.getAllocationId()).getJobId(); + /** + * Returns null if the datafeed is not running on this node. + */ + private String getJobIdIfDatafeedRunningOnThisNode(TransportStartDatafeedAction.DatafeedTask task) { + Holder holder = runningDatafeedsOnThisNode.get(task.getAllocationId()); + if (holder == null) { + return null; + } + return holder.getJobId(); } - private JobState getJobState(PersistentTasksCustomMetaData tasks, TransportStartDatafeedAction.DatafeedTask datafeedTask) { - return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks); + private JobState getJobState(PersistentTasksCustomMetaData tasks, String jobId) { + return MlTasks.getJobStateModifiedForReassignments(jobId, tasks); } - private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks, - TransportStartDatafeedAction.DatafeedTask datafeedTask) { - PersistentTasksCustomMetaData.PersistentTask jobTask = MlTasks.getJobTask(getJobId(datafeedTask), tasks); + private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks, String jobId) { + PersistentTasksCustomMetaData.PersistentTask jobTask = MlTasks.getJobTask(jobId, tasks); if (jobTask == null) { return false; } @@ -492,14 +499,14 @@ private class TaskRunner implements ClusterStateListener { private final List tasksToRun = new CopyOnWriteArrayList<>(); - private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask) { + private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask, String jobId) { ClusterState clusterState = clusterService.state(); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (getJobState(tasks, datafeedTask) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, datafeedTask)) { + if (getJobState(tasks, jobId) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, jobId)) { runTask(datafeedTask); } else { logger.info("Datafeed [{}] is waiting for job [{}] to be opened", - datafeedTask.getDatafeedId(), getJobId(datafeedTask)); + datafeedTask.getDatafeedId(), jobId); tasksToRun.add(datafeedTask); } } @@ -530,17 +537,19 @@ public void clusterChanged(ClusterChangedEvent event) { List remainingTasks = new ArrayList<>(); for (TransportStartDatafeedAction.DatafeedTask datafeedTask : tasksToRun) { - if (runningDatafeedsOnThisNode.containsKey(datafeedTask.getAllocationId()) == false) { + String jobId = getJobIdIfDatafeedRunningOnThisNode(datafeedTask); + if (jobId == null) { + // Datafeed is not running on this node any more continue; } - JobState jobState = getJobState(currentTasks, datafeedTask); - if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, datafeedTask) == false) { + JobState jobState = getJobState(currentTasks, jobId); + if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, jobId) == false) { remainingTasks.add(datafeedTask); } else if (jobState == JobState.OPENED) { runTask(datafeedTask); } else { logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]", - datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState); + datafeedTask.getDatafeedId(), jobId, jobState); datafeedTask.stop("job_never_opened", TimeValue.timeValueSeconds(20)); } }