Properly support task batches in ReservedStateUpdateTaskExecutor #116353

Closed
wants to merge 16 commits into from

Conversation

@prdoyle prdoyle commented Nov 6, 2024

ReservedStateUpdateTaskExecutor had been extending SimpleBatchedExecutor and looping through every state update. Not only was this unnecessary, since only the final state matters, but it also led to a NullPointerException in certain cases.

Here, we simplify the processing to execute only the last task.
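
For context, a rough sketch of the simplified flow being described, assembled from the diff hunks quoted in the review threads below (the execute signature and surrounding names are approximations, not the literal committed code; the review goes on to refine this, e.g. recording an outcome for every task in the batch and ordering tasks by version):

    @Override
    public ClusterState execute(BatchExecutionContext<ReservedStateUpdateTask> batchExecutionContext) {
        var initState = batchExecutionContext.initialState();
        var taskContexts = batchExecutionContext.taskContexts();
        if (taskContexts.isEmpty()) {
            return initState;
        }
        // Only the last update is relevant; the others can be skipped
        var taskContext = taskContexts.get(taskContexts.size() - 1);
        var clusterState = initState;
        try (var ignored = taskContext.captureResponseHeaders()) {
            var task = taskContext.getTask();
            clusterState = task.execute(clusterState);
            taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE));
        } catch (Exception e) {
            taskContext.onFailure(e);
        }
        return clusterState;
    }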

@prdoyle prdoyle added >non-issue :Core/Infra/Core Core issues without another label labels Nov 6, 2024
@prdoyle prdoyle requested a review from DaveCTurner November 6, 2024 19:22
@prdoyle prdoyle self-assigned this Nov 6, 2024
assertThat("State should be the final state", newState, sameInstance(state2));
// Only process the final task; the intermediate ones can be skipped
verify(task1, times(0)).execute(any());
verify(task2, times(1)).execute(any());

prdoyle (Contributor Author):

FWIW I verified that this fails without the fix.

@DaveCTurner (Contributor) left a comment:

That's the basic idea, yeah. I left a handful of suggestions, though.

return initState;
}
// Only the last update is relevant; the others can be skipped
var taskContext = taskContexts.get(taskContexts.size() - 1);

Contributor:

Are you sure the tasks are submitted in order, such that the last one in the list is always the best one to apply? Deserves a comment and an assertion I think.

prdoyle (Contributor Author):

Oh... is there some other way to define the "order" of tasks?

Contributor:

I'm not sure, but I understood that file-based settings have some notion of version (carried in the file) to ensure we're not overriding fresh settings with stale ones. I believe it relates to task.stateChunk.metadata().version(), although I haven't tracked down the details.

@n1v0lg (Contributor) commented Nov 7, 2024:

Correct, task.stateChunk.metadata().version() will give you the version of the file-settings file this update task was created for.

This is then compared against the last-processed version stored in metadata.

We should pick the update task with the highest version as @DaveCTurner suggests, unless someone from core infra confirms that the tasks are already guaranteed to be "well-ordered".
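
A minimal sketch of what picking the highest-version task could look like, assuming the accessor path mentioned above and the taskContexts list from the batch (the real field/getter names in the codebase may differ):

    // Pick the task carrying the highest file-settings version, per the discussion above.
    var newestTaskContext = taskContexts.stream()
        .max(Comparator.comparingLong(ctx -> ctx.getTask().stateChunk.metadata().version()))
        .orElseThrow();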

try (var ignored = taskContext.captureResponseHeaders()) {
var task = taskContext.getTask();
clusterState = task.execute(clusterState);
taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE));

Contributor:

You have to mark all the other tasks as having succeeded too. There's an assertion to check this in the MasterService, so I guess we're missing a test that runs this executor in a sufficiently realistic setup.
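
Concretely, even if only the final task's state change is applied, the executor still needs to record an outcome for the skipped contexts, along these lines (a sketch; the forEach(c -> c.success(() -> {})) pattern quoted later in this thread shows the same API):

    // Record a success for the tasks that were skipped, so the MasterService assertion that
    // every task in the batch has an outcome is satisfied.
    for (var skipped : taskContexts) {
        if (skipped != taskContext) {
            skipped.success(() -> skipped.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE));
        }
    }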

@prdoyle (Contributor Author) commented Nov 6, 2024:

(Indeed. I'm not a huge fan of mocks TBH because it's so hard to tell what you're actually testing.</rant>)

Contributor:

Yes, I think you could perhaps test this with an ESSingleNodeTestCase to get something more realistic.

Contributor:

Also it'd be good to have a test that tries to create the same repository twice in a batch in order to demonstrate the NPE is fixed.

@n1v0lg (Contributor) commented Nov 7, 2024:

Possibly RepositoriesFileSettingsIT is a good place, or FileSettingsServiceIT.

Contributor:

Just thinking out loud on how this might look in an IT:

In RepositoriesFileSettingsIT, we could get a FileSettingsService instance via internalCluster().getInstance(FileSettingsService.class, masterNode) and manually invoke processFileOnServiceStart() multiple times concurrently. That would result in multiple concurrent and identical state update tasks, though I'm not sure it would suffice to guarantee that they'd all be part of the same batch.

Based on the MasterService#createTaskQueue Javadoc, "Tasks submitted to the same queue (while the master service is otherwise busy) will be batched together into a single cluster state update." -- I'm not sure if there's a simpler way in tests to force batching, aside from ensuring the master is busy with another task when we call processFileOnServiceStart.

@DaveCTurner (Contributor) commented Nov 7, 2024:

In an ESIntegTestCase or an ESSingleNodeTestCase you can force batching by first blocking the master service and then doing whatever is needed to submit the tasks you want to be batched, then unblocking the master service again:

        masterNodeClusterService.createTaskQueue("block", Priority.NORMAL, batchExecutionContext -> {
            safeAwait(barrier);
            safeAwait(barrier);
            batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
            return batchExecutionContext.initialState();
        }).submitTask("block", ESTestCase::fail, null);

@prdoyle (Contributor Author) commented Nov 27, 2024:

Update - I think I have confirmed everything below now. No need to read it in detail.


Hi @DaveCTurner - I've finally come back to this. I'm trying to understand your advice here. Looks like the code above is similar to this code:

masterNodeClusterService.createTaskQueue("block", Priority.NORMAL, batchExecutionContext -> {
    safeAwait(barrier);
    safeAwait(barrier);
    batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {}));
    return batchExecutionContext.initialState();
}).submitTask("block", ESTestCase::fail, null);

This seems to be a trick to block masterNodeClusterService until the other test code encounters the barrier twice. You put whatever code you want to run during the block between the two barriers, and it's guaranteed to run after this lambda begins but before it does the forEach.

As far as I can tell, the rest is just "legalese" required to make this trick work, and I can pretty much ignore it.

Am I close?
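
For illustration, the test-side usage of that blocking task looks roughly like this (a hypothetical sketch, assuming barrier is a CyclicBarrier(2) shared with the lambda above, and safeAwait from ESTestCase):

    safeAwait(barrier);   // wait until the blocking task has started executing on the master service
    // ... submit the tasks that should end up in one batch (e.g. by updating the settings file) ...
    safeAwait(barrier);   // release the blocking task; the queued tasks then execute as a single batch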

prdoyle (Contributor Author):

Ok new question... I'm not sure how to trigger this condition in the integration test. I tried bad JSON but that fails in a different way: it never seems to make it to ReservedStateUpdateTaskExecutor because it dies earlier, parsing the JSON.

I wonder how I can feed this valid JSON that parses, but then somehow the ReservedStateUpdateTask still fails? 🤔

prdoyle (Contributor Author):

Welp I guess I can at least test the happy path. Submit N changes and verify that the last one takes effect.

clusterState = task.execute(clusterState);
taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE));
} catch (Exception e) {
taskContext.onFailure(e);

Contributor:

If the last task fails, and there are earlier tasks in the list, should we try them instead of completely giving up?

prdoyle (Contributor Author):

Is it safe to try multiple tasks? I thought the motivation for this PR was to avoid doing so.

Contributor:

If the task failed then the cluster state remains unchanged, so yes in that case it's safe to try a different one.

prdoyle (Contributor Author):

So we're effectively trying them in reverse until one succeeds.

Makes sense I guess, though it seems a little odd. I wonder if fixing it so they can apply in order would be more going "with the grain".

Contributor:

That's also a possibility, but to achieve that you'd have to go through all the ClusterStateTaskExecutor implementations to which the reserved-state subsystem delegates, checking for batchability problems.

Contributor:

Every failed task will also kick off a separate task to update error state in reserved cluster state metadata, and these error state updates are also subject to the same version checks. So if we have three tasks with versions (v3 highest, v1 lowest):

(t3, v3), (t2, v2), (t1, v1)

We'd apply t3 first. Suppose it fails. Then error state is marked with version v3. When we process t2, if it succeeds, we clear the error regardless of version, so we're fine. But if it fails, I think we will skip updating the error state because v3 > v2.

The code for error handling is in several places, but see here for example

I wonder if we should simply try to apply the highest version task for now, then follow up with potential improvements (like trying older tasks, or instead trying the queued tasks in reverse order).

Contributor:

That's not my understanding. If we fail to update state t3 then we spawn a separate task (or batch thereof) that will run after the current batch is complete, so when we process t2 there's no error state to clear.

@n1v0lg (Contributor) commented Nov 7, 2024:

You are very likely right, but let me just write out what the flow looks like:

  1. we submit the state update task here
  2. the update task takes an error-handler (ReservedClusterStateService.this::updateErrorState)
  3. when invoked, that error handlers submits an error state update task (to a separate executor, with a separate queue)
  4. Inside the cluster-state update task execute method, if we run into errors, we trigger the error handler and throw an exception

Given that error update tasks go to a separate executor and task queue, is it still true that they're guaranteed to run after we've processed tasks in the non-error state update queue?

That is, can't it happen that we finish task t3, which kicks off error task t3_error, which then completes before we get to task t2? I'm probably just misunderstanding what order guarantees we have for cluster-state update tasks stored in separate queues and handled by separate executors.

Contributor:

Yes. We're already processing a fixed batch of tasks here, there's no way to add more tasks to it. Even if the error-update tasks went to the same executor, they'd be added to a new batch of tasks, allowing us to complete the processing of the current batch of tasks undisturbed.

@DaveCTurner thanks for clarifying! The piece I was missing conceptually is that the execution of batches is also ordered so while we're processing one batch, we won't start processing another batch in parallel.

@prdoyle all good to ignore my comments 👍

prdoyle commented Nov 7, 2024

Alright I think I can do all of the above, though I'd gratefully accept some help with the tests. 😅

Comment on lines 415 to 416
writeJSONFile(masterNode, testJSON, versionCounter, logger); // Valid but skipped
writeJSONFile(masterNode, testJSON43mb, versionCounter, logger); // The last valid setting

Contributor:

Could we sometimes try 3? I have a hunch that might cover some logic that the 2-file case doesn't.

Could we give them different version values? And not always apply them in ascending order?

Could we wait for the master to pick up one file before writing the next?

prdoyle (Contributor Author):

Good ideas!

Waiting for the master to pick up one file before writing the next would exercise existing functionality, rather than my changes, but it's obviously an important case to test.

prdoyle (Contributor Author):

What is the expected behaviour when the versions are not ascending?

Should ReservedStateUpdateTaskExecutor be attempting them in descending version order rather than just iterating backward through the batch list?

Contributor:

The guiding principle is that the expected outcome should be the same whether the tasks are executed in a single batch or one at a time. So yes, AIUI we should attempt them in descending version order.

@prdoyle (Contributor Author) commented Nov 28, 2024:

Ah of course, that makes perfect sense. I'll make that change (on the assumption that updates with lower version numbers are ignored if processed one-per-batch).

prdoyle (Contributor Author):

Hmm, I found this javadoc for TaskContext:

A task to be executed, along with callbacks for the executor to record the outcome of this task's execution. The executor must call exactly one of these methods for every task in its batch.

It seems we can't simply skip the intervening tasks after all?

Contributor:

You have to call success (or onFailure) to record the outcome of the task, but that doesn't mean you have to actually do anything else with the task. According to the guiding principle, skipping a task because we processed a newer one counts as success, right?

@prdoyle (Contributor Author) commented Nov 28, 2024:

Alright makes sense. So perhaps:

  1. Scan the tasks to identify the one that would have run last, had the tasks all been run individually instead of in a batch, taking into consideration task ordering, version numbers, and the ReservedStateVersionCheck mode.
  2. Try that task. If it succeeds, call success on that one and all prior ones, and possibly on all following ones?
  3. If it fails, call onFailure, remove it from the candidates list, and loop back to step 1.

Contributor:

Yeah, that sounds right, although maybe it'd be simpler just to sort the list of tasks in the right order?

prdoyle (Contributor Author):

I was afraid I couldn't determine the right order before knowing whether the first task fails. But now that I think about it, if the first task succeeds, the rest are irrelevant, so I can sort them on the assumption that each one fails.
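
A sketch of what that sort-then-attempt approach could look like (hypothetical; accessor names as discussed earlier in this review, sorted by descending version so the newest task is attempted first):

    // Try the candidates newest-first; the first success wins, the remaining contexts are simply
    // marked succeeded, and each failed attempt gets onFailure.
    var ordered = taskContexts.stream()
        .sorted(Comparator.<TaskContext<ReservedStateUpdateTask>>comparingLong(
            ctx -> ctx.getTask().stateChunk.metadata().version()).reversed())
        .toList();
    ClusterState result = initState;
    boolean applied = false;
    for (var ctx : ordered) {
        if (applied) {
            ctx.success(() -> ctx.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE));
            continue;
        }
        try (var ignored = ctx.captureResponseHeaders()) {
            var task = ctx.getTask();
            result = task.execute(initState);   // a failed attempt leaves the state unchanged
            ctx.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE));
            applied = true;
        } catch (Exception e) {
            ctx.onFailure(e);
        }
    }
    return result;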

prdoyle commented Nov 29, 2024

Latest problem: if I overwrite the file multiple times in succession, there's no guarantee that the watcher registered by Path.register will notice the individual updates. That means that writing the file multiple times is not a reliable way to get a batch containing multiple updates.

@DaveCTurner (Contributor):

That's why I suggested waiting for the master to pick up each change and enqueue the relevant task before overwriting the file again.

prdoyle commented Nov 29, 2024

Oh I see what you meant. I thought you meant they'd end up in different batches.

This will be O(n log n) for long lists, while the original way will be O(mn), where m is the number of failing tasks.

None of this matters much for performance because n is expected to be small, but it does simplify the code and make it smell less imperative.

prdoyle commented Dec 2, 2024

Ok, I have implemented the precedence logic we discussed, used sorting over the tasks, and added unit tests to ensure all the tasks are marked completed.

All that remains, I think, is to change the IT so it forces multiple file changes to appear in the same batch, in order to prove the original NPE (which motivated this PR) is gone.

@DaveCTurner - do you happen to have any tips for how to achieve that? I assume I can use another barrier in a strategic spot to make the file watcher and unit test wait for each other?

@prdoyle prdoyle marked this pull request as ready for review December 2, 2024 21:50
@elasticsearchmachine elasticsearchmachine added the Team:Core/Infra Meta label for core/infra team label Dec 2, 2024

@elasticsearchmachine (Collaborator):

Pinging @elastic/es-core-infra (Team:Core/Infra)

DaveCTurner commented Dec 2, 2024

do you happen to have any tips for how to achieve that

Assuming there's no easy way to wait for the file watcher to run, I think I'd busy-wait, checking org.elasticsearch.cluster.service.MasterService#pendingTasks repeatedly until I see the first task, then update the file, then busy-wait on org.elasticsearch.cluster.service.MasterService#pendingTasks again until I see the second task, and so on.

See e.g. org.elasticsearch.action.support.AutoCreateIndexIT#testBatchingWithDeprecationWarnings
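
Sketched out, that busy-wait could look something like the following (a hypothetical helper for the IT; the source string used to match the pending reserved-state tasks is a guess and would need checking):

    private void awaitPendingReservedStateTasks(String masterNode, int expected) throws Exception {
        var masterService = internalCluster().getInstance(ClusterService.class, masterNode).getMasterService();
        assertBusy(() -> {
            long pending = masterService.pendingTasks().stream()
                .filter(task -> task.getSource().string().contains("reserved"))   // guessed source string
                .count();
            assertThat(pending, greaterThanOrEqualTo((long) expected));
        });
    }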

prdoyle commented Dec 3, 2024

I might need a way to wait for the watcher anyway. The way I've written the IT tests, there's no guarantee all my changes have taken effect before the assertions run. It sometimes results in this:

FileSettingsServiceIT > testLastSettingsInBatchApplied FAILED
    java.lang.AssertionError: 
    Expected: "43mb"
         but: was "50mb"
        at __randomizedtesting.SeedInfo.seed([76FB2677EDF08BC8:E940E5E7DD70379E]:0)
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
        at org.elasticsearch.test.ESTestCase.assertThat(ESTestCase.java:2581)
        at org.elasticsearch.reservedstate.service.FileSettingsServiceIT.assertExpectedRecoveryBytesSettingAndVersion(FileSettingsServiceIT.java:226)
        at org.elasticsearch.reservedstate.service.FileSettingsServiceIT.assertClusterStateSaveOK(FileSettingsServiceIT.java:218)
        at org.elasticsearch.reservedstate.service.FileSettingsServiceIT.testLastSettingsInBatchApplied(FileSettingsServiceIT.java:418)

prdoyle commented Dec 3, 2024

Trying to wait for the watcher ended up deadlocking due to this line, where the watcher waits for the submitted task to complete before processing the next file.

This makes me wonder how it's even possible to have a batch containing more than one file update. It seems like FileSettingsService blocks the watcher thread until the file processing is finished.

prdoyle commented Dec 3, 2024

...but then if that is true, how do I explain my previous comment... 🤔

Update: I think that's a separate issue. That failure is because the test case does not wait for the updates to be processed, which is different from the updates being processed one-per-batch.

prdoyle commented Dec 3, 2024

@DaveCTurner - can I get a sanity check here? I think this whole PR might be unnecessary, and the problem could be deeper.

File changes are processed on the watcherThread, of which there can be at most one per node. (Caveat: more on this in a moment.) When the file changes, that calls (indirectly) FileSettingsService.processFileChanges, which:

  1. makes its way to ReservedClusterStateService.process, which creates the update task and submits it to the updateTaskQueue; and then
  2. calls completion.get() to wait for the task to finish.

This makes it impossible for a second file update task to be created until the first is finished, which implies that it's impossible for two update tasks to be in the same batch.

This would imply that what we have observed is impossible, unless more than one thread submits update tasks.

I could see this happening in two scenarios:

  1. processFileOnServiceStart races with processSettingsAndNotifyListeners, or
  2. we accidentally spawn a second watcher thread.

It seems to me that these are both undesirable, and if we fix them, we no longer need to worry about multiple updates per batch.

The first could be fixed with appropriate synchronization / coordination.

The second could be fixed by using a singleton executor instead of directly using new Thread(...).start().

What do you think?

@DaveCTurner (Contributor):

Ah interesting, I hadn't looked in much detail at the calling code; I didn't realise it was doing stuff with bare Thread calls and interrupts and so on. It's not really the Elasticsearch way, though in this case it's only one extra thread so scalability isn't a huge concern.

So yes another possibility would be to ensure that there's only ever at most one of these tasks in the master queue at once.

Note that today's implementation doesn't guarantee this, because the wait for the task to complete may be interrupted if the node stops being master. It's then possible for the node to become master again straight away, start up a new watcher thread and enqueue another task all while the old task is still in the queue. Instead we must always wait for the task's listener to complete.

prdoyle commented Dec 4, 2024

Thanks @DaveCTurner. What would be the Elasticsearch Way to implement this? Is it a good idea to change it?

On other projects, I've used scheduleWithFixedDelay on a singleton ScheduledExecutorService, which has the side benefit that if the thread exits for any reason, it will wait a bit and then restart automatically. I suspect we'd prefer something more ThreadPool oriented?
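
For reference, the pattern described above looks roughly like this outside Elasticsearch (a generic sketch; watchSettingsFileOnce is a hypothetical method, and note that an uncaught exception would suppress further runs, so it should catch and log its own failures):

    // One dedicated watcher thread, re-run with a fixed delay, so two watcher threads can't exist.
    ScheduledExecutorService watcherExecutor = Executors.newSingleThreadScheduledExecutor(
        r -> new Thread(r, "file-settings-watcher"));
    watcherExecutor.scheduleWithFixedDelay(this::watchSettingsFileOnce, 0, 1, TimeUnit.SECONDS);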

rjernst commented Dec 4, 2024

What would be the Elasticsearch Way to implement this?

Normally we would use a threadpool. But we don't want that for file settings because we don't want to be queued up behind other system operations. Essentially this is like having a dedicated fixed size threadpool of size 1.

prdoyle commented Dec 4, 2024

Ok, I think I'd rather actually use a bona fide dedicated thread pool of size 1 then, so it doesn't accidentally end up with two threads causing otherwise-impossible race conditions.

rjernst commented Dec 4, 2024

I'm still not sure we should use ThreadPool. That comes with baggage (eg, size configurable by user through elasticsearch.yml). This is an implementation detail. We should make it robust, but IMO we should do so without ThreadPool.

prdoyle commented Dec 4, 2024

Ok makes sense. I really meant lowercase "thread pool" as a general concept. So if you prefer to avoid ThreadPool here, it could just be some sort of Executor.

@DaveCTurner (Contributor):

Yes, we do create a dedicated executor in other places - see e.g. org.elasticsearch.cluster.service.MasterService#createThreadPoolExecutor.

prdoyle commented Dec 5, 2024

@DaveCTurner - what if I change ReservedClusterStateService to create a new task queue for each ReservedStateUpdateTask instead of using the same long-lived queue? That would force each task into its own batch I think?

@prdoyle prdoyle mentioned this pull request Dec 5, 2024

prdoyle commented Dec 5, 2024

New PR #118112 that aims to prevent batching rather than cope with it. FYI @DaveCTurner @rjernst

prdoyle commented Dec 10, 2024

Superseded by #118112

@prdoyle prdoyle closed this Dec 10, 2024
Labels
:Core/Infra/Core Core issues without another label >non-issue Team:Core/Infra Meta label for core/infra team v9.0.0