Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve master service batching queues #92021

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
809a556
Improve master service batching queues
DaveCTurner Nov 30, 2022
0370148
Update docs/changelog/92021.yaml
DaveCTurner Nov 30, 2022
4cb1140
Update docs/changelog/92021.yaml
DaveCTurner Nov 30, 2022
35b334d
Merge branch 'main' into 2022-11-30-new-master-service-queues
elasticmachine Nov 30, 2022
83ea0e7
Merge branch 'main' into 2022-11-30-new-master-service-queues
elasticmachine Dec 1, 2022
0c88f4f
Merge branch 'main' into TMP
DaveCTurner Dec 20, 2022
e856717
getTaskQueue -> createTaskQueue
DaveCTurner Dec 20, 2022
fc89fe3
Note that forkQueueProcessor is single-threaded
DaveCTurner Dec 20, 2022
0797f30
CountedQueue -> PerPriorityQueue
DaveCTurner Dec 20, 2022
5f85269
Correct comment
DaveCTurner Dec 20, 2022
b199b42
Avoid cast with wildcard
DaveCTurner Dec 20, 2022
781da78
EnumMap doesn't need separate array
DaveCTurner Dec 20, 2022
765a9d2
Unused
DaveCTurner Dec 20, 2022
d6a18d3
Merge branch 'main' into 2022-11-30-new-master-service-queues
DaveCTurner Dec 31, 2022
9cfa0e7
Update MetadataIndexAliasesService
DaveCTurner Dec 31, 2022
52ac444
Fix tests
DaveCTurner Dec 31, 2022
414045a
Merge branch 'main' into 2022-11-30-new-master-service-queues
DaveCTurner Jan 5, 2023
4e56e4d
Merge branch 'main' into 2022-11-30-new-master-service-queues
DaveCTurner Feb 22, 2023
c7f79bf
Fixup merge
DaveCTurner Feb 22, 2023
bfccca2
Simplify submitUnbatchedStateUpdateTask
DaveCTurner Feb 23, 2023
a22db27
Extract common allBatchesStream()
DaveCTurner Feb 23, 2023
76d99f4
Consistency assertions on currentlyExecutingBatch
DaveCTurner Feb 23, 2023
d565881
Rename item -> batch
DaveCTurner Feb 23, 2023
ff22d79
Merge branch 'main' into 2022-11-30-new-master-service-queues
DaveCTurner Feb 23, 2023
ed56710
Merge branch 'main' into 2022-11-30-new-master-service-queues
DaveCTurner Feb 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/92021.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 92021
summary: Improve master service batching queues
area: Cluster Coordination
type: enhancement
issues:
- 81626
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -50,13 +50,14 @@ public class UpdateTimeSeriesRangeService extends AbstractLifecycleComponent imp
volatile TimeValue pollInterval;
volatile Scheduler.Cancellable job;
private final AtomicBoolean running = new AtomicBoolean(false);
private final ClusterStateTaskExecutor<UpdateTimeSeriesTask> taskExecutor = new UpdateTimeSeriesExecutor();
private final MasterServiceTaskQueue<UpdateTimeSeriesTask> taskQueue;

UpdateTimeSeriesRangeService(Settings settings, ThreadPool threadPool, ClusterService clusterService) {
this.pollInterval = DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.get(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
clusterService.getClusterSettings().addSettingsUpdateConsumer(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL, this::setPollInterval);
this.taskQueue = clusterService.getTaskQueue("update-time-series-range", Priority.URGENT, new UpdateTimeSeriesExecutor());
}

void perform(Runnable onComplete) {
Expand All @@ -69,8 +70,7 @@ void perform(Runnable onComplete) {
running.set(false);
onComplete.run();
});
var config = ClusterStateTaskConfig.build(Priority.URGENT);
clusterService.submitStateUpdateTask("update_tsdb_data_stream_end_times", task, config, taskExecutor);
taskQueue.submitTask("update_tsdb_data_stream_end_times", task, null);
} else {
LOGGER.debug("not starting tsdb update task, because another execution is still running");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws In
"test",
EsExecutors.daemonThreadFactory("test"),
threadPool.getThreadContext(),
threadPool.scheduler(),
PrioritizedEsThreadPoolExecutor.StarvationWatcher.NOOP_STARVATION_WATCHER
threadPool.scheduler()
);
try {
checkExecutionError(getExecuteRunner(prioritizedExecutor));
Expand Down Expand Up @@ -208,8 +207,7 @@ public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throw
"test",
EsExecutors.daemonThreadFactory("test"),
threadPool.getThreadContext(),
threadPool.scheduler(),
PrioritizedEsThreadPoolExecutor.StarvationWatcher.NOOP_STARVATION_WATCHER
threadPool.scheduler()
);
try {
checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.test.ESIntegTestCase;
Expand All @@ -28,17 +27,12 @@ public class AutoCreateIndexIT extends ESIntegTestCase {
public void testBatchingWithDeprecationWarnings() throws Exception {
final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final var barrier = new CyclicBarrier(2);
masterNodeClusterService.submitStateUpdateTask(
"block",
e -> { assert false : e; },
ClusterStateTaskConfig.build(Priority.NORMAL),
batchExecutionContext -> {
barrier.await(10, TimeUnit.SECONDS);
barrier.await(10, TimeUnit.SECONDS);
batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
return batchExecutionContext.initialState();
}
);
masterNodeClusterService.getTaskQueue("block", Priority.NORMAL, batchExecutionContext -> {
barrier.await(10, TimeUnit.SECONDS);
barrier.await(10, TimeUnit.SECONDS);
batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
return batchExecutionContext.initialState();
}).submitTask("block", e -> { assert false : e; }, null);

barrier.await(10, TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SimpleBatchedExecutor;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DesiredNodesMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.Tuple;
Expand All @@ -31,7 +30,7 @@

public class TransportDeleteDesiredNodesAction extends TransportMasterNodeAction<DeleteDesiredNodesAction.Request, ActionResponse.Empty> {

private final ClusterStateTaskExecutor<DeleteDesiredNodesTask> taskExecutor = new DeleteDesiredNodesExecutor();
private final MasterServiceTaskQueue<DeleteDesiredNodesTask> taskQueue;

@Inject
public TransportDeleteDesiredNodesAction(
Expand All @@ -52,6 +51,7 @@ public TransportDeleteDesiredNodesAction(
in -> ActionResponse.Empty.INSTANCE,
ThreadPool.Names.SAME
);
this.taskQueue = clusterService.getTaskQueue("delete-desired-nodes", Priority.HIGH, new DeleteDesiredNodesExecutor());
}

@Override
Expand All @@ -61,12 +61,7 @@ protected void masterOperation(
ClusterState state,
ActionListener<ActionResponse.Empty> listener
) throws Exception {
clusterService.submitStateUpdateTask(
"delete-desired-nodes",
new DeleteDesiredNodesTask(listener),
ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
taskExecutor
);
taskQueue.submitTask("delete-desired-nodes", new DeleteDesiredNodesTask(listener), request.masterNodeTimeout());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -27,6 +26,7 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
Expand All @@ -41,7 +41,7 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
private static final Logger logger = LogManager.getLogger(TransportUpdateDesiredNodesAction.class);

private final DesiredNodesSettingsValidator settingsValidator;
private final ClusterStateTaskExecutor<UpdateDesiredNodesTask> taskExecutor;
private final MasterServiceTaskQueue<UpdateDesiredNodesTask> taskQueue;

@Inject
public TransportUpdateDesiredNodesAction(
Expand All @@ -66,7 +66,11 @@ public TransportUpdateDesiredNodesAction(
ThreadPool.Names.SAME
);
this.settingsValidator = settingsValidator;
this.taskExecutor = new UpdateDesiredNodesExecutor(clusterService.getRerouteService(), allocationService);
this.taskQueue = clusterService.getTaskQueue(
"update-desired-nodes",
Priority.URGENT,
new UpdateDesiredNodesExecutor(clusterService.getRerouteService(), allocationService)
);
}

@Override
Expand All @@ -83,12 +87,7 @@ protected void masterOperation(
) throws Exception {
try {
settingsValidator.validate(request.getNodes());
clusterService.submitStateUpdateTask(
"update-desired-nodes",
new UpdateDesiredNodesTask(request, listener),
ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()),
taskExecutor
);
taskQueue.submitTask("update-desired-nodes", new UpdateDesiredNodesTask(request, listener), request.masterNodeTimeout());
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.LocalMasterServiceTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -163,13 +165,20 @@ public void onFailure(Exception e) {
if (e instanceof ProcessClusterEventTimeoutException) {
listener.onResponse(getResponse(request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT));
} else {
final Level level = e instanceof NotMasterException ? Level.TRACE : Level.ERROR;
assert e instanceof NotMasterException : e; // task cannot fail, nor will it trigger a publication which fails
final Level level = isExpectedFailure(e) ? Level.TRACE : Level.ERROR;
logger.log(level, () -> "unexpected failure during [" + source + "]", e);
assert isExpectedFailure(e) : e; // task cannot fail, nor will it trigger a publication which fails
// TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
listener.onFailure(e);
}
}

static boolean isExpectedFailure(Exception e) {
return e instanceof NotMasterException
|| e instanceof FailedToCommitClusterStateException
&& e.getCause()instanceof EsRejectedExecutionException esre
&& esre.isExecutorShutdown();
Comment on lines +178 to +180
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Today's MasterService silently drops stuff when it's shut down. With this change we become stricter about rejecting work explicitly in this case.

}
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateAckListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -36,6 +35,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionMultiListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -77,7 +77,7 @@ public static final class TransportAction extends TransportMasterNodeAction<Crea
private final AutoCreateIndex autoCreateIndex;
private final SystemIndices systemIndices;

private final ClusterStateTaskExecutor<CreateIndexTask> executor;
private final MasterServiceTaskQueue<CreateIndexTask> taskQueue;

@Inject
public TransportAction(
Expand Down Expand Up @@ -107,7 +107,7 @@ public TransportAction(
this.createIndexService = createIndexService;
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
this.autoCreateIndex = autoCreateIndex;
this.executor = batchExecutionContext -> {
this.taskQueue = clusterService.getTaskQueue("auto-create", Priority.URGENT, batchExecutionContext -> {
final var listener = new AllocationActionMultiListener<CreateIndexResponse>(threadPool.getThreadContext());
final var taskContexts = batchExecutionContext.taskContexts();
final var successfulRequests = Maps.<CreateIndexRequest, String>newMapWithExpectedSize(taskContexts.size());
Expand All @@ -129,7 +129,7 @@ public TransportAction(
listener.noRerouteNeeded();
}
return state;
};
});
}

@Override
Expand All @@ -139,11 +139,10 @@ protected void masterOperation(
ClusterState state,
ActionListener<CreateIndexResponse> listener
) {
clusterService.submitStateUpdateTask(
taskQueue.submitTask(
"auto create [" + request.index() + "]",
new CreateIndexTask(request, listener),
ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()),
executor
request.masterNodeTimeout()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -35,6 +34,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionMultiListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -64,7 +64,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
private static final Logger logger = LogManager.getLogger(TransportRolloverAction.class);

private final Client client;
private final RolloverExecutor rolloverTaskExecutor;
private final MasterServiceTaskQueue<RolloverTask> rolloverTaskQueue;

@Inject
public TransportRolloverAction(
Expand All @@ -89,7 +89,11 @@ public TransportRolloverAction(
ThreadPool.Names.SAME
);
this.client = client;
this.rolloverTaskExecutor = new RolloverExecutor(clusterService, allocationService, rolloverService, threadPool);
this.rolloverTaskQueue = clusterService.getTaskQueue(
"rollover",
Priority.NORMAL,
new RolloverExecutor(clusterService, allocationService, rolloverService, threadPool)
);
}

@Override
Expand Down Expand Up @@ -173,8 +177,7 @@ protected void masterOperation(
if (rolloverRequest.areConditionsMet(trialConditionResults)) {
String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]";
RolloverTask rolloverTask = new RolloverTask(rolloverRequest, statsResponse, trialRolloverResponse, listener);
ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.NORMAL, rolloverRequest.masterNodeTimeout());
clusterService.submitStateUpdateTask(source, rolloverTask, config, rolloverTaskExecutor);
rolloverTaskQueue.submitTask(source, rolloverTask, rolloverRequest.masterNodeTimeout());
} else {
// conditions not met
listener.onResponse(trialRolloverResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
* across master elections (and therefore is preserved in a rolling restart).
* <p>
* Updates are triggered by submitting tasks to the {@link MasterService} on the elected master, typically using a {@link
* TransportMasterNodeAction} to route a request to the master on which the task is submitted with {@link
* ClusterService#submitStateUpdateTask}. Submitted tasks have an associated {@link ClusterStateTaskConfig} which defines a priority and a
* TransportMasterNodeAction} to route a request to the master on which the task is submitted via a queue obtained with {@link
* ClusterService#getTaskQueue}, which has an associated priority. Submitted tasks have an associated
* timeout. Tasks are processed in priority order, so a flood of higher-priority tasks can starve lower-priority ones from running.
* Therefore, avoid priorities other than {@link Priority#NORMAL} where possible. Tasks associated with client actions should typically have
* a timeout, or otherwise be sensitive to client cancellations, to avoid surprises caused by the execution of stale tasks long after they
Expand Down
Loading