diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/TaskRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/TaskRecoveryIT.java new file mode 100644 index 0000000000000..416c29f882dd2 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/TaskRecoveryIT.java @@ -0,0 +1,137 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class TaskRecoveryIT extends ESIntegTestCase { + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), TaskRecoveryIT.EngineTestPlugin.class); + } + + /** + * Checks that the parent / child task hierarchy is correct for tasks that are initiated by a recovery task. + * We use an engine plugin that stalls translog recovery, which gives us the opportunity to inspect the + * task hierarchy. + */ + public void testTaskForOngoingRecovery() throws Exception { + String indexName = "test"; + internalCluster().startMasterOnlyNode(); + String nodeWithPrimary = internalCluster().startDataOnlyNode(); + assertAcked( + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.routing.allocation.include._name", nodeWithPrimary) + ) + ); + try { + String nodeWithReplica = internalCluster().startDataOnlyNode(); + + // Create an index so that there is something to recover + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica) + ) + ); + // Translog recovery is stalled, so we can inspect the running tasks. + assertBusy(() -> { + List primaryTasks = client().admin() + .cluster() + .prepareListTasks(nodeWithPrimary) + .setActions(PeerRecoverySourceService.Actions.START_RECOVERY) + .get() + .getTasks(); + assertThat("Expected a single primary task", primaryTasks.size(), equalTo(1)); + List replicaTasks = client().admin() + .cluster() + .prepareListTasks(nodeWithReplica) + .setActions(PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG) + .get() + .getTasks(); + assertThat("Expected a single replica task", replicaTasks.size(), equalTo(1)); + assertThat( + "Replica task's parent task ID was incorrect", + replicaTasks.get(0).parentTaskId(), + equalTo(primaryTasks.get(0).taskId()) + ); + }); + } finally { + // Release the EngineTestPlugin, which will allow translog recovery to complete + StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false) + .flatMap(ps -> ps.filterPlugins(EnginePlugin.class).stream()) + .map(EngineTestPlugin.class::cast) + .forEach(EngineTestPlugin::release); + } + ensureGreen(indexName); + } + + /** + * An engine plugin that defers translog recovery until the engine is released via {@link #release()}. + */ + public static class EngineTestPlugin extends Plugin implements EnginePlugin { + private final CountDownLatch latch = new CountDownLatch(1); + + public void release() { + latch.countDown(); + } + + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + return Optional.of(config -> new InternalEngine(config) { + + @Override + public void skipTranslogRecovery() { + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + super.skipTranslogRecovery(); + } + }); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStatePublicationEvent.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStatePublicationEvent.java index dc8c328cbbe9d..bae96e912da26 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStatePublicationEvent.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStatePublicationEvent.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster; import org.elasticsearch.cluster.service.BatchSummary; +import org.elasticsearch.tasks.Task; /** * Represents a cluster state update computed by the {@link org.elasticsearch.cluster.service.MasterService} for publication to the cluster. @@ -24,6 +25,7 @@ public class ClusterStatePublicationEvent { private final BatchSummary summary; private final ClusterState oldState; private final ClusterState newState; + private final Task task; private final long computationTimeMillis; private final long publicationStartTimeMillis; private volatile long publicationContextConstructionElapsedMillis = NOT_SET; @@ -35,12 +37,14 @@ public ClusterStatePublicationEvent( BatchSummary summary, ClusterState oldState, ClusterState newState, + Task task, long computationTimeMillis, long publicationStartTimeMillis ) { this.summary = summary; this.oldState = oldState; this.newState = newState; + this.task = task; this.computationTimeMillis = computationTimeMillis; this.publicationStartTimeMillis = publicationStartTimeMillis; } @@ -57,6 +61,10 @@ public ClusterState getNewState() { return newState; } + public Task getTask() { + return task; + } + public long getComputationTimeMillis() { return computationTimeMillis; } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index 41ecb0cbdc785..048b2e6665102 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -36,6 +36,7 @@ import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasables; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BytesTransportRequest; import org.elasticsearch.transport.TransportException; @@ -284,6 +285,7 @@ public class PublicationContext extends AbstractRefCounted { private final DiscoveryNodes discoveryNodes; private final ClusterState newState; private final ClusterState previousState; + private final Task task; private final boolean sendFullVersion; // All the values of these maps have one ref for the context (while it's open) and one for each in-flight message. @@ -294,6 +296,7 @@ public class PublicationContext extends AbstractRefCounted { discoveryNodes = clusterStatePublicationEvent.getNewState().nodes(); newState = clusterStatePublicationEvent.getNewState(); previousState = clusterStatePublicationEvent.getOldState(); + task = clusterStatePublicationEvent.getTask(); sendFullVersion = previousState.getBlocks().disableStatePersistence(); } @@ -421,10 +424,11 @@ private void sendClusterState( return; } try { - transportService.sendRequest( + transportService.sendChildRequest( destination, PUBLISH_STATE_ACTION_NAME, new BytesTransportRequest(bytes, destination.getVersion()), + task, STATE_REQUEST_OPTIONS, new ActionListenerResponseHandler<>( ActionListener.runAfter(listener, bytes::decRef), diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index 0983500d47ffa..4696da3fe2c5f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.node.Node; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; public class ClusterService extends AbstractLifecycleComponent { @@ -55,11 +56,11 @@ public class ClusterService extends AbstractLifecycleComponent { private RerouteService rerouteService; - public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, TaskManager taskManager) { this( settings, clusterSettings, - new MasterService(settings, clusterSettings, threadPool), + new MasterService(settings, clusterSettings, threadPool, taskManager), new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool) ); } @@ -262,5 +263,4 @@ public void submitStateUpdateTask( ) { masterService.submitStateUpdateTask(source, task, config, executor); } - } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index f78e3c2a7cfdd..4c00f33a27b01 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -42,6 +42,10 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -74,6 +78,8 @@ public class MasterService extends AbstractLifecycleComponent { static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask"; + public static final String STATE_UPDATE_ACTION_NAME = "publish_cluster_state_update"; + ClusterStatePublisher clusterStatePublisher; private final String nodeName; @@ -84,13 +90,14 @@ public class MasterService extends AbstractLifecycleComponent { private final TimeValue starvationLoggingThreshold; protected final ThreadPool threadPool; + private final TaskManager taskManager; private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor; private volatile Batcher taskBatcher; private final ClusterStateUpdateStatsTracker clusterStateUpdateStatsTracker = new ClusterStateUpdateStatsTracker(); - public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, TaskManager taskManager) { this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); this.slowTaskLoggingThreshold = MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); @@ -99,6 +106,7 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP this.starvationLoggingThreshold = MASTER_SERVICE_STARVATION_LOGGING_THRESHOLD_SETTING.get(settings); this.threadPool = threadPool; + this.taskManager = taskManager; } private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) { @@ -271,121 +279,140 @@ private void runTasks( logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary); clusterStateUpdateStatsTracker.onUnchangedClusterState(computationTime.millis(), executionTime.millis()); } else { - if (logger.isTraceEnabled()) { - logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState); - } else { - logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary); - } - final long publicationStartTime = threadPool.rawRelativeTimeInMillis(); - try { - final ClusterStatePublicationEvent clusterStatePublicationEvent = new ClusterStatePublicationEvent( - summary, - previousClusterState, - newClusterState, - computationTime.millis(), - publicationStartTime - ); + final Task task = taskManager.register("master", STATE_UPDATE_ACTION_NAME, new TaskAwareRequest() { + @Override + public void setParentTask(TaskId taskId) {} - // new cluster state, notify all listeners - final DiscoveryNodes.Delta nodesDelta = newClusterState.nodes().delta(previousClusterState.nodes()); - if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { - String nodesDeltaSummary = nodesDelta.shortSummary(); - if (nodesDeltaSummary.length() > 0) { - logger.info( - "{}, term: {}, version: {}, delta: {}", - summary, - newClusterState.term(), - newClusterState.version(), - nodesDeltaSummary - ); - } + @Override + public TaskId getParentTask() { + return TaskId.EMPTY_TASK_ID; } - logger.debug("publishing cluster state version [{}]", newClusterState.version()); - publish( - clusterStatePublicationEvent, - new CompositeTaskAckListener( - executionResults.stream() - .map(ExecutionResult::getContextPreservingAckListener) - .filter(Objects::nonNull) - .map( - contextPreservingAckListener -> new TaskAckListener( - contextPreservingAckListener, - newClusterState.version(), - newClusterState.nodes(), - threadPool - ) - ) - .toList() - ), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - final long notificationStartTime = threadPool.rawRelativeTimeInMillis(); - for (final var executionResult : executionResults) { - executionResult.onPublishSuccess(newClusterState); - } - - try { - executor.clusterStatePublished(newClusterState); - } catch (Exception e) { - logger.error( - () -> format( - "exception thrown while notifying executor of new cluster state publication [%s]", - summary - ), - e - ); - } - final TimeValue executionTime = getTimeSince(notificationStartTime); - logExecutionTime( - executionTime, - "notify listeners on successful publication of cluster state (version: " - + newClusterState.version() - + ", uuid: " - + newClusterState.stateUUID() - + ')', - summary - ); - clusterStateUpdateStatsTracker.onPublicationSuccess( - threadPool.rawRelativeTimeInMillis(), - clusterStatePublicationEvent, - executionTime.millis() + @Override + public String getDescription() { + return "publication of cluster state [" + newClusterState.getVersion() + "]"; + } + }); + try { + if (logger.isTraceEnabled()) { + logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState); + } else { + logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary); + } + final long publicationStartTime = threadPool.rawRelativeTimeInMillis(); + try { + final ClusterStatePublicationEvent clusterStatePublicationEvent = new ClusterStatePublicationEvent( + summary, + previousClusterState, + newClusterState, + task, + computationTime.millis(), + publicationStartTime + ); + + // new cluster state, notify all listeners + final DiscoveryNodes.Delta nodesDelta = newClusterState.nodes().delta(previousClusterState.nodes()); + if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { + String nodesDeltaSummary = nodesDelta.shortSummary(); + if (nodesDeltaSummary.length() > 0) { + logger.info( + "{}, term: {}, version: {}, delta: {}", + summary, + newClusterState.term(), + newClusterState.version(), + nodesDeltaSummary ); } + } - @Override - public void onFailure(Exception exception) { - if (exception instanceof FailedToCommitClusterStateException failedToCommitClusterStateException) { + logger.debug("publishing cluster state version [{}]", newClusterState.version()); + publish( + clusterStatePublicationEvent, + new CompositeTaskAckListener( + executionResults.stream() + .map(ExecutionResult::getContextPreservingAckListener) + .filter(Objects::nonNull) + .map( + contextPreservingAckListener -> new TaskAckListener( + contextPreservingAckListener, + newClusterState.version(), + newClusterState.nodes(), + threadPool + ) + ) + .toList() + ), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { final long notificationStartTime = threadPool.rawRelativeTimeInMillis(); - final long version = newClusterState.version(); - logger.warn( - () -> format("failing [%s]: failed to commit cluster state version [%s]", summary, version), - exception - ); for (final var executionResult : executionResults) { - executionResult.onPublishFailure(failedToCommitClusterStateException); + executionResult.onPublishSuccess(newClusterState); } - final long notificationMillis = threadPool.rawRelativeTimeInMillis() - notificationStartTime; - clusterStateUpdateStatsTracker.onPublicationFailure( - threadPool.rawRelativeTimeInMillis(), - clusterStatePublicationEvent, - notificationMillis + + try { + executor.clusterStatePublished(newClusterState); + } catch (Exception e) { + logger.error( + () -> format( + "exception thrown while notifying executor of new cluster state publication [%s]", + summary + ), + e + ); + } + final TimeValue executionTime = getTimeSince(notificationStartTime); + logExecutionTime( + executionTime, + "notify listeners on successful publication of cluster state (version: " + + newClusterState.version() + + ", uuid: " + + newClusterState.stateUUID() + + ')', + summary ); - } else { - assert publicationMayFail() : exception; - clusterStateUpdateStatsTracker.onPublicationFailure( + clusterStateUpdateStatsTracker.onPublicationSuccess( threadPool.rawRelativeTimeInMillis(), clusterStatePublicationEvent, - 0L + executionTime.millis() ); - handleException(summary, publicationStartTime, newClusterState, exception); + } + + @Override + public void onFailure(Exception exception) { + if (exception instanceof FailedToCommitClusterStateException failedToCommitClusterStateException) { + final long notificationStartTime = threadPool.rawRelativeTimeInMillis(); + final long version = newClusterState.version(); + logger.warn( + () -> format("failing [%s]: failed to commit cluster state version [%s]", summary, version), + exception + ); + for (final var executionResult : executionResults) { + executionResult.onPublishFailure(failedToCommitClusterStateException); + } + final long notificationMillis = threadPool.rawRelativeTimeInMillis() - notificationStartTime; + clusterStateUpdateStatsTracker.onPublicationFailure( + threadPool.rawRelativeTimeInMillis(), + clusterStatePublicationEvent, + notificationMillis + ); + } else { + assert publicationMayFail() : exception; + clusterStateUpdateStatsTracker.onPublicationFailure( + threadPool.rawRelativeTimeInMillis(), + clusterStatePublicationEvent, + 0L + ); + handleException(summary, publicationStartTime, newClusterState, exception); + } } } - } - ); - } catch (Exception e) { - handleException(summary, publicationStartTime, newClusterState, e); + ); + } catch (Exception e) { + handleException(summary, publicationStartTime, newClusterState, e); + } + } finally { + taskManager.unregister(task); } } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 4c5cc62a5cdde..f0590b98bc034 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -1002,7 +1002,7 @@ public Set getNamespaces(Settings settings) { } /** - * Returns a map of all namespaces to it's values give the provided settings + * Returns a map of all namespaces to its values given the provided settings */ public Map getAsMap(Settings settings) { Map map = new HashMap<>(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index b0498e619fca5..f910bf4dea58b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -133,7 +133,7 @@ public void clusterChanged(ClusterChangedEvent event) { } } - private void recover(StartRecoveryRequest request, ActionListener listener) { + private void recover(StartRecoveryRequest request, Task task, ActionListener listener) { final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); @@ -153,7 +153,7 @@ private void recover(StartRecoveryRequest request, ActionListener { @Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception { - recover(request, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request)); + recover(request, task, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request)); } } @@ -204,10 +204,10 @@ final class OngoingRecoveries { @Nullable private List> emptyListeners; - synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) { + synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, Task task, IndexShard shard) { assert lifecycle.started(); final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext()); - final Tuple handlers = shardContext.addNewRecovery(request, shard); + final Tuple handlers = shardContext.addNewRecovery(request, task, shard); final RemoteRecoveryTargetHandler recoveryTargetHandler = handlers.v2(); nodeToHandlers.computeIfAbsent(recoveryTargetHandler.targetNode(), k -> new HashSet<>()).add(recoveryTargetHandler); shard.recoveryStats().incCurrentAsSource(); @@ -307,6 +307,7 @@ private final class ShardRecoveryContext { */ synchronized Tuple addNewRecovery( StartRecoveryRequest request, + Task task, IndexShard shard ) { for (RecoverySourceHandler existingHandler : recoveryHandlers.keySet()) { @@ -317,7 +318,11 @@ synchronized Tuple addNewRec ); } } - final Tuple handlers = createRecoverySourceHandler(request, shard); + final Tuple handlers = createRecoverySourceHandler( + request, + task, + shard + ); recoveryHandlers.put(handlers.v1(), handlers.v2()); return handlers; } @@ -344,6 +349,7 @@ synchronized void reestablishRecovery(ReestablishRecoveryRequest request, Action private Tuple createRecoverySourceHandler( StartRecoveryRequest request, + Task task, IndexShard shard ) { RecoverySourceHandler handler; @@ -353,7 +359,8 @@ private Tuple createRecovery transportService, request.targetNode(), recoverySettings, - throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime) + throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime), + task ); handler = new RecoverySourceHandler( shard, diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 3ec72a07cfc1e..4d6cdcaba63bf 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteTransportException; @@ -68,6 +69,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { private final AtomicLong requestSeqNoGenerator = new AtomicLong(0); private final Consumer onSourceThrottle; + private final Task task; private volatile boolean isCancelled = false; public RemoteRecoveryTargetHandler( @@ -76,7 +78,8 @@ public RemoteRecoveryTargetHandler( TransportService transportService, DiscoveryNode targetNode, RecoverySettings recoverySettings, - Consumer onSourceThrottle + Consumer onSourceThrottle, + Task task ) { this.transportService = transportService; this.threadPool = transportService.getThreadPool(); @@ -94,6 +97,7 @@ public RemoteRecoveryTargetHandler( TransportRequestOptions.Type.RECOVERY ); this.standardTimeoutRequestOptions = TransportRequestOptions.timeout(recoverySettings.internalActionTimeout()); + this.task = task; } public DiscoveryNode targetNode() { @@ -342,10 +346,11 @@ private void executeRetryableAction( @Override public void tryAction(ActionListener listener) { if (request.tryIncRef()) { - transportService.sendRequest( + transportService.sendChildRequest( targetNode, action, request, + task, options, new ActionListenerResponseHandler<>( ActionListener.runBefore(listener, request::decRef), diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 519ac92ceb494..27195bb04746e 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -181,6 +181,7 @@ import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancellationService; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; @@ -408,6 +409,13 @@ protected Node( HeaderWarning.setThreadContext(threadPool.getThreadContext()); resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext())); + final Set taskHeaders = Stream.concat( + pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()), + Task.HEADERS_TO_COPY.stream() + ).collect(Collectors.toSet()); + + final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders); + // register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private final List> additionalSettings = new ArrayList<>(pluginsService.flatMap(Plugin::getSettings).toList()); for (final ExecutorBuilder builder : threadPool.builders()) { @@ -458,7 +466,12 @@ protected Node( ); List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); - final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool); + final ClusterService clusterService = new ClusterService( + settings, + settingsModule.getClusterSettings(), + threadPool, + taskManager + ); clusterService.addStateApplier(scriptService); resourcesToClose.add(clusterService); @@ -732,10 +745,6 @@ protected Node( } new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); - Set taskHeaders = Stream.concat( - pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()), - Task.HEADERS_TO_COPY.stream() - ).collect(Collectors.toSet()); final TransportService transportService = newTransportService( settings, transport, @@ -743,7 +752,7 @@ protected Node( networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), - taskHeaders + taskManager ); final GatewayMetaState gatewayMetaState = new GatewayMetaState(); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); @@ -1101,9 +1110,9 @@ protected TransportService newTransportService( TransportInterceptor interceptor, Function localNodeFactory, ClusterSettings clusterSettings, - Set taskHeaders + TaskManager taskManager ) { - return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); + return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager); } protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 371fd9e09b434..1f614eaf42ed6 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -174,6 +174,28 @@ public String toString() { * @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}. */ + public TransportService( + Settings settings, + Transport transport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + TaskManager taskManager + ) { + this( + settings, + transport, + threadPool, + transportInterceptor, + localNodeFactory, + clusterSettings, + new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()), + taskManager + ); + } + + // NOTE: Only for use in tests public TransportService( Settings settings, Transport transport, @@ -190,8 +212,8 @@ public TransportService( transportInterceptor, localNodeFactory, clusterSettings, - taskHeaders, - new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()) + new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()), + new TaskManager(settings, threadPool, taskHeaders) ); } @@ -202,8 +224,8 @@ public TransportService( TransportInterceptor transportInterceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings, - Set taskHeaders, - ConnectionManager connectionManager + ConnectionManager connectionManager, + TaskManager taskManger ) { this.transport = transport; transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings)); @@ -214,7 +236,7 @@ public TransportService( setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings)); setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings)); tracerLog = Loggers.getLogger(logger, ".tracer"); - taskManager = createTaskManager(settings, threadPool, taskHeaders); + this.taskManager = taskManger; this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); @@ -256,10 +278,6 @@ public TaskManager getTaskManager() { return taskManager; } - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { - return new TaskManager(settings, threadPool, taskHeaders); - } - void setTracerLogInclude(List tracerLogInclude) { this.tracerLogInclude = tracerLogInclude.toArray(Strings.EMPTY_ARRAY); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 8a5a290b9e5ad..d6643f70482d1 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -52,7 +52,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -178,6 +177,12 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) { discoveryNode.set(new DiscoveryNode(name, address.publishAddress(), emptyMap(), emptySet(), Version.CURRENT)); return discoveryNode.get(); }; + TaskManager taskManager; + if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { + taskManager = new MockTaskManager(settings, threadPool, emptySet()); + } else { + taskManager = new TaskManager(settings, threadPool, emptySet()); + } transportService = new TransportService( settings, new Netty4Transport( @@ -194,18 +199,9 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) { TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null, - Collections.emptySet() - ) { - @Override - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { - if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { - return new MockTaskManager(settings, threadPool, taskHeaders); - } else { - return super.createTaskManager(settings, threadPool, taskHeaders); - } - } - }; - transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService)); + taskManager + ); + taskManager.setTaskCancellationService(new TaskCancellationService(transportService)); transportService.start(); clusterService = createClusterService(threadPool, discoveryNode.get()); clusterService.addStateApplier(transportService.getTaskManager()); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveIndexActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveIndexActionTests.java index 26a31d2737b85..9b751702dfd6c 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveIndexActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveIndexActionTests.java @@ -63,7 +63,8 @@ public void writeTo(StreamOutput out) throws IOException { ClusterService clusterService = new ClusterService( settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + null ); ResolveIndexAction.TransportAction action = new ResolveIndexAction.TransportAction( transportService, diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesActionTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesActionTests.java index a2be6aae89c04..1e5f9762b1b5e 100644 --- a/server/src/test/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesActionTests.java @@ -66,7 +66,8 @@ protected void doWriteTo(StreamOutput out) throws IOException { ClusterService clusterService = new ClusterService( settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + null ); TransportFieldCapabilitiesAction action = new TransportFieldCapabilitiesAction( transportService, diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index b44605d72f1a0..6fe576e1b3247 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -1396,7 +1396,8 @@ protected void doWriteTo(StreamOutput out) throws IOException { ClusterService clusterService = new ClusterService( settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + null ); TransportSearchAction action = new TransportSearchAction( threadPool, diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 25869ce1f849c..4c01ad34527b4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.gateway.TestGatewayAllocator; import java.util.Arrays; @@ -66,7 +67,8 @@ public void setUp() throws Exception { clusterService = new ClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - null + null, + (TaskManager) null ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 5813cc3119cf3..dcb44183a5b66 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.TimeValue; import org.elasticsearch.monitor.StatusInfo; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; @@ -32,6 +33,7 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -57,8 +59,8 @@ public void testJoinDeduplication() { TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> localNode, null, - Collections.emptySet(), - new ClusterConnectionManager(Settings.EMPTY, capturingTransport, threadPool.getThreadContext()) + new ClusterConnectionManager(Settings.EMPTY, capturingTransport, threadPool.getThreadContext()), + new TaskManager(Settings.EMPTY, threadPool, Set.of()) ); JoinHelper joinHelper = new JoinHelper( null, diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 7de1d752b5a4c..24d9d96ef9468 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.monitor.NodeHealthService; import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.node.Node; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; @@ -140,10 +141,12 @@ private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initia } private void setupRealMasterServiceAndCoordinator(long term, ClusterState initialState) { + final Settings settings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(); MasterService masterService = new MasterService( - Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(), + settings, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + new TaskManager(settings, threadPool, Set.of()) ); AtomicReference clusterStateRef = new AtomicReference<>(initialState); masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java index a4375e15e2316..52cc92f73c749 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -32,6 +32,8 @@ import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransport; @@ -52,6 +54,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static java.util.Collections.emptyMap; +import static org.elasticsearch.cluster.service.MasterService.STATE_UPDATE_ACTION_NAME; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -101,7 +105,14 @@ public void writeTo(StreamOutput out) throws IOException { final ElasticsearchException e = expectThrows( ElasticsearchException.class, () -> handler.newPublicationContext( - new ClusterStatePublicationEvent(new BatchSummary("test"), clusterState, unserializableClusterState, 0L, 0L) + new ClusterStatePublicationEvent( + new BatchSummary("test"), + clusterState, + unserializableClusterState, + new Task(randomNonNegativeLong(), "test", STATE_UPDATE_ACTION_NAME, "", TaskId.EMPTY_TASK_ID, emptyMap()), + 0L, + 0L + ) ) ); assertNotNull(e.getCause()); @@ -276,7 +287,14 @@ public void writeTo(StreamOutput out) throws IOException { final PublicationTransportHandler.PublicationContext context; try { context = handler.newPublicationContext( - new ClusterStatePublicationEvent(new BatchSummary("test"), prevClusterState, nextClusterState, 0L, 0L) + new ClusterStatePublicationEvent( + new BatchSummary("test"), + prevClusterState, + nextClusterState, + new Task(randomNonNegativeLong(), "test", STATE_UPDATE_ACTION_NAME, "", TaskId.EMPTY_TASK_ID, emptyMap()), + 0L, + 0L + ) ); } catch (ElasticsearchException e) { assertTrue(simulateFailures); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index f81d72fb2cf81..7cdc310b89a64 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -42,10 +42,14 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.node.Node; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.tasks.MockTaskManager; +import org.elasticsearch.test.tasks.MockTaskManagerListener; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -69,6 +73,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; @@ -109,14 +114,25 @@ public void randomizeCurrentTime() { } private MasterService createMasterService(boolean makeMaster) { + return createMasterService(makeMaster, null); + } + + private MasterService createMasterService(boolean makeMaster, TaskManager taskManager) { final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + final Settings settings = Settings.builder() + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(Node.NODE_NAME_SETTING.getKey(), "test_node") + .build(); + + if (taskManager == null) { + taskManager = new TaskManager(settings, threadPool, emptySet()); + } + final MasterService masterService = new MasterService( - Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) - .put(Node.NODE_NAME_SETTING.getKey(), "test_node") - .build(), + settings, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + taskManager ); final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) .nodes( @@ -177,6 +193,55 @@ public void onFailure(Exception e) { nonMaster.close(); } + /** + * Check that when the master service publishes a cluster state update, it uses a dedicated task. + */ + public void testCreatesChildTaskForPublishingClusterState() throws Exception { + final MockTaskManager taskManager = new MockTaskManager(Settings.EMPTY, threadPool, emptySet()); + + final List registeredActions = new ArrayList<>(); + taskManager.addListener(new MockTaskManagerListener() { + @Override + public void onTaskRegistered(Task task) { + registeredActions.add(task.getAction()); + } + + @Override + public void onTaskUnregistered(Task task) {} + + @Override + public void waitForTaskCompletion(Task task) {} + }); + + final CountDownLatch latch = new CountDownLatch(1); + + try (MasterService masterService = createMasterService(true, taskManager)) { + masterService.submitStateUpdateTask( + "testCreatesChildTaskForPublishingClusterState", + new ExpectSuccessTask(), + ClusterStateTaskConfig.build(Priority.NORMAL), + new ClusterStateTaskExecutor<>() { + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) { + for (final var taskContext : taskContexts) { + taskContext.success(() -> {}); + } + return ClusterState.builder(currentState).build(); + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + latch.countDown(); + } + } + ); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + + assertThat(registeredActions.toString(), registeredActions, contains(MasterService.STATE_UPDATE_ACTION_NAME)); + } + public void testThreadContext() throws InterruptedException { final MasterService master = createMasterService(true); final CountDownLatch latch = new CountDownLatch(1); @@ -1062,17 +1127,18 @@ public void testLongClusterStateUpdateLogging() throws Exception { Logger clusterLogger = LogManager.getLogger(MasterService.class); Loggers.addAppender(clusterLogger, mockAppender); + final Settings settings = Settings.builder() + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(Node.NODE_NAME_SETTING.getKey(), "test_node") + .build(); try ( MasterService masterService = new MasterService( - Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) - .put(Node.NODE_NAME_SETTING.getKey(), "test_node") - .build(), + settings, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + new TaskManager(settings, threadPool, emptySet()) ) ) { - final DiscoveryNode localNode = new DiscoveryNode( "node1", buildNewFakeTransportAddress(), @@ -1243,14 +1309,16 @@ public void testAcking() throws InterruptedException { final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + final Settings settings = Settings.builder() + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(Node.NODE_NAME_SETTING.getKey(), "test_node") + .build(); try ( MasterService masterService = new MasterService( - Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) - .put(Node.NODE_NAME_SETTING.getKey(), "test_node") - .build(), + settings, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + new TaskManager(settings, threadPool, emptySet()) ) ) { diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 53c163172fce5..fa8a9f3eafc34 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -233,8 +234,8 @@ public void setup() { TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, - emptySet(), - connectionManager + connectionManager, + new TaskManager(settings, threadPool, emptySet()) ); transportService.start(); diff --git a/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java b/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java index cb94458859833..ad915f45f7215 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; @@ -90,7 +91,7 @@ public String getValue(final String value) { }) ); - final ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, null); + final ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, null, (TaskManager) null); final Metadata.Builder builder = Metadata.builder(); final Settings settings = Settings.builder().put("foo.old", randomAlphaOfLength(8)).build(); applySettingsToBuilder.accept(builder, settings); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index e94d9e2135096..8d6be37086719 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -35,7 +36,8 @@ private GatewayService createService(final Settings.Builder settings) { final ClusterService clusterService = new ClusterService( Settings.builder().put("cluster.name", "GatewayServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - null + null, + (TaskManager) null ); return new GatewayService(settings.build(), (reason, priority, listener) -> fail("should not reroute"), clusterService, null); } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 773b6e0dd71e0..4990d86330f0c 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -89,6 +89,7 @@ import org.elasticsearch.indices.ShardLimitValidator; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.snapshots.EmptySnapshotsInfoService; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.gateway.TestGatewayAllocator; import org.elasticsearch.threadpool.ThreadPool; @@ -162,7 +163,12 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th Environment environment = TestEnvironment.newEnvironment(SETTINGS); Transport transport = mock(Transport.class); // it's not used - final var masterService = new MasterService(SETTINGS, clusterSettings, threadPool) { + final var masterService = new MasterService( + SETTINGS, + clusterSettings, + threadPool, + new TaskManager(SETTINGS, threadPool, Collections.emptySet()) + ) { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { // run master tasks inline, no need to fork to a separate thread diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java index 7ce07dc1178e6..66503bf4cea79 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -17,11 +17,15 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.NodeRoles; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; +import static org.elasticsearch.indices.recovery.PeerRecoverySourceService.Actions.START_RECOVERY; import static org.hamcrest.Matchers.containsString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -52,15 +56,29 @@ public void testDuplicateRecoveries() throws IOException { true ); peerRecoverySourceService.start(); - RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary); + + final Task recoveryTask = new Task( + randomNonNegativeLong(), + "test", + START_RECOVERY, + "", + TaskId.EMPTY_TASK_ID, + Collections.emptyMap() + ); + + RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery( + startRecoveryRequest, + recoveryTask, + primary + ); DelayRecoveryException delayRecoveryException = expectThrows( DelayRecoveryException.class, - () -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary) + () -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, recoveryTask, primary) ); assertThat(delayRecoveryException.getMessage(), containsString("recovery with same target already registered")); peerRecoverySourceService.ongoingRecoveries.remove(primary, handler); // re-adding after removing previous attempt works - handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary); + handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, recoveryTask, primary); peerRecoverySourceService.ongoingRecoveries.remove(primary, handler); closeShards(primary); } diff --git a/server/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java b/server/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java index daa2a09e5b79b..efc615403e169 100644 --- a/server/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/node/ResponseCollectorServiceTests.java @@ -42,7 +42,8 @@ public void setUp() throws Exception { clusterService = new ClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadpool + threadpool, + null ); collector = new ResponseCollectorService(clusterService); } diff --git a/server/src/test/java/org/elasticsearch/readiness/ReadinessServiceTests.java b/server/src/test/java/org/elasticsearch/readiness/ReadinessServiceTests.java index 3f89d1b6a5ee7..9c9167a8d546c 100644 --- a/server/src/test/java/org/elasticsearch/readiness/ReadinessServiceTests.java +++ b/server/src/test/java/org/elasticsearch/readiness/ReadinessServiceTests.java @@ -97,7 +97,8 @@ public void setUp() throws Exception { clusterService = new ClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadpool + threadpool, + null ); env = newEnvironment(Settings.builder().put(ReadinessService.PORT.getKey(), 0).build()); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java index a8945a0f8ea5e..7b4a2283dbd41 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java @@ -20,10 +20,12 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.Node; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -46,11 +48,23 @@ public FakeThreadPoolMasterService( ThreadPool threadPool, Consumer onTaskAvailableToRun ) { - super( + this( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + serviceName, + onTaskAvailableToRun ); + } + + private FakeThreadPoolMasterService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + String serviceName, + Consumer onTaskAvailableToRun + ) { + super(settings, clusterSettings, threadPool, new TaskManager(settings, threadPool, Set.of())); this.name = serviceName; this.onTaskAvailableToRun = onTaskAvailableToRun; } diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 79794127d57e8..2e8862d4017d2 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -37,6 +37,7 @@ import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockHttpTransport; import org.elasticsearch.test.transport.MockTransportService; @@ -49,8 +50,8 @@ import java.nio.file.Path; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.function.Function; import java.util.function.LongSupplier; @@ -187,16 +188,24 @@ protected TransportService newTransportService( TransportInterceptor interceptor, Function localNodeFactory, ClusterSettings clusterSettings, - Set taskHeaders + TaskManager taskManager ) { // we use the MockTransportService.TestPlugin class as a marker to create a network // module with this MockNetworkService. NetworkService is such an integral part of the systme // we don't allow to plug it in from plugins or anything. this is a test-only override and // can't be done in a production env. if (getPluginsService().filterPlugins(MockTransportService.TestPlugin.class).isEmpty()) { - return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); + return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager); } else { - return new MockTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); + return new MockTransportService( + settings, + transport, + threadPool, + interceptor, + localNodeFactory, + clusterSettings, + new HashSet<>(taskManager.getTaskHeaders()) + ); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index 87044dccf37d4..7f9eebece277f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; @@ -121,7 +122,12 @@ public static ClusterService createClusterService(ThreadPool threadPool, Cluster public static ClusterService createClusterService(ThreadPool threadPool, DiscoveryNode localNode, ClusterSettings clusterSettings) { Settings settings = Settings.builder().put("node.name", "test").put("cluster.name", "ClusterServiceTests").build(); - ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + ClusterService clusterService = new ClusterService( + settings, + clusterSettings, + threadPool, + new TaskManager(settings, threadPool, Collections.emptySet()) + ); clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService()); ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())) diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index 397f200b4824d..c4bb1e4abeda0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.Nullable; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.CloseableConnection; @@ -76,7 +77,15 @@ public TransportService createTransportService( @Nullable ClusterSettings clusterSettings, Set taskHeaders ) { - return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); + return new TransportService( + settings, + this, + threadPool, + interceptor, + localNodeFactory, + clusterSettings, + new TaskManager(settings, threadPool, taskHeaders) + ); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index b19c989d5bdc3..c7277c593a4ba 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.CloseableConnection; import org.elasticsearch.transport.ClusterConnectionManager; @@ -69,8 +70,8 @@ public TransportService createTransportService( interceptor, localNodeFactory, clusterSettings, - taskHeaders, - connectionManager + connectionManager, + new TaskManager(settings, threadPool, taskHeaders) ); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 669e69b36f47a..15a8096ded49c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -149,7 +149,7 @@ public static MockTransportService createNewService( ) { return new MockTransportService( settings, - transport, + new StubbableTransport(transport), threadPool, interceptor, boundAddress -> new DiscoveryNode( @@ -161,7 +161,7 @@ public static MockTransportService createNewService( version ), clusterSettings, - taskHeaders + createTaskManager(settings, threadPool, taskHeaders) ); } @@ -183,7 +183,7 @@ public MockTransportService( ) { this( settings, - transport, + new StubbableTransport(transport), threadPool, interceptor, (boundAddress) -> DiscoveryNode.createLocal( @@ -192,7 +192,7 @@ public MockTransportService( settings.get(Node.NODE_NAME_SETTING.getKey(), UUIDs.randomBase64UUID()) ), clusterSettings, - Collections.emptySet() + createTaskManager(settings, threadPool, Set.of()) ); } @@ -212,7 +212,34 @@ public MockTransportService( @Nullable ClusterSettings clusterSettings, Set taskHeaders ) { - this(settings, new StubbableTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); + this( + settings, + new StubbableTransport(transport), + threadPool, + interceptor, + localNodeFactory, + clusterSettings, + createTaskManager(settings, threadPool, taskHeaders) + ); + } + + public MockTransportService( + Settings settings, + Transport transport, + ThreadPool threadPool, + TransportInterceptor interceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings + ) { + this( + settings, + new StubbableTransport(transport), + threadPool, + interceptor, + localNodeFactory, + clusterSettings, + createTaskManager(settings, threadPool, Set.of()) + ); } private MockTransportService( @@ -222,7 +249,7 @@ private MockTransportService( TransportInterceptor interceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings, - Set taskHeaders + TaskManager taskManager ) { super( settings, @@ -231,8 +258,8 @@ private MockTransportService( interceptor, localNodeFactory, clusterSettings, - taskHeaders, - new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())) + new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())), + taskManager ); this.original = transport.getDelegate(); } @@ -245,12 +272,11 @@ private static TransportAddress[] extractTransportAddresses(TransportService tra return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]); } - @Override - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + private static TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(settings, threadPool, taskHeaders); + return new TaskManager(settings, threadPool, taskHeaders); } } diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java index 5d76426b0c311..822a6753c1df4 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java @@ -61,12 +61,7 @@ public void testFakeMasterService() { doAnswer(invocationOnMock -> runnableTasks.add((Runnable) invocationOnMock.getArguments()[0])).when(executorService).execute(any()); when(mockThreadPool.generic()).thenReturn(executorService); - FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService( - "test_node", - "test", - mockThreadPool, - runnableTasks::add - ); + MasterService masterService = new FakeThreadPoolMasterService("test_node", "test", mockThreadPool, runnableTasks::add); masterService.setClusterStateSupplier(lastClusterStateRef::get); masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> { ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java index f2e67e7a710b5..1ee4c9b7f8d4e 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java @@ -456,12 +456,12 @@ public void testStoppedPriority() { ) ); final SetOnce task = new SetOnce<>(); - ClusterService fakeService = new ClusterService(Settings.EMPTY, clusterSettings, threadPool) { + ClusterService fakeService = new ClusterService(Settings.EMPTY, clusterSettings, threadPool, null) { @Override public void submitUnbatchedStateUpdateTask(String source, ClusterStateUpdateTask updateTask) { logger.info("--> got task: [source: {}]: {}", source, updateTask); if (updateTask instanceof OperationModeUpdateTask) { - task.set((OperationModeUpdateTask) updateTask); + task.set(updateTask); } } }; diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java index ab5f7f3d04d50..1be8afe96194b 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java @@ -72,7 +72,7 @@ public void createComponents() throws Exception { ) ) ); - ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, tp); + ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, tp, null); OriginSettingClient originSettingClient = new OriginSettingClient(client(), ML_ORIGIN); ResultsPersisterService resultsPersisterService = new ResultsPersisterService( diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 73ad580dc00ae..d99aa96656074 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -161,7 +161,7 @@ public void createComponents() throws Exception { ) ) ); - ClusterService clusterService = new ClusterService(settings, clusterSettings, tp); + ClusterService clusterService = new ClusterService(settings, clusterSettings, tp, null); OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN); resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings); resultProcessor = new AutodetectResultProcessor( diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java index ce48e81dc2d9d..5b297846a6428 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java @@ -60,7 +60,7 @@ public void createComponents() { ) ) ); - ClusterService clusterService = new ClusterService(settings, clusterSettings, tp); + ClusterService clusterService = new ClusterService(settings, clusterSettings, tp, null); OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN); ResultsPersisterService resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java index 8601b3023d588..a141f8624425c 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobModelSnapshotCRUDIT.java @@ -72,7 +72,7 @@ public void createComponents() throws Exception { ) ) ); - ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, tp); + ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, tp, null); OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN); ResultsPersisterService resultsPersisterService = new ResultsPersisterService( diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index 168e20fb93c09..306a383d2ea0c 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -130,7 +130,7 @@ public void createComponents() throws Exception { ) ) ); - ClusterService clusterService = new ClusterService(builder.build(), clusterSettings, tp); + ClusterService clusterService = new ClusterService(builder.build(), clusterSettings, tp, null); OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN); resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, builder.build()); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java index 7d5c6b291bd55..5b8a966ad01ca 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java @@ -75,7 +75,7 @@ public void createComponents() { ) ) ); - ClusterService clusterService = new ClusterService(settings, clusterSettings, tp); + ClusterService clusterService = new ClusterService(settings, clusterSettings, tp, null); OriginSettingClient originSettingClient = new OriginSettingClient(client(), ClientHelper.ML_ORIGIN); ResultsPersisterService resultsPersisterService = new ResultsPersisterService(tp, originSettingClient, clusterService, settings); jobResultsProvider = new JobResultsProvider(client(), settings, TestIndexNameExpressionResolver.newInstance()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index 7ee464a32e980..04ded3e1fad12 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -124,7 +124,7 @@ public void setUpVariables() { ) ) ); - clusterService = new ClusterService(settings, clusterSettings, tp); + clusterService = new ClusterService(settings, clusterSettings, tp, null); ingestService = new IngestService(clusterService, tp, null, null, null, Collections.singletonList(SKINNY_INGEST_PLUGIN), client); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index 10daa4e3efc29..d3ad7e4418241 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -96,7 +96,8 @@ public void init() { clusterService = new ClusterService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(), clusterSettings, - threadPool + threadPool, + null ); clusterService.getClusterApplierService() .setInitialState( diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java index 5398719f7832e..7f02c0b254af1 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java @@ -81,7 +81,7 @@ public void setUpVariables() { ) ) ); - clusterService = new ClusterService(settings, clusterSettings, tp); + clusterService = new ClusterService(settings, clusterSettings, tp, null); } public void testCreateProcessorWithTooManyExisting() throws Exception { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index 4f7791a9507a5..e73ab615045f9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -423,7 +423,7 @@ private ResultsPersisterService buildResultsPersisterService(OriginSettingClient ) ) ); - ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, tp); + ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, tp, null); ExecutorService executor = mock(ExecutorService.class); doAnswer(invocationOnMock -> { ((Runnable) invocationOnMock.getArguments()[0]).run(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java index 04bec4f22634a..de062997f4e09 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java @@ -107,7 +107,7 @@ public void setUpMocks() { ) ) ); - clusterService = new ClusterService(settings, clusterSettings, tp); + clusterService = new ClusterService(settings, clusterSettings, tp, null); autodetectProcessManager = mock(AutodetectProcessManager.class); datafeedConfigProvider = mock(DatafeedConfigProvider.class); client = mock(Client.class); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java index 7e0db4f282032..de7007eda9146 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java @@ -404,7 +404,7 @@ public static ResultsPersisterService buildResultsPersisterService(OriginSetting ) ) ); - ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, tp); + ClusterService clusterService = new ClusterService(Settings.EMPTY, clusterSettings, tp, null); ExecutorService executor = mock(ExecutorService.class); doAnswer(invocationOnMock -> { ((Runnable) invocationOnMock.getArguments()[0]).run(); diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index 1e3d89b565e0a..a4d52f0d278ef 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.search.suggest.completion.CompletionStats; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; @@ -168,7 +169,12 @@ public void createComponents() { transformCheckpointService = new TransformCheckpointService( Clock.systemUTC(), Settings.EMPTY, - new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), + new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null, + mock(TaskManager.class) + ), transformsConfigManager, mockAuditor ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index 7d6e4924d22ed..fd7747c48d559 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; @@ -406,7 +407,12 @@ public TransformPersistentTasksExecutor buildTaskExecutor() { TransformCheckpointService transformCheckpointService = new TransformCheckpointService( clock, Settings.EMPTY, - new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), + new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null, + (TaskManager) null + ), transformsConfigManager, mockAuditor ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index be12730af3f90..9885f247f255d 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -93,7 +93,12 @@ public void testStopOnFailedTaskWithStoppedIndexer() { TransformCheckpointService transformsCheckpointService = new TransformCheckpointService( clock, Settings.EMPTY, - new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), + new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null, + (TaskManager) null + ), transformsConfigManager, auditor );