Properly support task batches in ReservedStateUpdateTaskExecutor #116353

Closed · 16 commits (diff shown as of the first 3 commits)
@@ -14,15 +14,14 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.SimpleBatchedExecutor;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Tuple;

/**
* Reserved cluster state update task executor
*/
-public class ReservedStateUpdateTaskExecutor extends SimpleBatchedExecutor<ReservedStateUpdateTask, Void> {
+public class ReservedStateUpdateTaskExecutor implements ClusterStateTaskExecutor<ReservedStateUpdateTask> {

private static final Logger logger = LogManager.getLogger(ReservedStateUpdateTaskExecutor.class);

@@ -34,17 +33,27 @@ public ReservedStateUpdateTaskExecutor(RerouteService rerouteService) {
}

@Override
-public Tuple<ClusterState, Void> executeTask(ReservedStateUpdateTask task, ClusterState clusterState) {
-return Tuple.tuple(task.execute(clusterState), null);
+public final ClusterState execute(BatchExecutionContext<ReservedStateUpdateTask> batchExecutionContext) throws Exception {
var initState = batchExecutionContext.initialState();
var taskContexts = batchExecutionContext.taskContexts();
if (taskContexts.isEmpty()) {
return initState;
}
// Only the last update is relevant; the others can be skipped
var taskContext = taskContexts.get(taskContexts.size() - 1);
Contributor:

Are you sure the tasks are submitted in order, such that the last one in the list is always the best one to apply? Deserves a comment and an assertion I think.

Contributor Author:

Oh... is there some other way to define the "order" of tasks?

Contributor:

I'm not sure, but I understood that file-based settings have some notion of version (carried in the file) to ensure we're not overriding fresh settings with stale ones. I believe it relates to task.stateChunk.metadata().version(), although I haven't tracked down the details.

Contributor (@n1v0lg, Nov 7, 2024):

Correct, task.stateChunk.metadata().version() will give you the version of the file-settings file this update task was created for.

This is then compared against the last-processed version stored in metadata.

We should pick the update task with the highest version as @DaveCTurner suggests, unless someone from core infra confirms that the tasks are already guaranteed to be "well-ordered".
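The version-based selection discussed above could be sketched as follows. This is a simplified stand-in, not the Elasticsearch API: the `UpdateTask` record and `latest` helper are hypothetical, standing in for `ReservedStateUpdateTask`, whose real version would come from `task.stateChunk.metadata().version()`.

```java
import java.util.Comparator;
import java.util.List;

public class PickLatestTask {
    // Hypothetical stand-in for ReservedStateUpdateTask and its file-settings version.
    record UpdateTask(long version, String name) {}

    // Pick the task with the highest version rather than assuming the
    // batch list is already ordered oldest-to-newest.
    static UpdateTask latest(List<UpdateTask> tasks) {
        return tasks.stream().max(Comparator.comparingLong(UpdateTask::version)).orElseThrow();
    }

    public static void main(String[] args) {
        var tasks = List.of(new UpdateTask(2, "t2"), new UpdateTask(3, "t3"), new UpdateTask(1, "t1"));
        System.out.println(latest(tasks).name()); // prints "t3"
    }
}
```

This sidesteps the ordering question entirely: even if the batch arrives shuffled, the max-version task wins.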

ClusterState clusterState = initState;
try (var ignored = taskContext.captureResponseHeaders()) {
var task = taskContext.getTask();
clusterState = task.execute(clusterState);
taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE));
Contributor:

You have to mark all the other tasks as having succeeded too. There's an assertion to check this in the MasterService, so I guess we're missing a test that runs this executor in a sufficiently realistic setup.

Contributor Author (@prdoyle, Nov 6, 2024):

(Indeed. I'm not a huge fan of mocks TBH because it's so hard to tell what you're actually testing.</rant>)

Contributor:

Yes, I think you could perhaps test this with an ESSingleNodeTestCase to get something more realistic.

Contributor:

Also it'd be good to have a test that tries to create the same repository twice in a batch in order to demonstrate the NPE is fixed.

Contributor (@n1v0lg, Nov 7, 2024):

Possibly RepositoriesFileSettingsIT is a good place or FileSettingsServiceIT

Contributor:

Just thinking out loud on how this might look in an IT:

In RepositoriesFileSettingsIT, we could get a FileSettingsService instance via internalCluster().getInstance(FileSettingsService.class, masterNode) and manually invoke processFileOnServiceStart() multiple times concurrently. That would result in multiple concurrent and identical state update tasks, though I'm not sure it would suffice to guarantee that they'd all be part of the same batch.

Based on the MasterService#createTaskQueue Javadoc, "Tasks submitted to the same queue (while the master service is otherwise busy) will be batched together into a single cluster state update." -- I'm not sure if there's a simpler way in tests to force batching, aside from ensuring the master is busy with another task when we call processFileOnServiceStart.

Contributor (@DaveCTurner, Nov 7, 2024):

In an ESIntegTestCase or an ESSingleNodeTestCase you can force batching by first blocking the master service and then doing whatever is needed to submit the tasks you want to be batched, then unblocking the master service again:

        masterNodeClusterService.createTaskQueue("block", Priority.NORMAL, batchExecutionContext -> {
            safeAwait(barrier);
            safeAwait(barrier);
            batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
            return batchExecutionContext.initialState();
        }).submitTask("block", ESTestCase::fail, null);

Contributor Author (@prdoyle, Nov 27, 2024):

Update - I think I have confirmed everything below now. No need to read it in detail.


Hi @DaveCTurner - I've finally come back to this. I'm trying to understand your advice here. Looks like the code above is similar to this code:

masterNodeClusterService.createTaskQueue("block", Priority.NORMAL, batchExecutionContext -> {
    safeAwait(barrier);
    safeAwait(barrier);
    batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
    return batchExecutionContext.initialState();
}).submitTask("block", ESTestCase::fail, null);

This seems to be a trick to block masterNodeClusterService until the other test code reaches the barrier twice. You put whatever code you want to run during the block between the two barrier awaits, and it's guaranteed to run after this lambda begins but before it does the forEach.

As far as I can tell, the rest is just "legalese" required to make this trick work, and I can pretty much ignore it.

Am I close?
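For what it's worth, the two-rendezvous trick described above can be demonstrated in isolation with plain java.util.concurrent. This sketch uses hypothetical names (`BarrierBlockDemo`, `run`) and a plain `Thread` standing in for the master service thread executing the "block" task:

```java
import java.util.concurrent.CyclicBarrier;

public class BarrierBlockDemo {
    // Returns the order in which the two sides ran, showing the guarantee:
    // work done between the two awaits always precedes the blocked task's work.
    static String run() throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(2);
        StringBuilder log = new StringBuilder();

        // Stand-in for the master service thread executing the "block" task.
        Thread master = new Thread(() -> {
            try {
                barrier.await(); // first rendezvous: the master is now "busy"
                barrier.await(); // second rendezvous: the test says "go"
                synchronized (log) { log.append("batch-processed"); }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        master.start();

        barrier.await();                                        // master has entered its task
        synchronized (log) { log.append("tasks-submitted;"); }  // submit work while blocked
        barrier.await();                                        // release the master
        master.join();
        return log.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "tasks-submitted;batch-processed"
    }
}
```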

Contributor Author:

Ok new question... I'm not sure how to trigger this condition in the integration test. I tried bad JSON but that fails in a different way: it never seems to make it to ReservedStateUpdateTaskExecutor because it dies earlier, parsing the JSON.

I wonder how I can feed this valid JSON that parses, but then somehow the ReservedStateUpdateTask still fails? 🤔

Contributor Author:

Welp I guess I can at least test the happy path. Submit N changes and verify that the last one takes effect.

} catch (Exception e) {
taskContext.onFailure(e);
Contributor:

If the last task fails, and there are earlier tasks in the list, should we try them instead of completely giving up?

Contributor Author:

Is it safe to try multiple tasks? I thought the motivation for this PR was to avoid doing so.

Contributor:

If the task failed then the cluster state remains unchanged, so yes in that case it's safe to try a different one.

Contributor Author:

So we're effectively trying them in reverse until one succeeds.

Makes sense I guess, though it seems a little odd. I wonder if fixing it so they can apply in order would be more going "with the grain".
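A minimal sketch of that newest-first fallback, resting on the point above that a failed task leaves the state unchanged. Names are hypothetical, and `UnaryOperator<String>` stands in for a state-transforming task:

```java
import java.util.List;
import java.util.function.UnaryOperator;

public class ReverseRetry {
    // Try tasks newest-first; because a failed task leaves the state
    // unchanged, it is safe to fall back to the next-older task.
    static String applyNewestFirst(String initialState, List<UnaryOperator<String>> tasks) {
        for (int i = tasks.size() - 1; i >= 0; i--) {
            try {
                return tasks.get(i).apply(initialState);
            } catch (RuntimeException e) {
                // state unchanged on failure: fall through to the next-older task
            }
        }
        return initialState; // every task failed: keep the original state
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> tasks = List.of(
            s -> s + "+t1",
            s -> { throw new IllegalStateException("t2 fails"); },
            s -> { throw new IllegalStateException("t3 fails"); }
        );
        System.out.println(applyNewestFirst("base", tasks)); // prints "base+t1"
    }
}
```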

Contributor:

That's also a possibility but to achieve that you'd have to go through all the ClusterStateTaskExecutor implementations to which the reserved-state subsystem delegates checking for batchability problems.

Contributor:

Every failed task will also kick off a separate task to update error state in reserved cluster state metadata, and these error state updates are also subject to the same version checks. So if we have three tasks with versions (v3 highest, v1 lowest):

(t3, v3), (t2, v2), (t1, v1)

We'd apply t3 first. Suppose it fails. Then error state is marked with version v3. When we process t2, if it succeeds, we clear the error regardless of version, so we're fine. But if it fails, I think we will skip updating the error state because v3 > v2.

The code for error handling is in several places, but see here for example

I wonder if we should simply try to apply the highest version task for now, then follow up with potential improvements (like trying older tasks, or instead trying the queued tasks in reverse order).

Contributor:

That's not my understanding. If we fail to update state t3 then we spawn a separate task (or batch thereof) that will run after the current batch is complete, so when we process t2 there's no error state to clear.

Contributor (@n1v0lg, Nov 7, 2024):

You are very likely right, but let me just write out what the flow looks like:

  1. we submit the state update task here
  2. the update task takes an error-handler (ReservedClusterStateService.this::updateErrorState)
  3. when invoked, that error handler submits an error state update task (to a separate executor, with a separate queue)
  4. Inside the cluster-state update task execute method, if we run into errors, we trigger the error handler and throw an exception

Given that error update tasks go to a separate executor and task queue, is it still true that they're guaranteed to run after we've processed tasks in the non-error state update queue?

That is, can't it happen that we finish task t3, which kicks off error task t3_error, which then completes before we get to task t2? I'm probably just misunderstanding what order guarantees we have for cluster-state update tasks stored in separate queues and handled by separate executors.

Contributor:

Yes. We're already processing a fixed batch of tasks here; there's no way to add more tasks to it. Even if the error-update tasks went to the same executor, they'd be added to a new batch of tasks, allowing us to complete the processing of the current batch undisturbed.

@DaveCTurner thanks for clarifying! The piece I was missing conceptually is that the execution of batches is also ordered, so while we're processing one batch, we won't start processing another batch in parallel.

@prdoyle all good to ignore my comments 👍

}
return clusterState;
}

-@Override
-public void taskSucceeded(ReservedStateUpdateTask task, Void unused) {
-task.listener().onResponse(ActionResponse.Empty.INSTANCE);
-}

@Override
-public void clusterStatePublished() {
+public final void clusterStatePublished(ClusterState newClusterState) {
rerouteService.reroute(
"reroute after saving and reserving part of the cluster state",
Priority.NORMAL,
@@ -54,4 +63,5 @@ public void clusterStatePublished() {
)
);
}

}
@@ -279,6 +279,39 @@ public void success(Runnable onPublicationSuccess) {
verify(rerouteService, times(1)).reroute(anyString(), any(), any());
}

public void testLastUpdateIsApplied() throws Exception {
ClusterState state0 = ClusterState.builder(new ClusterName("test")).version(1000).build();
ClusterState state1 = ClusterState.builder(new ClusterName("test")).version(1001).build();
ClusterState state2 = ClusterState.builder(new ClusterName("test")).version(1002).build();
ReservedStateUpdateTask realTask = new ReservedStateUpdateTask(
"test",
null,
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
Map.of(),
Set.of(),
errorState -> fail("Unexpected error"),
ActionListener.noop()
);
ReservedStateUpdateTask task1 = spy(realTask);
doReturn(state1).when(task1).execute(any());
ReservedStateUpdateTask task2 = spy(realTask);
doReturn(state2).when(task2).execute(any());
RerouteService rerouteService = mock(RerouteService.class);
ReservedStateUpdateTaskExecutor taskExecutor = new ReservedStateUpdateTaskExecutor(rerouteService);
ClusterState newState = taskExecutor.execute(
new ClusterStateTaskExecutor.BatchExecutionContext<>(
state0,
List.of(new TestTaskContext<>(task1), new TestTaskContext<>(task2)),
() -> null
)
);

assertThat("State should be the final state", newState, sameInstance(state2));
// Only process the final task; the intermediate ones can be skipped
verify(task1, times(0)).execute(any());
verify(task2, times(1)).execute(any());
Contributor Author:

FWIW I verified that this fails without the fix.

}

public void testUpdateErrorState() {
ClusterService clusterService = mock(ClusterService.class);
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
@@ -400,7 +433,7 @@ public TransformState transform(Object source, TransformState prevState) throws
var chunk = new ReservedStateChunk(Map.of("one", "two", "maker", "three"), new ReservedStateVersion(2L, BuildVersion.current()));
var orderedHandlers = List.of(exceptionThrower.name(), newStateMaker.name());

-// We submit a task with two handler, one will cause an exception, the other will create a new state.
+// We submit a task with two handlers, one will cause an exception, the other will create a new state.
// When we fail to update the metadata because of version, we ensure that the returned state is equal to the
// original state by pointer reference to avoid cluster state update task to run.
ReservedStateUpdateTask task = new ReservedStateUpdateTask(