diff --git a/plugin/src/main/java/org/opensearch/ml/action/syncup/TransportSyncUpOnNodeAction.java b/plugin/src/main/java/org/opensearch/ml/action/syncup/TransportSyncUpOnNodeAction.java index e705bdb7cd..3afb2cff51 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/syncup/TransportSyncUpOnNodeAction.java +++ b/plugin/src/main/java/org/opensearch/ml/action/syncup/TransportSyncUpOnNodeAction.java @@ -198,6 +198,10 @@ void cleanUpLocalCache(Map> runningDeployModelTasks) { } for (String taskId : allTaskIds) { MLTaskCache mlTaskCache = mlTaskManager.getMLTaskCache(taskId); + // Task could be a prediction task, and it could be completed and removed from cache in predict thread during the cleaning up. + if (mlTaskCache == null) { + continue; + } MLTask mlTask = mlTaskCache.getMlTask(); Instant lastUpdateTime = mlTask.getLastUpdateTime(); Instant now = Instant.now(); diff --git a/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java b/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java index 9e9dea5d22..ca5b5b0abb 100644 --- a/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java +++ b/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java @@ -117,7 +117,7 @@ public synchronized void add(MLTask mlTask, List workerNodes) { throw new IllegalArgumentException("Duplicate taskId"); } taskCaches.put(taskId, new MLTaskCache(mlTask, workerNodes)); - log.debug("add ML task to cache " + taskId); + log.debug("add ML task to cache, taskId: {}, taskType: {} ", taskId, mlTask.getTaskType()); } /**