diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java
index 134c391b5df21..07d2e4f442035 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java
@@ -18,7 +18,9 @@
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -39,6 +41,8 @@
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.core.Strings.format;
+
 public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAnalyticsStatsAction.Response> {
 
     public static final GetDataFrameAnalyticsStatsAction INSTANCE = new GetDataFrameAnalyticsStatsAction();
@@ -141,6 +145,11 @@ public boolean equals(Object obj) {
             Request other = (Request) obj;
             return Objects.equals(id, other.id) && allowNoMatch == other.allowNoMatch && Objects.equals(pageParams, other.pageParams);
         }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, format("get_data_frame_analytics_stats[%s]", this.id), parentTaskId, headers);
+        }
     }
 
     public static class Response extends BaseTasksResponse implements ToXContentObject {
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java
index e24e12b850f94..09c116737aa09 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedRunningStateAction.java
@@ -15,7 +15,9 @@
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -29,6 +31,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.core.Strings.format;
+
 /**
  * Internal only action to get the current running state of a datafeed
  */
@@ -68,6 +72,11 @@ public Set<String> getDatafeedTaskIds() {
         public boolean match(Task task) {
             return task instanceof StartDatafeedAction.DatafeedTaskMatcher && datafeedTaskIds.contains(task.getDescription());
         }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, format("get_datafeed_running_state[%s]", datafeedTaskIds), parentTaskId, headers);
+        }
     }
 
     public static class Response extends BaseTasksResponse {
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java
index a860cb863ba33..99ec6205e831e 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java
@@ -16,6 +16,9 @@
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse;
@@ -36,6 +39,8 @@
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.core.Strings.format;
+
 public class GetDatafeedsStatsAction extends ActionType<GetDatafeedsStatsAction.Response> {
 
     public static final GetDatafeedsStatsAction INSTANCE = new GetDatafeedsStatsAction();
@@ -114,6 +119,11 @@ public boolean equals(Object obj) {
             Request other = (Request) obj;
             return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(allowNoMatch, other.allowNoMatch);
         }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, format("get_datafeed_stats[%s]", datafeedId), parentTaskId, headers);
+        }
     }
 
     public static class Response extends AbstractGetResourcesResponse<Response.DatafeedStats> implements ToXContentObject {
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsAction.java
index 89da3f21bde63..44502531afc4b 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsAction.java
@@ -15,6 +15,9 @@
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -28,6 +31,7 @@
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.xpack.core.ml.action.UpgradeJobModelSnapshotAction.Request.SNAPSHOT_ID;
 
 public class GetJobModelSnapshotsUpgradeStatsAction extends ActionType<GetJobModelSnapshotsUpgradeStatsAction.Response> {
@@ -115,6 +119,18 @@ public boolean equals(Object obj) {
                 && Objects.equals(snapshotId, other.snapshotId)
                 && Objects.equals(allowNoMatch, other.allowNoMatch);
         }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(
+                id,
+                type,
+                action,
+                format("get_job_model_snapshot_upgrade_stats[%s:%s]", jobId, snapshotId),
+                parentTaskId,
+                headers
+            );
+        }
     }
 
     public static class Response extends AbstractGetResourcesResponse<Response.JobModelSnapshotUpgradeStats> implements ToXContentObject {
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java
index 0a83672c141af..8ecef84b34066 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java
@@ -19,7 +19,9 @@
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
@@ -38,6 +40,8 @@
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.core.Strings.format;
+
 public class GetJobsStatsAction extends ActionType<GetJobsStatsAction.Response> {
 
     public static final GetJobsStatsAction INSTANCE = new GetJobsStatsAction();
@@ -134,6 +138,11 @@ public boolean equals(Object obj) {
                 && Objects.equals(allowNoMatch, other.allowNoMatch)
                 && Objects.equals(getTimeout(), other.getTimeout());
         }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, format("get_job_stats[%s]", jobId), parentTaskId, headers);
+        }
     }
 
     public static class Response extends BaseTasksResponse implements ToXContentObject {
diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java
index 6792d7927d346..477d827b3417a 100644
--- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java
+++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java
@@ -248,9 +248,10 @@ public void testUpdateWithValidatorFunctionThatErrors() throws Exception {
         List<String> updateIndices = Collections.singletonList("a-different-index");
         update.setIndices(updateIndices);
 
-        BiConsumer<DatafeedConfig, ActionListener<Boolean>> validateErrorFunction = (updatedConfig, listener) -> {
-            new Thread(() -> listener.onFailure(new IllegalArgumentException("this is a bad update")), getTestName()).start();
-        };
+        BiConsumer<DatafeedConfig, ActionListener<Boolean>> validateErrorFunction = (updatedConfig, listener) -> new Thread(
+            () -> listener.onFailure(new IllegalArgumentException("this is a bad update")),
+            getTestName()
+        ).start();
 
         AtomicReference<DatafeedConfig> configHolder = new AtomicReference<>();
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
@@ -277,7 +278,7 @@ public void testAllowNoMatch() throws InterruptedException {
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
         blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", false, null, false, actionListener),
+            actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", false, null, false, null, actionListener),
             datafeedIdsHolder,
             exceptionHolder
         );
@@ -289,7 +290,7 @@ public void testAllowNoMatch() throws InterruptedException {
 
         exceptionHolder.set(null);
         blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", true, null, false, actionListener),
+            actionListener -> datafeedConfigProvider.expandDatafeedIds("_all", true, null, false, null, actionListener),
             datafeedIdsHolder,
             exceptionHolder
         );
@@ -298,7 +299,7 @@ public void testAllowNoMatch() throws InterruptedException {
 
         AtomicReference<List<DatafeedConfig.Builder>> datafeedsHolder = new AtomicReference<>();
         blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", false, actionListener),
+            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", false, null, actionListener),
             datafeedsHolder,
             exceptionHolder
         );
@@ -310,7 +311,7 @@ public void testAllowNoMatch() throws InterruptedException {
 
         exceptionHolder.set(null);
         blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener),
+            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, null, actionListener),
             datafeedsHolder,
             exceptionHolder
         );
@@ -329,27 +330,33 @@ public void testExpandDatafeeds() throws Exception {
 
         // Test datafeed IDs only
         SortedSet<String> expandedIds = blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, null, false, actionListener)
+            actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, null, false, null, actionListener)
         );
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("*-1", true, null, false, actionListener));
+        expandedIds = blockingCall(
+            actionListener -> datafeedConfigProvider.expandDatafeedIds("*-1", true, null, false, null, actionListener)
+        );
         assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar*", true, null, false, actionListener));
+        expandedIds = blockingCall(
+            actionListener -> datafeedConfigProvider.expandDatafeedIds("bar*", true, null, false, null, actionListener)
+        );
         assertEquals(new TreeSet<>(Arrays.asList("bar-1", "bar-2")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("b*r-1", true, null, false, actionListener));
+        expandedIds = blockingCall(
+            actionListener -> datafeedConfigProvider.expandDatafeedIds("b*r-1", true, null, false, null, actionListener)
+        );
         assertEquals(new TreeSet<>(Collections.singletonList("bar-1")), expandedIds);
 
         expandedIds = blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedIds("bar-1,foo*", true, null, false, actionListener)
+            actionListener -> datafeedConfigProvider.expandDatafeedIds("bar-1,foo*", true, null, false, null, actionListener)
         );
         assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1", "foo-2")), expandedIds);
 
         // Test full datafeed config
         List<DatafeedConfig.Builder> expandedDatafeedBuilders = blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("foo*", true, actionListener)
+            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("foo*", true, null, actionListener)
         );
         List<DatafeedConfig> expandedDatafeeds = expandedDatafeedBuilders.stream()
             .map(DatafeedConfig.Builder::build)
@@ -357,25 +364,25 @@ public void testExpandDatafeeds() throws Exception {
         assertThat(expandedDatafeeds, containsInAnyOrder(foo1, foo2));
 
         expandedDatafeedBuilders = blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*-1", true, actionListener)
+            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*-1", true, null, actionListener)
         );
         expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList());
         assertThat(expandedDatafeeds, containsInAnyOrder(foo1, bar1));
 
         expandedDatafeedBuilders = blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("bar*", true, actionListener)
+            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("bar*", true, null, actionListener)
         );
         expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList());
         assertThat(expandedDatafeeds, containsInAnyOrder(bar1, bar2));
 
         expandedDatafeedBuilders = blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("b*r-1", true, actionListener)
+            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("b*r-1", true, null, actionListener)
         );
         expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList());
         assertThat(expandedDatafeeds, containsInAnyOrder(bar1));
 
         expandedDatafeedBuilders = blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("bar-1,foo*", true, actionListener)
+            actionListener -> datafeedConfigProvider.expandDatafeedConfigs("bar-1,foo*", true, null, actionListener)
         );
         expandedDatafeeds = expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList());
         assertThat(expandedDatafeeds, containsInAnyOrder(bar1, foo1, foo2));
@@ -398,12 +405,12 @@ public void testExpandDatafeedsWithTaskData() throws Exception {
         AtomicReference<SortedSet<String>> datafeedIdsHolder = new AtomicReference<>();
         // Test datafeed IDs only
         SortedSet<String> expandedIds = blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", false, tasks, true, actionListener)
+            actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", false, tasks, true, null, actionListener)
         );
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
 
         blockingCall(
-            actionListener -> datafeedConfigProvider.expandDatafeedIds("foo-1*,foo-2*", false, tasks, false, actionListener),
+            actionListener -> datafeedConfigProvider.expandDatafeedIds("foo-1*,foo-2*", false, tasks, false, null, actionListener),
             datafeedIdsHolder,
             exceptionHolder
         );
diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java
index f842ba13d7564..3099b1e0d8c50 100644
--- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java
+++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java
@@ -237,7 +237,7 @@ public void testUpdateWithValidator() throws Exception {
 
         JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setDescription("This job has been updated").build();
 
-        JobConfigProvider.UpdateValidator validator = (job, update, listener) -> { listener.onResponse(null); };
+        JobConfigProvider.UpdateValidator validator = (job, update, listener) -> listener.onResponse(null);
 
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
         AtomicReference<Job> updateJobResponseHolder = new AtomicReference<>();
@@ -258,9 +258,9 @@ public void testUpdateWithValidator() throws Exception {
         assertNotNull(updateJobResponseHolder.get());
         assertEquals("This job has been updated", updateJobResponseHolder.get().getDescription());
 
-        JobConfigProvider.UpdateValidator validatorWithAnError = (job, update, listener) -> {
-            listener.onFailure(new IllegalStateException("I don't like this update"));
-        };
+        JobConfigProvider.UpdateValidator validatorWithAnError = (job, update, listener) -> listener.onFailure(
+            new IllegalStateException("I don't like this update")
+        );
 
         updateJobResponseHolder.set(null);
         // Update with a validator that errors
@@ -287,7 +287,7 @@ public void testAllowNoMatch() throws InterruptedException {
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
         blockingCall(
-            actionListener -> jobConfigProvider.expandJobsIds("_all", false, true, null, false, actionListener),
+            actionListener -> jobConfigProvider.expandJobsIds("_all", false, true, null, false, null, actionListener),
             jobIdsHolder,
             exceptionHolder
         );
@@ -299,7 +299,7 @@ public void testAllowNoMatch() throws InterruptedException {
 
         exceptionHolder.set(null);
         blockingCall(
-            actionListener -> jobConfigProvider.expandJobsIds("_all", true, false, null, false, actionListener),
+            actionListener -> jobConfigProvider.expandJobsIds("_all", true, false, null, false, null, actionListener),
             jobIdsHolder,
             exceptionHolder
         );
@@ -307,7 +307,7 @@ public void testAllowNoMatch() throws InterruptedException {
         assertNull(exceptionHolder.get());
 
         AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();
-        blockingCall(actionListener -> jobConfigProvider.expandJobs("*", false, true, actionListener), jobsHolder, exceptionHolder);
+        blockingCall(actionListener -> jobConfigProvider.expandJobs("*", false, true, null, actionListener), jobsHolder, exceptionHolder);
 
         assertNull(jobsHolder.get());
         assertNotNull(exceptionHolder.get());
@@ -315,7 +315,7 @@ public void testAllowNoMatch() throws InterruptedException {
         assertThat(exceptionHolder.get().getMessage(), containsString("No known job with id"));
 
         exceptionHolder.set(null);
-        blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener), jobsHolder, exceptionHolder);
+        blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, null, actionListener), jobsHolder, exceptionHolder);
         assertNotNull(jobsHolder.get());
         assertNull(exceptionHolder.get());
     }
@@ -330,27 +330,27 @@ public void testExpandJobs_GroupsAndJobIds() throws Exception {
 
         // Job Ids
         SortedSet<String> expandedIds = blockingCall(
-            actionListener -> jobConfigProvider.expandJobsIds("_all", true, false, null, false, actionListener)
+            actionListener -> jobConfigProvider.expandJobsIds("_all", true, false, null, false, null, actionListener)
         );
         assertEquals(new TreeSet<>(Arrays.asList("tom", "dick", "harry", "harry-jnr")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, null, false, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, null, false, null, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("tom", "dick", "harry", "harry-jnr")), expandedIds);
 
         expandedIds = blockingCall(
-            actionListener -> jobConfigProvider.expandJobsIds("tom,harry", true, false, null, false, actionListener)
+            actionListener -> jobConfigProvider.expandJobsIds("tom,harry", true, false, null, false, null, actionListener)
         );
         assertEquals(new TreeSet<>(Arrays.asList("tom", "harry")), expandedIds);
 
         expandedIds = blockingCall(
-            actionListener -> jobConfigProvider.expandJobsIds("harry-group,tom", true, false, null, false, actionListener)
+            actionListener -> jobConfigProvider.expandJobsIds("harry-group,tom", true, false, null, false, null, actionListener)
         );
         assertEquals(new TreeSet<>(Arrays.asList("harry", "harry-jnr", "tom")), expandedIds);
 
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
         AtomicReference<SortedSet<String>> jobIdsHolder = new AtomicReference<>();
         blockingCall(
-            actionListener -> jobConfigProvider.expandJobsIds("tom,missing1,missing2", true, false, null, false, actionListener),
+            actionListener -> jobConfigProvider.expandJobsIds("tom,missing1,missing2", true, false, null, false, null, actionListener),
             jobIdsHolder,
             exceptionHolder
         );
@@ -361,26 +361,28 @@ public void testExpandJobs_GroupsAndJobIds() throws Exception {
 
         // Job builders
         List<Job.Builder> expandedJobsBuilders = blockingCall(
-            actionListener -> jobConfigProvider.expandJobs("harry-group,tom", false, true, actionListener)
+            actionListener -> jobConfigProvider.expandJobs("harry-group,tom", false, true, null, actionListener)
         );
         List<Job> expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
         assertThat(expandedJobs, containsInAnyOrder(harry, harryJnr, tom));
 
-        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("_all", false, true, actionListener));
+        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("_all", false, true, null, actionListener));
         expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
         assertThat(expandedJobs, containsInAnyOrder(tom, dick, harry, harryJnr));
 
-        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("tom,harry", false, false, actionListener));
+        expandedJobsBuilders = blockingCall(
+            actionListener -> jobConfigProvider.expandJobs("tom,harry", false, false, null, actionListener)
+        );
         expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
         assertThat(expandedJobs, containsInAnyOrder(tom, harry));
 
-        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("", false, false, actionListener));
+        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("", false, false, null, actionListener));
         expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
         assertThat(expandedJobs, containsInAnyOrder(tom, dick, harry, harryJnr));
 
         AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();
         blockingCall(
-            actionListener -> jobConfigProvider.expandJobs("tom,missing1,missing2", false, true, actionListener),
+            actionListener -> jobConfigProvider.expandJobs("tom,missing1,missing2", false, true, null, actionListener),
             jobsHolder,
             exceptionHolder
         );
@@ -401,35 +403,39 @@ public void testExpandJobs_WildCardExpansion() throws Exception {
 
         // Test job IDs only
         SortedSet<String> expandedIds = blockingCall(
-            actionListener -> jobConfigProvider.expandJobsIds("foo*", true, true, null, false, actionListener)
+            actionListener -> jobConfigProvider.expandJobsIds("foo*", true, true, null, false, null, actionListener)
         );
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*-1", true, true, null, false, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*-1", true, true, null, false, null, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("bar*", true, true, null, false, actionListener));
+        expandedIds = blockingCall(
+            actionListener -> jobConfigProvider.expandJobsIds("bar*", true, true, null, false, null, actionListener)
+        );
         assertEquals(new TreeSet<>(Arrays.asList("bar-1", "bar-2", "nbar")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("b*r-1", true, true, null, false, actionListener));
+        expandedIds = blockingCall(
+            actionListener -> jobConfigProvider.expandJobsIds("b*r-1", true, true, null, false, null, actionListener)
+        );
         assertEquals(new TreeSet<>(Collections.singletonList("bar-1")), expandedIds);
 
         // Test full job config
         List<Job.Builder> expandedJobsBuilders = blockingCall(
-            actionListener -> jobConfigProvider.expandJobs("foo*", true, true, actionListener)
+            actionListener -> jobConfigProvider.expandJobs("foo*", true, true, null, actionListener)
         );
         List<Job> expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
         assertThat(expandedJobs, containsInAnyOrder(foo1, foo2));
 
-        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("*-1", true, true, actionListener));
+        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("*-1", true, true, null, actionListener));
         expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
         assertThat(expandedJobs, containsInAnyOrder(foo1, bar1));
 
-        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("bar*", true, true, actionListener));
+        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("bar*", true, true, null, actionListener));
         expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
         assertThat(expandedJobs, containsInAnyOrder(bar1, bar2, nbar));
 
-        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("b*r-1", true, true, actionListener));
+        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("b*r-1", true, true, null, actionListener));
         expandedJobs = expandedJobsBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
         assertThat(expandedJobs, containsInAnyOrder(bar1));
     }
@@ -452,25 +458,27 @@ public void testExpandJobIds_excludeDeleting() throws Exception {
         client().admin().indices().prepareRefresh(MlConfigIndex.indexName()).get();
 
         SortedSet<String> expandedIds = blockingCall(
-            actionListener -> jobConfigProvider.expandJobsIds("foo*", true, true, null, false, actionListener)
+            actionListener -> jobConfigProvider.expandJobsIds("foo*", true, true, null, false, null, actionListener)
         );
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("foo*", true, false, null, false, actionListener));
+        expandedIds = blockingCall(
+            actionListener -> jobConfigProvider.expandJobsIds("foo*", true, false, null, false, null, actionListener)
+        );
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "foo-deleting")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, null, false, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, true, null, false, null, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "bar")), expandedIds);
 
-        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, false, null, false, actionListener));
+        expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIds("*", true, false, null, false, null, actionListener));
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2", "foo-deleting", "bar")), expandedIds);
 
         List<Job.Builder> expandedJobsBuilders = blockingCall(
-            actionListener -> jobConfigProvider.expandJobs("foo*", true, true, actionListener)
+            actionListener -> jobConfigProvider.expandJobs("foo*", true, true, null, actionListener)
         );
         assertThat(expandedJobsBuilders, hasSize(2));
 
-        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("foo*", true, false, actionListener));
+        expandedJobsBuilders = blockingCall(actionListener -> jobConfigProvider.expandJobs("foo*", true, false, null, actionListener));
         assertThat(expandedJobsBuilders, hasSize(3));
     }
 
@@ -494,12 +502,12 @@ public void testExpandJobIdsWithTaskData() throws Exception {
         AtomicReference<SortedSet<String>> jobIdsHolder = new AtomicReference<>();
         // Test job IDs only
         SortedSet<String> expandedIds = blockingCall(
-            actionListener -> jobConfigProvider.expandJobsIds("foo*", false, false, tasks, true, actionListener)
+            actionListener -> jobConfigProvider.expandJobsIds("foo*", false, false, tasks, true, null, actionListener)
         );
         assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds);
 
         blockingCall(
-            actionListener -> jobConfigProvider.expandJobsIds("foo-1*,foo-2*", false, false, tasks, false, actionListener),
+            actionListener -> jobConfigProvider.expandJobsIds("foo-1*,foo-2*", false, false, tasks, false, null, actionListener),
             jobIdsHolder,
             exceptionHolder
         );
diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java
index 933a1e6d1edc4..168e20fb93c09 100644
--- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java
+++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java
@@ -600,7 +600,7 @@ private void getDataCountsModelSizeAndTimingStats(
         Consumer<Exception> exceptionConsumer
     ) throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
-        jobProvider.getDataCountsModelSizeAndTimingStats(jobId, (dataCounts, modelSizeStats, timingStats) -> {
+        jobProvider.getDataCountsModelSizeAndTimingStats(jobId, null, (dataCounts, modelSizeStats, timingStats) -> {
             dataCountsConsumer.accept(dataCounts);
             modelSizeStatsConsumer.accept(modelSizeStats);
             timingStatsConsumer.accept(timingStats);
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCancelJobModelSnapshotUpgradeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCancelJobModelSnapshotUpgradeAction.java
index cefdff1870f87..fa8523b0c77a0 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCancelJobModelSnapshotUpgradeAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCancelJobModelSnapshotUpgradeAction.java
@@ -82,7 +82,7 @@ public void doExecute(Task task, Request request, ActionListener<Response> liste
         }, listener::onFailure);
 
         // 1. Expand jobs - this will throw if a required job ID match isn't made. Jobs being deleted are included here.
-        jobConfigProvider.expandJobs(request.getJobId(), request.allowNoMatch(), false, expandIdsListener);
+        jobConfigProvider.expandJobs(request.getJobId(), request.allowNoMatch(), false, null, expandIdsListener);
     }
 
     private void removePersistentTasks(
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java
index 10a50568b2f5d..0400418039bf1 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java
@@ -154,6 +154,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
                 true,
                 tasksMetadata,
                 isForce,
+                null,
                 ActionListener.wrap(
                     expandedJobIds -> validate(
                         expandedJobIds,
@@ -340,7 +341,7 @@ private void stopDatafeeds(List<String> runningDatafeedIds, boolean isForce, Tim
 
     void isolateDatafeeds(List<String> openJobs, List<String> runningDatafeedIds, ActionListener<Void> listener) {
 
-        GroupedActionListener<IsolateDatafeedAction.Response> groupedListener = new GroupedActionListener<IsolateDatafeedAction.Response>(
+        GroupedActionListener<IsolateDatafeedAction.Response> groupedListener = new GroupedActionListener<>(
             ActionListener.wrap(c -> listener.onResponse(null), e -> {
                 // This is deliberately NOT an error. The reasoning is as follows:
                 // - Isolate datafeed just sets a flag on the datafeed, so cannot fail IF it reaches the running datafeed code
@@ -507,51 +508,48 @@ private void forceCloseJob(
             PersistentTasksCustomMetadata.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
             if (jobTask != null) {
                 auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
-                persistentTasksService.sendRemoveRequest(
-                    jobTask.getId(),
-                    new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
-                        @Override
-                        public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
-                            if (counter.incrementAndGet() == numberOfJobs) {
-                                sendResponseOrFailure(request.getJobId(), listener, failures);
-                            }
+                persistentTasksService.sendRemoveRequest(jobTask.getId(), new ActionListener<>() {
+                    @Override
+                    public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
+                        if (counter.incrementAndGet() == numberOfJobs) {
+                            sendResponseOrFailure(request.getJobId(), listener, failures);
                         }
+                    }
 
-                        @Override
-                        public void onFailure(Exception e) {
-                            final int slot = counter.incrementAndGet();
-                            if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
-                                failures.set(slot - 1, e);
-                            }
-                            if (slot == numberOfJobs) {
-                                sendResponseOrFailure(request.getJobId(), listener, failures);
-                            }
+                    @Override
+                    public void onFailure(Exception e) {
+                        final int slot = counter.incrementAndGet();
+                        if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
+                            failures.set(slot - 1, e);
                         }
+                        if (slot == numberOfJobs) {
+                            sendResponseOrFailure(request.getJobId(), listener, failures);
+                        }
+                    }
 
-                        private void sendResponseOrFailure(
-                            String jobId,
-                            ActionListener<CloseJobAction.Response> listener,
-                            AtomicArray<Exception> failures
-                        ) {
-                            List<Exception> caughtExceptions = failures.asList();
-                            if (caughtExceptions.size() == 0) {
-                                listener.onResponse(new CloseJobAction.Response(true));
-                                return;
-                            }
-
-                            String msg = "Failed to force close job ["
-                                + jobId
-                                + "] with ["
-                                + caughtExceptions.size()
-                                + "] failures, rethrowing last, all Exceptions: ["
-                                + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
-                                + "]";
-
-                            ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
-                            listener.onFailure(e);
+                    private void sendResponseOrFailure(
+                        String jobId,
+                        ActionListener<CloseJobAction.Response> listener,
+                        AtomicArray<Exception> failures
+                    ) {
+                        List<Exception> caughtExceptions = failures.asList();
+                        if (caughtExceptions.size() == 0) {
+                            listener.onResponse(new CloseJobAction.Response(true));
+                            return;
                         }
+
+                        String msg = "Failed to force close job ["
+                            + jobId
+                            + "] with ["
+                            + caughtExceptions.size()
+                            + "] failures, rethrowing last, all Exceptions: ["
+                            + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+                            + "]";
+
+                        ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
+                        listener.onFailure(e);
                     }
-                );
+                });
             }
         }
     }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java
index ad368dd4b8be0..a6c39a971fd31 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java
@@ -139,7 +139,7 @@ protected void doExecute(
             threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
                 .execute(() -> deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier));
         } else {
-            jobConfigProvider.expandJobs(request.getJobId(), false, true, ActionListener.wrap(jobBuilders -> {
+            jobConfigProvider.expandJobs(request.getJobId(), false, true, null, ActionListener.wrap(jobBuilders -> {
                 threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
                     List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
                     String[] jobIds = jobs.stream().map(Job::getId).toArray(String[]::new);
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java
index dc7d41617e0dd..ac585ac22b81c 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
@@ -112,7 +113,7 @@ protected GetDataFrameAnalyticsStatsAction.Response newResponse(
         for (QueryPage<Stats> task : tasks) {
             stats.addAll(task.results());
         }
-        Collections.sort(stats, Comparator.comparing(Stats::getId));
+        stats.sort(Comparator.comparing(Stats::getId));
         return new GetDataFrameAnalyticsStatsAction.Response(
             taskFailures,
             nodeFailures,
@@ -157,6 +158,7 @@ protected void doExecute(
         GetDataFrameAnalyticsStatsAction.Request request,
         ActionListener<GetDataFrameAnalyticsStatsAction.Response> listener
     ) {
+        TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
         logger.debug("Get stats for data frame analytics [{}]", request.getId());
 
         ActionListener<GetDataFrameAnalyticsAction.Response> getResponseListener = ActionListener.wrap(getResponse -> {
@@ -170,6 +172,7 @@ protected void doExecute(
                 runningTasksStatsResponse -> gatherStatsForStoppedTasks(
                     getResponse.getResources().results(),
                     runningTasksStatsResponse,
+                    parentTaskId,
                     ActionListener.wrap(finalResponse -> {
 
                         // While finalResponse has all the stats objects we need, we should report the count
@@ -191,12 +194,14 @@ protected void doExecute(
         getRequest.setResourceId(request.getId());
         getRequest.setAllowNoResources(request.isAllowNoMatch());
         getRequest.setPageParams(request.getPageParams());
+        getRequest.setParentTask(parentTaskId);
         executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsAction.INSTANCE, getRequest, getResponseListener);
     }
 
     void gatherStatsForStoppedTasks(
         List<DataFrameAnalyticsConfig> configs,
         GetDataFrameAnalyticsStatsAction.Response runningTasksResponse,
+        TaskId parentTaskId,
         ActionListener<GetDataFrameAnalyticsStatsAction.Response> listener
     ) {
         List<DataFrameAnalyticsConfig> stoppedConfigs = determineStoppedConfigs(configs, runningTasksResponse.getResponse().results());
@@ -211,7 +216,7 @@ void gatherStatsForStoppedTasks(
         for (int i = 0; i < stoppedConfigs.size(); i++) {
             final int slot = i;
             DataFrameAnalyticsConfig config = stoppedConfigs.get(i);
-            searchStats(config, ActionListener.wrap(stats -> {
+            searchStats(config, parentTaskId, ActionListener.wrap(stats -> {
                 jobStats.set(slot, stats);
                 if (counter.decrementAndGet() == 0) {
                     if (searchException.get() != null) {
@@ -220,7 +225,7 @@ void gatherStatsForStoppedTasks(
                     }
                     List<Stats> allTasksStats = new ArrayList<>(runningTasksResponse.getResponse().results());
                     allTasksStats.addAll(jobStats.asList());
-                    Collections.sort(allTasksStats, Comparator.comparing(Stats::getId));
+                    allTasksStats.sort(Comparator.comparing(Stats::getId));
                     listener.onResponse(
                         new GetDataFrameAnalyticsStatsAction.Response(
                             new QueryPage<>(allTasksStats, allTasksStats.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)
@@ -242,7 +247,7 @@ static List<DataFrameAnalyticsConfig> determineStoppedConfigs(List<DataFrameAnal
         return configs.stream().filter(config -> startedTasksIds.contains(config.getId()) == false).collect(Collectors.toList());
     }
 
-    private void searchStats(DataFrameAnalyticsConfig config, ActionListener<Stats> listener) {
+    private void searchStats(DataFrameAnalyticsConfig config, TaskId parentTaskId, ActionListener<Stats> listener) {
         logger.debug("[{}] Gathering stats for stopped task", config.getId());
 
         RetrievedStatsHolder retrievedStatsHolder = new RetrievedStatsHolder(
@@ -256,6 +261,7 @@ private void searchStats(DataFrameAnalyticsConfig config, ActionListener<Stats>
         multiSearchRequest.add(buildStatsDocSearch(config.getId(), OutlierDetectionStats.TYPE_VALUE));
         multiSearchRequest.add(buildStatsDocSearch(config.getId(), ClassificationStats.TYPE_VALUE));
         multiSearchRequest.add(buildStatsDocSearch(config.getId(), RegressionStats.TYPE_VALUE));
+        multiSearchRequest.setParentTask(parentTaskId);
 
         executeAsyncWithOrigin(
             client,
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java
index 8adb6b1935865..fafeaa0c95e60 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java
@@ -18,6 +18,7 @@
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
@@ -67,6 +68,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
         ClusterState state = clusterService.state();
         final PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
         final Response.Builder responseBuilder = new Response.Builder();
+        final TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
 
         // 5. Build response
         ActionListener<GetDatafeedRunningStateAction.Response> runtimeStateListener = ActionListener.wrap(runtimeStateResponse -> {
@@ -77,6 +79,10 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
         // 4. Grab runtime state
         ActionListener<Map<String, DatafeedTimingStats>> datafeedTimingStatsListener = ActionListener.wrap(timingStatsByJobId -> {
             responseBuilder.setTimingStatsMap(timingStatsByJobId);
+            GetDatafeedRunningStateAction.Request datafeedRunningStateAction = new GetDatafeedRunningStateAction.Request(
+                responseBuilder.getDatafeedIds()
+            );
+            datafeedRunningStateAction.setParentTask(parentTaskId);
             client.execute(
                 GetDatafeedRunningStateAction.INSTANCE,
-                new GetDatafeedRunningStateAction.Request(responseBuilder.getDatafeedIds()),
+                datafeedRunningStateAction,
@@ -89,7 +95,11 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
             Map<String, String> datafeedIdsToJobIds = datafeedBuilders.stream()
                 .collect(Collectors.toMap(DatafeedConfig.Builder::getId, DatafeedConfig.Builder::getJobId));
             responseBuilder.setDatafeedToJobId(datafeedIdsToJobIds);
-            jobResultsProvider.datafeedTimingStats(new ArrayList<>(datafeedIdsToJobIds.values()), datafeedTimingStatsListener);
+            jobResultsProvider.datafeedTimingStats(
+                new ArrayList<>(datafeedIdsToJobIds.values()),
+                parentTaskId,
+                datafeedTimingStatsListener
+            );
         }, listener::onFailure);
 
         // 2. Now that we have the ids, grab the datafeed configs
@@ -100,11 +110,19 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
                 // Already took into account the request parameter when we expanded the IDs with the tasks earlier
                 // Should allow for no datafeeds in case the config is gone
                 true,
+                parentTaskId,
                 expandedConfigsListener
             );
         }, listener::onFailure);
 
         // 1. This might also include datafeed tasks that exist but no longer have a config
-        datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoMatch(), tasksInProgress, true, expandIdsListener);
+        datafeedConfigProvider.expandDatafeedIds(
+            request.getDatafeedId(),
+            request.allowNoMatch(),
+            tasksInProgress,
+            true,
+            parentTaskId,
+            expandIdsListener
+        );
     }
 }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobModelSnapshotsUpgradeStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobModelSnapshotsUpgradeStatsAction.java
index 66107210d8265..88ec82124bbe2 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobModelSnapshotsUpgradeStatsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobModelSnapshotsUpgradeStatsAction.java
@@ -21,6 +21,7 @@
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
@@ -75,6 +76,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
         logger.debug(() -> format("[%s] get stats for model snapshot [%s] upgrades", request.getJobId(), request.getSnapshotId()));
         final PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
         final Collection<PersistentTasksCustomMetadata.PersistentTask<?>> snapshotUpgrades = MlTasks.snapshotUpgradeTasks(tasksInProgress);
+        final TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
 
         // 2. Now that we have the job IDs, find the relevant model snapshot upgrades
         ActionListener<List<Job.Builder>> expandIdsListener = ActionListener.wrap(jobs -> {
@@ -119,7 +121,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
         }, listener::onFailure);
 
         // 1. Expand jobs - this will throw if a required job ID match isn't made
-        jobConfigProvider.expandJobs(request.getJobId(), request.allowNoMatch(), true, expandIdsListener);
+        jobConfigProvider.expandJobs(request.getJobId(), request.allowNoMatch(), true, parentTaskId, expandIdsListener);
     }
 
     @Override
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java
index 5eb5adf101e3a..9ab41c7c15111 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java
@@ -22,6 +22,7 @@
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
@@ -39,7 +40,6 @@
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 import org.elasticsearch.xpack.ml.job.task.JobTask;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -95,18 +95,27 @@ public TransportGetJobsStatsAction(
     @Override
     protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> finalListener) {
         logger.debug("Get stats for job [{}]", request.getJobId());
+        TaskId parentTaskId = new TaskId(clusterService.getNodeName(), task.getId());
 
         ClusterState state = clusterService.state();
         PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
         // If there are deleted configs, but the task is still around, we probably want to return the tasks in the stats call
-        jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoMatch(), true, tasks, true, ActionListener.wrap(expandedIds -> {
-            request.setExpandedJobsIds(new ArrayList<>(expandedIds));
-            ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
-                response -> gatherStatsForClosedJobs(request, response, finalListener),
-                finalListener::onFailure
-            );
-            super.doExecute(task, request, jobStatsListener);
-        }, finalListener::onFailure));
+        jobConfigProvider.expandJobsIds(
+            request.getJobId(),
+            request.allowNoMatch(),
+            true,
+            tasks,
+            true,
+            parentTaskId,
+            ActionListener.wrap(expandedIds -> {
+                request.setExpandedJobsIds(new ArrayList<>(expandedIds));
+                ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
+                    response -> gatherStatsForClosedJobs(request, response, parentTaskId, finalListener),
+                    finalListener::onFailure
+                );
+                super.doExecute(task, request, jobStatsListener);
+            }, finalListener::onFailure)
+        );
     }
 
     @Override
@@ -120,7 +129,7 @@ protected GetJobsStatsAction.Response newResponse(
         for (QueryPage<JobStats> task : tasks) {
             stats.addAll(task.results());
         }
-        Collections.sort(stats, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
+        stats.sort(Comparator.comparing(JobStats::getJobId));
         return new GetJobsStatsAction.Response(
             taskOperationFailures,
             failedNodeExceptions,
@@ -135,6 +144,7 @@ protected void taskOperation(
         JobTask task,
         ActionListener<QueryPage<JobStats>> listener
     ) {
+        TaskId parentTaskId = new TaskId(clusterService.getNodeName(), actionTask.getId());
         String jobId = task.getJobId();
         ClusterState state = clusterService.state();
         PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
@@ -147,8 +157,8 @@ protected void taskOperation(
             DiscoveryNode node = state.nodes().get(pTask.getExecutorNode());
             JobState jobState = MlTasks.getJobState(jobId, tasks);
             String assignmentExplanation = pTask.getAssignment().getExplanation();
-            TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(task));
-            jobResultsProvider.getForecastStats(jobId, forecastStats -> {
+            TimeValue openTime = processManager.jobOpenTime(task).map(value -> TimeValue.timeValueSeconds(value.getSeconds())).orElse(null);
+            jobResultsProvider.getForecastStats(jobId, parentTaskId, forecastStats -> {
                 JobStats jobStats = new JobStats(
                     jobId,
                     dataCounts,
@@ -173,6 +183,7 @@ protected void taskOperation(
     void gatherStatsForClosedJobs(
         GetJobsStatsAction.Request request,
         GetJobsStatsAction.Response response,
+        TaskId parentTaskId,
         ActionListener<GetJobsStatsAction.Response> listener
     ) {
         List<String> closedJobIds = determineJobIdsWithoutLiveStats(request.getExpandedJobsIds(), response.getResponse().results());
@@ -198,11 +209,14 @@ void gatherStatsForClosedJobs(
             for (int i = 0; i < closedJobIds.size(); i++) {
                 int slot = i;
                 String jobId = closedJobIds.get(i);
-                jobResultsProvider.getForecastStats(jobId, forecastStats -> {
-                    threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
+                jobResultsProvider.getForecastStats(
+                    jobId,
+                    parentTaskId,
+                    forecastStats -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
                         .execute(
                             () -> jobResultsProvider.getDataCountsModelSizeAndTimingStats(
                                 jobId,
+                                parentTaskId,
                                 (dataCounts, modelSizeStats, timingStats) -> {
                                     JobState jobState = MlTasks.getJobState(jobId, tasks);
                                     PersistentTasksCustomMetadata.PersistentTask<?> pTask = MlTasks.getJobTask(jobId, tasks);
@@ -232,7 +246,7 @@ void gatherStatsForClosedJobs(
                                         }
                                         List<JobStats> results = response.getResponse().results();
                                         results.addAll(jobStats.asList());
-                                        Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
+                                        results.sort(Comparator.comparing(JobStats::getJobId));
                                         listener.onResponse(
                                             new GetJobsStatsAction.Response(
                                                 response.getTaskFailures(),
@@ -244,20 +258,13 @@ void gatherStatsForClosedJobs(
                                 },
                                 errorHandler
                             )
-                        );
-                }, errorHandler);
+                        ),
+                    errorHandler
+                );
             }
         });
     }
 
-    static TimeValue durationToTimeValue(Optional<Duration> duration) {
-        if (duration.isPresent()) {
-            return TimeValue.timeValueSeconds(duration.get().getSeconds());
-        } else {
-            return null;
-        }
-    }
-
     static List<String> determineJobIdsWithoutLiveStats(List<String> requestedJobIds, List<GetJobsStatsAction.Response.JobStats> stats) {
         Set<String> excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet());
         return requestedJobIds.stream().filter(jobId -> excludeJobIds.contains(jobId) == false).collect(Collectors.toList());
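With the `durationToTimeValue` helper gone (its removal, and that of its unit test, appears later in this diff), the job open time is converted inline with an `Optional` chain. A standalone sketch of the equivalent conversion; the class and method names are illustrative:

```java
import org.elasticsearch.core.TimeValue;

import java.time.Duration;
import java.util.Optional;

final class OpenTimeSketch {

    // Same behaviour as the removed helper: whole seconds, or null when the
    // process manager reports no open time for the job.
    static TimeValue toTimeValue(Optional<Duration> openTime) {
        return openTime.map(value -> TimeValue.timeValueSeconds(value.getSeconds())).orElse(null);
    }
}
```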
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java
index 37194552b24ce..fccb8bcc29126 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java
@@ -188,6 +188,7 @@ private void doExecute(
                 request.allowNoMatch(),
                 tasks,
                 request.isForce(),
+                null,
                 ActionListener.wrap(expandedIds -> {
                     List<String> startedDatafeeds = new ArrayList<>();
                     List<String> stoppingDatafeeds = new ArrayList<>();
@@ -372,37 +373,28 @@ private void forceStopDatafeed(
         for (String datafeedId : notStoppedDatafeeds) {
             PersistentTasksCustomMetadata.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
             if (datafeedTask != null) {
-                persistentTasksService.sendRemoveRequest(
-                    datafeedTask.getId(),
-                    new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
-                        @Override
-                        public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
-                            // For force stop, only audit here if the datafeed was unassigned at the time of the stop, hence inactive.
-                            // If the datafeed was active then it audits itself on being cancelled.
-                            if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes)) {
-                                auditDatafeedStopped(datafeedTask);
-                            }
-                            if (counter.incrementAndGet() == notStoppedDatafeeds.size()) {
-                                sendResponseOrFailure(request.getDatafeedId(), listener, failures);
-                            }
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            final int slot = counter.incrementAndGet();
-                            // We validated that the datafeed names supplied in the request existed when we started processing the action.
-                            // If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop
-                            // request.
-                            // This is not an error.
-                            if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
-                                failures.set(slot - 1, e);
-                            }
-                            if (slot == notStoppedDatafeeds.size()) {
-                                sendResponseOrFailure(request.getDatafeedId(), listener, failures);
-                            }
-                        }
+                persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap(persistentTask -> {
+                    // For force stop, only audit here if the datafeed was unassigned at the time of the stop, hence inactive.
+                    // If the datafeed was active then it audits itself on being cancelled.
+                    if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes)) {
+                        auditDatafeedStopped(datafeedTask);
                     }
-                );
+                    if (counter.incrementAndGet() == notStoppedDatafeeds.size()) {
+                        sendResponseOrFailure(request.getDatafeedId(), listener, failures);
+                    }
+                }, e -> {
+                    final int slot = counter.incrementAndGet();
+                    // We validated that the datafeed names supplied in the request existed when we started processing the action.
+                    // If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop
+                    // request.
+                    // This is not an error.
+                    if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
+                        failures.set(slot - 1, e);
+                    }
+                    if (slot == notStoppedDatafeeds.size()) {
+                        sendResponseOrFailure(request.getDatafeedId(), listener, failures);
+                    }
+                }));
             } else {
                 // This should not happen, because startedDatafeeds and stoppingDatafeeds
                 // were derived from the same tasks that were passed to this method
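The force-stop path replaces the anonymous `ActionListener` with `ActionListener.wrap`, which builds the same listener from an on-response lambda and an on-failure lambda; behaviour is unchanged. A self-contained sketch of the two equivalent forms, with `String` standing in for the real response type:

```java
import org.elasticsearch.action.ActionListener;

final class ListenerSketch {

    // Built from two lambdas; equivalent to the anonymous class below.
    static ActionListener<String> wrapped() {
        return ActionListener.wrap(
            response -> System.out.println("stopped: " + response),
            e -> System.err.println("stop failed: " + e.getMessage())
        );
    }

    static ActionListener<String> anonymous() {
        return new ActionListener<>() {
            @Override
            public void onResponse(String response) {
                System.out.println("stopped: " + response);
            }

            @Override
            public void onFailure(Exception e) {
                System.err.println("stop failed: " + e.getMessage());
            }
        };
    }
}
```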
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java
index 0434213b0affe..4828fbf517650 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdater.java
@@ -73,7 +73,7 @@ public String getName() {
     @Override
     public void runUpdate() {
         PlainActionFuture<List<DatafeedConfig.Builder>> getdatafeeds = PlainActionFuture.newFuture();
-        provider.expandDatafeedConfigs("_all", true, getdatafeeds);
+        provider.expandDatafeedConfigs("_all", true, null, getdatafeeds);
         List<DatafeedConfig.Builder> datafeedConfigBuilders = getdatafeeds.actionGet();
         List<DatafeedUpdate> updates = datafeedConfigBuilders.stream()
             .map(DatafeedConfig.Builder::build)
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java
index abd154647802a..8cb22a33fa926 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java
@@ -159,6 +159,7 @@ public void getDatafeeds(GetDatafeedsAction.Request request, ActionListener<Quer
         datafeedConfigProvider.expandDatafeedConfigs(
             request.getDatafeedId(),
             request.allowNoMatch(),
+            null,
             ActionListener.wrap(datafeedBuilders ->
 
             // Build datafeeds
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java
index fce5accc4c467..1ce5c19499bac 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
@@ -42,6 +43,7 @@
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -378,6 +380,7 @@ private void indexUpdatedConfig(DatafeedConfig updatedConfig, long seqNo, long p
      *                     wildcard then setting this true will not suppress the exception
      * @param tasks The current tasks meta-data. For expanding IDs when datafeeds might have missing configurations
      * @param allowMissingConfigs If a datafeed has a task, but is missing a config, allow the ID to be expanded via the existing task
+     * @param parentTaskId the parent task ID if available
      * @param listener The expanded datafeed IDs listener
      */
     public void expandDatafeedIds(
@@ -385,6 +388,7 @@ public void expandDatafeedIds(
         boolean allowNoMatch,
         PersistentTasksCustomMetadata tasks,
         boolean allowMissingConfigs,
+        @Nullable TaskId parentTaskId,
         ActionListener<SortedSet<String>> listener
     ) {
         String[] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
@@ -398,6 +402,9 @@ public void expandDatafeedIds(
             .setSource(sourceBuilder)
             .setSize(MlConfigIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
             .request();
+        if (parentTaskId != null) {
+            searchRequest.setParentTask(parentTaskId);
+        }
 
         ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoMatch);
         Collection<String> matchingStartedDatafeedIds = matchingDatafeedIdsWithTasks(tokens, tasks);
@@ -431,18 +438,24 @@ public void expandDatafeedIds(
     }
 
     /**
-     * The same logic as {@link #expandDatafeedIds(String, boolean, PersistentTasksCustomMetadata, boolean, ActionListener)} but
+     * The same logic as {@link #expandDatafeedIds(String, boolean, PersistentTasksCustomMetadata, boolean, TaskId, ActionListener)} but
      * the full datafeed configuration is returned.
      *
-     * See {@link #expandDatafeedIds(String, boolean, PersistentTasksCustomMetadata, boolean, ActionListener)}
+     * See {@link #expandDatafeedIds(String, boolean, PersistentTasksCustomMetadata, boolean, TaskId, ActionListener)}
      *
      * @param expression the expression to resolve
      * @param allowNoMatch if {@code false}, an error is thrown when no name matches the {@code expression}.
      *                     This only applies to wild card expressions, if {@code expression} is not a
      *                     wildcard then setting this true will not suppress the exception
+     * @param parentTaskId the parent task ID if available
      * @param listener The expanded datafeed config listener
      */
-    public void expandDatafeedConfigs(String expression, boolean allowNoMatch, ActionListener<List<DatafeedConfig.Builder>> listener) {
+    public void expandDatafeedConfigs(
+        String expression,
+        boolean allowNoMatch,
+        @Nullable TaskId parentTaskId,
+        ActionListener<List<DatafeedConfig.Builder>> listener
+    ) {
         String[] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens));
         sourceBuilder.sort(DatafeedConfig.ID.getPreferredName());
@@ -452,6 +465,9 @@ public void expandDatafeedConfigs(String expression, boolean allowNoMatch, Actio
             .setSource(sourceBuilder)
             .setSize(MlConfigIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
             .request();
+        if (parentTaskId != null) {
+            searchRequest.setParentTask(parentTaskId);
+        }
 
         ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoMatch);
 
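`expandDatafeedIds` and `expandDatafeedConfigs` now take a `@Nullable TaskId`: REST-driven callers pass the parent of their cancellable task, while internal callers such as `DatafeedConfigAutoUpdater` and `DatafeedManager` pass `null`, and the provider only tags its search when a parent is present. A minimal sketch of that null-guarded tagging; the helper is illustrative:

```java
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.TaskId;

final class NullableParentSketch {

    // Only tag the search when a parent was supplied; internal callers pass null.
    static SearchRequest maybeSetParent(SearchRequest searchRequest, @Nullable TaskId parentTaskId) {
        if (parentTaskId != null) {
            searchRequest.setParentTask(parentTaskId);
        }
        return searchRequest;
    }
}
```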
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
index d0502b6826d5f..3b7a955c3748a 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
@@ -158,7 +158,7 @@ public void getJob(String jobId, ActionListener<Job> jobListener) {
      * @param jobsListener The jobs listener
      */
     public void expandJobBuilders(String expression, boolean allowNoMatch, ActionListener<List<Job.Builder>> jobsListener) {
-        jobConfigProvider.expandJobs(expression, allowNoMatch, false, jobsListener);
+        jobConfigProvider.expandJobs(expression, allowNoMatch, false, null, jobsListener);
     }
 
     /**
@@ -436,16 +436,14 @@ private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, Acti
                         logger.error("[{}] Updating autodetect failed for job update [{}]", jobUpdate.getJobId(), jobUpdate);
                     }
                 },
-                    e -> {
-                        logger.error(
-                            () -> format(
-                                "[%s] Updating autodetect failed with an exception, job update [%s] ",
-                                jobUpdate.getJobId(),
-                                jobUpdate
-                            ),
-                            e
-                        );
-                    }
+                    e -> logger.error(
+                        () -> format(
+                            "[%s] Updating autodetect failed with an exception, job update [%s] ",
+                            jobUpdate.getJobId(),
+                            jobUpdate
+                        ),
+                        e
+                    )
                 ));
             }
         } else {
@@ -637,7 +635,7 @@ private void submitJobEventUpdate(Collection<String> jobIds, ActionListener<Bool
             updateJobProcessNotifier.submitJobUpdate(
                 UpdateParams.scheduledEventsUpdate(jobId),
                 ActionListener.wrap(
-                    isUpdated -> { auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); },
+                    isUpdated -> auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)),
                     e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e)
                 )
             );
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
index c095076069221..10e6c43c4033a 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
@@ -47,11 +47,13 @@
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
 import org.elasticsearch.xpack.core.ml.MlConfigIndex;
@@ -82,7 +84,6 @@
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.stream.Collectors;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@@ -358,33 +359,27 @@ public void jobExists(String jobId, boolean errorIfMissing, ActionListener<Boole
         GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), Job.documentId(jobId));
         getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
 
-        executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener<GetResponse>() {
-            @Override
-            public void onResponse(GetResponse getResponse) {
-                if (getResponse.isExists() == false) {
-                    if (errorIfMissing) {
-                        listener.onFailure(ExceptionsHelper.missingJobException(jobId));
-                    } else {
-                        listener.onResponse(Boolean.FALSE);
-                    }
+        executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
+            if (getResponse.isExists() == false) {
+                if (errorIfMissing) {
+                    listener.onFailure(ExceptionsHelper.missingJobException(jobId));
                 } else {
-                    listener.onResponse(Boolean.TRUE);
+                    listener.onResponse(Boolean.FALSE);
                 }
+            } else {
+                listener.onResponse(Boolean.TRUE);
             }
-
-            @Override
-            public void onFailure(Exception e) {
-                if (e.getClass() == IndexNotFoundException.class) {
-                    if (errorIfMissing) {
-                        listener.onFailure(ExceptionsHelper.missingJobException(jobId));
-                    } else {
-                        listener.onResponse(Boolean.FALSE);
-                    }
+        }, e -> {
+            if (e.getClass() == IndexNotFoundException.class) {
+                if (errorIfMissing) {
+                    listener.onFailure(ExceptionsHelper.missingJobException(jobId));
                 } else {
-                    listener.onFailure(e);
+                    listener.onResponse(Boolean.FALSE);
                 }
+            } else {
+                listener.onFailure(e);
             }
-        });
+        }));
     }
 
     /**
@@ -468,6 +463,7 @@ public void updateJobAfterReset(String jobId, ActionListener<PutJobAction.Respon
      * @param tasksCustomMetadata The current persistent task metadata.
      *                            For resolving jobIds that have tasks, but for some reason, don't have configs
      * @param allowMissingConfigs If a job has a task, but is missing a config, allow the ID to be expanded via the existing task
+     * @param parentTaskId the parent task ID if available
      * @param listener The expanded job Ids listener
      */
     public void expandJobsIds(
@@ -476,6 +472,7 @@ public void expandJobsIds(
         boolean excludeDeleting,
         @Nullable PersistentTasksCustomMetadata tasksCustomMetadata,
         boolean allowMissingConfigs,
+        @Nullable TaskId parentTaskId,
         ActionListener<SortedSet<String>> listener
     ) {
         String[] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
@@ -490,6 +487,9 @@ public void expandJobsIds(
             .setSource(sourceBuilder)
             .setSize(MlConfigIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
             .request();
+        if (parentTaskId != null) {
+            searchRequest.setParentTask(parentTaskId);
+        }
 
         ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoMatch);
         Collection<String> openMatchingJobs = matchingJobIdsWithTasks(tokens, tasksCustomMetadata);
@@ -506,7 +506,7 @@ public void expandJobsIds(
                     jobIds.add(hit.field(Job.ID.getPreferredName()).getValue());
                     List<Object> groups = hit.field(Job.GROUPS.getPreferredName()).getValues();
                     if (groups != null) {
-                        groupsIds.addAll(groups.stream().map(Object::toString).collect(Collectors.toList()));
+                        groupsIds.addAll(groups.stream().map(Object::toString).toList());
                     }
                 }
                 if (allowMissingConfigs) {
@@ -528,19 +528,27 @@ public void expandJobsIds(
     }
 
     /**
-     * The same logic as {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetadata, boolean, ActionListener)} but
+     * The same logic as
+     * {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetadata, boolean, TaskId, ActionListener)} but
      * the full anomaly detector job configuration is returned.
      *
-     * See {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetadata, boolean, ActionListener)}
+     * See {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetadata, boolean, TaskId, ActionListener)}
      *
      * @param expression the expression to resolve
      * @param allowNoMatch if {@code false}, an error is thrown when no name matches the {@code expression}.
      *                     This only applies to wild card expressions, if {@code expression} is not a
      *                     wildcard then setting this true will not suppress the exception
      * @param excludeDeleting If true exclude jobs marked as deleting
+     * @param parentTaskId the parent task ID if available
      * @param listener The expanded jobs listener
      */
-    public void expandJobs(String expression, boolean allowNoMatch, boolean excludeDeleting, ActionListener<List<Job.Builder>> listener) {
+    public void expandJobs(
+        String expression,
+        boolean allowNoMatch,
+        boolean excludeDeleting,
+        @Nullable TaskId parentTaskId,
+        ActionListener<List<Job.Builder>> listener
+    ) {
         String[] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting));
         sourceBuilder.sort(Job.ID.getPreferredName());
@@ -550,6 +558,9 @@ public void expandJobs(String expression, boolean allowNoMatch, boolean excludeD
             .setSource(sourceBuilder)
             .setSize(MlConfigIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
             .request();
+        if (parentTaskId != null) {
+            searchRequest.setParentTask(parentTaskId);
+        }
 
         ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoMatch);
 
@@ -591,8 +602,8 @@ public void expandJobs(String expression, boolean allowNoMatch, boolean excludeD
 
     /**
      * Expands the list of job group Ids to the set of jobs which are members of the groups.
-     * Unlike {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetadata, boolean, ActionListener)} it is not an error
-     * if a group Id does not exist.
+     * Unlike {@link #expandJobsIds(String, boolean, boolean, PersistentTasksCustomMetadata, boolean, TaskId, ActionListener)} it is not an
+     * error if a group Id does not exist.
      * Wildcard expansion of group Ids is not supported.
      *
      * @param groupIds Group Ids to expand
@@ -654,7 +665,7 @@ public void groupExists(String groupId, ActionListener<Boolean> listener) {
             ML_ORIGIN,
             searchRequest,
             ActionListener.<SearchResponse>wrap(
-                response -> { listener.onResponse(response.getHits().getTotalHits().value > 0); },
+                response -> listener.onResponse(response.getHits().getTotalHits().value > 0),
                 listener::onFailure
             ),
             client::search
@@ -733,7 +744,7 @@ private void parseJobLenientlyFromSource(BytesReference source, ActionListener<J
         try (
             InputStream stream = source.streamInput();
             XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
+                .createParser(XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE), stream)
         ) {
             jobListener.onResponse(Job.LENIENT_PARSER.apply(parser, null));
         } catch (Exception e) {
@@ -745,7 +756,7 @@ private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IO
         try (
             InputStream stream = source.streamInput();
             XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
+                .createParser(XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE), stream)
         ) {
             return Job.LENIENT_PARSER.apply(parser, null);
         }
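`JobConfigProvider` (and `JobResultsProvider` below) also move parser construction from the `NamedXContentRegistry` plus `DeprecationHandler` overload to the `XContentParserConfiguration` one, starting from `XContentParserConfiguration.EMPTY` and overriding only the deprecation handler. A standalone sketch of the new call; the wrapper class and method are illustrative:

```java
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.io.InputStream;

final class ParserSketch {

    // The configuration object replaces the separate registry and deprecation
    // handler arguments of the old createParser overload.
    static XContentParser lenientJsonParser(InputStream stream) throws IOException {
        return XContentFactory.xContent(XContentType.JSON)
            .createParser(XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE), stream);
    }
}
```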
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
index 0282661d880b0..a3eda544a5d64 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
@@ -80,11 +80,12 @@
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
-import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
@@ -495,6 +496,7 @@ public void dataCounts(String jobId, Consumer<DataCounts> handler, Consumer<Exce
 
     public void getDataCountsModelSizeAndTimingStats(
         String jobId,
+        @Nullable TaskId parentTaskId,
         TriConsumer<DataCounts, ModelSizeStats, TimingStats> handler,
         Consumer<Exception> errorHandler
     ) {
@@ -538,6 +540,9 @@ public void getDataCountsModelSizeAndTimingStats(
                     )
             )
             .request();
+        if (parentTaskId != null) {
+            request.setParentTask(parentTaskId);
+        }
         executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, ActionListener.<SearchResponse>wrap(response -> {
             Aggregations aggs = response.getAggregations();
             if (aggs == null) {
@@ -590,7 +595,11 @@ private SearchRequestBuilder createLatestTimingStatsSearch(String indexName, Str
             .addSort(SortBuilders.fieldSort(TimingStats.BUCKET_COUNT.getPreferredName()).order(SortOrder.DESC));
     }
 
-    public void datafeedTimingStats(List<String> jobIds, ActionListener<Map<String, DatafeedTimingStats>> listener) {
+    public void datafeedTimingStats(
+        List<String> jobIds,
+        @Nullable TaskId parentTaskId,
+        ActionListener<Map<String, DatafeedTimingStats>> listener
+    ) {
         if (jobIds.isEmpty()) {
             listener.onResponse(Map.of());
             return;
@@ -601,6 +610,9 @@ public void datafeedTimingStats(List<String> jobIds, ActionListener<Map<String,
             msearchRequestBuilder.add(createLatestDatafeedTimingStatsSearch(indexName, jobId));
         }
         MultiSearchRequest msearchRequest = msearchRequestBuilder.request();
+        if (parentTaskId != null) {
+            msearchRequest.setParentTask(parentTaskId);
+        }
 
         executeAsyncWithOrigin(
             client.threadPool().getThreadContext(),
@@ -835,7 +847,10 @@ public void buckets(
                     try (
                         InputStream stream = source.streamInput();
                         XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                            .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
+                            .createParser(
+                                XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE),
+                                stream
+                            )
                     ) {
                         Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null);
                         results.add(bucket);
@@ -1038,7 +1053,10 @@ public void categoryDefinitions(
                     try (
                         InputStream stream = source.streamInput();
                         XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                            .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
+                            .createParser(
+                                XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE),
+                                stream
+                            )
                     ) {
                         CategoryDefinition categoryDefinition = CategoryDefinition.LENIENT_PARSER.apply(parser, null);
                         if (augment) {
@@ -1103,7 +1121,10 @@ public void records(
                     try (
                         InputStream stream = source.streamInput();
                         XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                            .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
+                            .createParser(
+                                XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE),
+                                stream
+                            )
                     ) {
                         results.add(AnomalyRecord.LENIENT_PARSER.apply(parser, null));
                     } catch (IOException e) {
@@ -1171,7 +1192,10 @@ public void influencers(
                     try (
                         InputStream stream = source.streamInput();
                         XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                            .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
+                            .createParser(
+                                XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE),
+                                stream
+                            )
                     ) {
                         influencers.add(Influencer.LENIENT_PARSER.apply(parser, null));
                     } catch (IOException e) {
@@ -1220,7 +1244,7 @@ public void getModelSnapshot(
             ModelSnapshot.TYPE.getPreferredName(),
             search,
             ModelSnapshot.LENIENT_PARSER,
-            result -> handler.accept(result.result == null ? null : new Result<ModelSnapshot>(result.index, result.result.build())),
+            result -> handler.accept(result.result == null ? null : new Result<>(result.index, result.result.build())),
             errorHandler,
             () -> null
         );
@@ -1366,7 +1390,7 @@ public QueryPage<ModelPlot> modelPlot(String jobId, int from, int size) {
             try (
                 InputStream stream = source.streamInput();
                 XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                    .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
+                    .createParser(XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE), stream)
             ) {
                 ModelPlot modelPlot = ModelPlot.LENIENT_PARSER.apply(parser, null);
                 results.add(modelPlot);
@@ -1400,7 +1424,7 @@ public QueryPage<CategorizerStats> categorizerStats(String jobId, int from, int
             try (
                 InputStream stream = source.streamInput();
                 XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                    .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
+                    .createParser(XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE), stream)
             ) {
                 CategorizerStats categorizerStats = CategorizerStats.LENIENT_PARSER.apply(parser, null).build();
                 results.add(categorizerStats);
@@ -1731,7 +1755,12 @@ public void getForecastRequestStats(
         );
     }
 
-    public void getForecastStats(String jobId, Consumer<ForecastStats> handler, Consumer<Exception> errorHandler) {
+    public void getForecastStats(
+        String jobId,
+        @Nullable TaskId parentTaskId,
+        Consumer<ForecastStats> handler,
+        Consumer<Exception> errorHandler
+    ) {
         String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
 
         QueryBuilder termQuery = new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE);
@@ -1758,6 +1787,9 @@ public void getForecastStats(String jobId, Consumer<ForecastStats> handler, Cons
         sourceBuilder.trackTotalHits(false);
 
         searchRequest.source(sourceBuilder);
+        if (parentTaskId != null) {
+            searchRequest.setParentTask(parentTaskId);
+        }
 
         executeAsyncWithOrigin(
             client.threadPool().getThreadContext(),
@@ -1912,11 +1944,13 @@ public void onResponse(GetResponse getDocResponse) {
                 try {
                     if (getDocResponse.isExists()) {
                         BytesReference docSource = getDocResponse.getSourceAsBytesRef();
-
                         try (
                             InputStream stream = docSource.streamInput();
                             XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                                .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
+                                .createParser(
+                                    XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE),
+                                    stream
+                                )
                         ) {
                             Calendar calendar = Calendar.LENIENT_PARSER.apply(parser, null).build();
                             listener.onResponse(calendar);
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestGetDatafeedStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestGetDatafeedStatsAction.java
index d7d094f55ce7c..50a666aef9ebf 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestGetDatafeedStatsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestGetDatafeedStatsAction.java
@@ -11,6 +11,7 @@
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction.Request;
@@ -59,6 +60,10 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
             (r, s) -> r.paramAsBoolean(s, request.allowNoMatch()),
             request::setAllowNoMatch
         );
-        return channel -> client.execute(GetDatafeedsStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
+        return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
+            GetDatafeedsStatsAction.INSTANCE,
+            request,
+            new RestToXContentListener<>(channel)
+        );
     }
 }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestGetDataFrameAnalyticsStatsAction.java
index 2fc2c8714c315..d5132978a739b 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestGetDataFrameAnalyticsStatsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestGetDataFrameAnalyticsStatsAction.java
@@ -10,6 +10,7 @@
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
@@ -57,7 +58,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
             restRequest.paramAsBoolean(GetDataFrameAnalyticsStatsAction.Request.ALLOW_NO_MATCH.getPreferredName(), request.isAllowNoMatch())
         );
 
-        return channel -> client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
+        return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
+            GetDataFrameAnalyticsStatsAction.INSTANCE,
+            request,
+            new RestToXContentListener<>(channel)
+        );
     }
 
     @Override
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestGetJobStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestGetJobStatsAction.java
index 104abcd4364c1..1adac7991e12b 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestGetJobStatsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestGetJobStatsAction.java
@@ -12,6 +12,7 @@
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Request;
@@ -60,6 +61,10 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
             (r, s) -> r.paramAsBoolean(s, request.allowNoMatch()),
             request::setAllowNoMatch
         );
-        return channel -> client.execute(GetJobsStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
+        return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
+            GetJobsStatsAction.INSTANCE,
+            request,
+            new RestToXContentListener<>(channel)
+        );
     }
 }
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/modelsnapshots/RestGetJobModelSnapshotsUpgradeStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/modelsnapshots/RestGetJobModelSnapshotsUpgradeStatsAction.java
index b56b89fe3073b..4e327a31c07a5 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/modelsnapshots/RestGetJobModelSnapshotsUpgradeStatsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/modelsnapshots/RestGetJobModelSnapshotsUpgradeStatsAction.java
@@ -11,6 +11,7 @@
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.ml.action.GetJobModelSnapshotsUpgradeStatsAction;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -44,6 +45,10 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest restReq
         request.setAllowNoMatch(
             restRequest.paramAsBoolean(GetJobModelSnapshotsUpgradeStatsAction.Request.ALLOW_NO_MATCH, request.allowNoMatch())
         );
-        return channel -> client.execute(GetJobModelSnapshotsUpgradeStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
+        return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
+            GetJobModelSnapshotsUpgradeStatsAction.INSTANCE,
+            request,
+            new RestToXContentListener<>(channel)
+        );
     }
 }
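The three REST handlers above stop executing directly on the `NodeClient` and instead go through `RestCancellableNodeClient`, which registers the spawned task against the HTTP channel so a closed client connection cancels it (and, via the parent-task wiring above, its child requests). A sketch of the dispatch shape, assuming it sits in a `BaseRestHandler#prepareRequest` implementation where `restRequest`, `client`, and the populated request are in scope; the method name is illustrative:

```java
// Illustrative helper; in the handlers above this is simply the return statement
// of prepareRequest.
private BaseRestHandler.RestChannelConsumer cancellableDispatch(
    RestRequest restRequest,
    NodeClient client,
    GetJobsStatsAction.Request request
) {
    // Ties the spawned task to the HTTP channel: if the client closes the
    // connection, the task is cancelled instead of running to completion.
    return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
        GetJobsStatsAction.INSTANCE,
        request,
        new RestToXContentListener<>(channel)
    );
}
```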
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java
index 90ca52c7a27e2..c55b78c07939a 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java
@@ -363,11 +363,11 @@ private void mockDatafeedConfigFindDatafeeds(Set<String> datafeedIds) {
     @SuppressWarnings("unchecked")
     private void mockJobConfigProviderExpandIds(Set<String> expandedIds) {
         doAnswer(invocation -> {
-            ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[5];
+            ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[6];
             listener.onResponse(expandedIds);
 
             return null;
-        }).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(), anyBoolean(), any(ActionListener.class));
+        }).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(), anyBoolean(), any(), any(ActionListener.class));
     }
 
     @SuppressWarnings("unchecked")
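Because the new `TaskId` parameter sits before the listener, the mocked `expandJobsIds` stub above (and the `expandDatafeedConfigs` stub in `DatafeedConfigAutoUpdaterTests` further down) reads the listener one index later and adds an extra `any()` matcher. A self-contained illustration of that index shift with Mockito; `ConfigProvider` is a made-up interface, not the real `JobConfigProvider`:

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.tasks.TaskId;

import java.util.Set;

public class ArgumentIndexExample {

    public interface ConfigProvider {
        void expandIds(String expression, boolean allowNoMatch, TaskId parentTaskId, ActionListener<Set<String>> listener);
    }

    @SuppressWarnings("unchecked")
    static ConfigProvider mockProvider(Set<String> ids) {
        ConfigProvider provider = mock(ConfigProvider.class);
        doAnswer(invocation -> {
            // The listener is the fourth parameter, hence index 3; inserting a
            // parameter before it bumps this index, exactly as in the tests above.
            ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[3];
            listener.onResponse(ids);
            return null;
        }).when(provider).expandIds(any(), anyBoolean(), any(), any(ActionListener.class));
        return provider;
    }
}
```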
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java
index 61ffd15132ba6..66f930a165844 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java
@@ -6,18 +6,15 @@
  */
 package org.elasticsearch.xpack.ml.action;
 
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
 
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 import static org.elasticsearch.xpack.ml.action.TransportGetJobsStatsAction.determineJobIdsWithoutLiveStats;
 
@@ -144,11 +141,4 @@ public void testDetermineJobIds() {
         assertEquals(0, result.size());
     }
 
-    public void testDurationToTimeValue() {
-        assertNull(TransportGetJobsStatsAction.durationToTimeValue(Optional.empty()));
-
-        Duration duration = Duration.ofSeconds(10L);
-        TimeValue timeValue = TransportGetJobsStatsAction.durationToTimeValue(Optional.of(duration));
-        assertEquals(10L, timeValue.getSeconds());
-    }
 }
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java
index 8bc22b012b585..c6ccf1ef6affe 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigAutoUpdaterTests.java
@@ -50,18 +50,18 @@
 public class DatafeedConfigAutoUpdaterTests extends ESTestCase {
 
     private DatafeedConfigProvider provider;
-    private List<DatafeedConfig.Builder> datafeeds = new ArrayList<>();
-    private IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
+    private final List<DatafeedConfig.Builder> datafeeds = new ArrayList<>();
+    private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
 
     @Before
     public void setup() {
         provider = mock(DatafeedConfigProvider.class);
         doAnswer(call -> {
             @SuppressWarnings("unchecked")
-            ActionListener<List<DatafeedConfig.Builder>> handler = (ActionListener<List<DatafeedConfig.Builder>>) call.getArguments()[2];
+            ActionListener<List<DatafeedConfig.Builder>> handler = (ActionListener<List<DatafeedConfig.Builder>>) call.getArguments()[3];
             handler.onResponse(datafeeds);
             return null;
-        }).when(provider).expandDatafeedConfigs(any(), anyBoolean(), any());
+        }).when(provider).expandDatafeedConfigs(any(), anyBoolean(), any(), any());
         doAnswer(call -> {
             @SuppressWarnings("unchecked")
             ActionListener<DatafeedConfig> handler = (ActionListener<DatafeedConfig>) call.getArguments()[4];
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java
index 16d72ca4e543d..fc1db21a52325 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java
@@ -681,6 +681,7 @@ public void testDatafeedTimingStats_EmptyJobList() {
         JobResultsProvider provider = createProvider(client);
         provider.datafeedTimingStats(
             List.of(),
+            null,
             ActionListener.wrap(
                 statsByJobId -> assertThat(statsByJobId, anEmptyMap()),
                 e -> { throw new AssertionError("Failure getting datafeed timing stats", e); }
@@ -774,6 +775,7 @@ public void testDatafeedTimingStats_MultipleDocumentsAtOnce() throws IOException
         );
         provider.datafeedTimingStats(
             List.of("foo", "bar"),
+            null,
             ActionListener.wrap(
                 statsByJobId -> assertThat(
                     statsByJobId,