Refactor GatewayService #99994
Conversation
This PR refactors GatewayService with the goal of making it easier to add new features. Resolves: elastic#89310
Pinging @elastic/es-distributed (Team:Distributed)
@DaveCTurner This is my first attempt to "clean up" the GatewayService class. I tried to work with your suggestion to use SubscribableListener.
I am open to any suggestions and comments. Once we are happy with the overall approach, I plan to add a few more tests. Thanks!
// This node is ready to schedule state recovery
thisRecoveryPlanned.addListener(new ActionListener<>() {
I wanted to use andThen, but it does not expose the exception to the consumer, and we need the exception here to handle the timeout.
Certainly nicer already, yes. Just playing around with ideas here, but I'd like to try having an object which represents roughly "a pending state recovery in master term t". The master term always increases, and if we need to retry then that'll always be in a later term, so we'll get a new one of these pending-state-recovery objects. Does that make sense? Edit to add: don't try too hard to preserve the exact semantics of the timeout as it stands today; it would probably fit the implementation I suggested better if we started a new timeout when a new master is elected. That's ok IMO (indeed I think I'd prefer it).
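For illustration, a minimal sketch of such a per-term object; all names here are hypothetical and not the eventual implementation:

```java
// Sketch only: one object per master term. A new election (higher term)
// always produces a fresh instance, so retries never reuse stale state.
class PendingStateRecovery {
    final long term; // the master term this pending recovery belongs to

    PendingStateRecovery(long term) {
        this.term = term;
    }

    // Called on each cluster state update while recovery is still pending;
    // decides whether to run now, schedule after recover_after_time, or wait.
    void maybeStart(int currentDataNodeCount) {
        // ... elided ...
    }
}
```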
if (pendingStateRecovery.term < currentTerm) {
    // Always start a new state recovery if the master term changes
    // If there is a previous one still waiting, both will run but at most one of them will
    // actually make changes to cluster state
    pendingStateRecovery = new PendingStateRecovery(currentTerm);
Here the code always schedules a new state recovery if it sees a new term (and the cluster state does need recovery). Not sure if this is what you mean by:

> it would probably fit the implementation I suggested better if we started a new timeout when a new master is elected

This code does not try to cancel the previous pending state recovery, since cancellation can only be best-effort: the task may have already been submitted by the time we try to cancel it. Please let me know if you disagree.
Yep, that's the direction I was thinking of indeed. Looking better.
I think I'd like the RecoverStateUpdateTask to remember the corresponding term with which it was registered, and verify that the term hasn't changed before it does anything. That way we don't need to worry about spurious recoveries from older terms that are no longer correct.
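Roughly along these lines, as a sketch (the class shape and the way the term is read are assumptions, not the actual patch):

```java
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;

// Sketch: the task remembers the master term it was registered in and
// becomes a no-op if a newer master has since been elected.
class RecoverStateUpdateTask extends ClusterStateUpdateTask {
    private final long expectedTerm;

    RecoverStateUpdateTask(long expectedTerm) {
        this.expectedTerm = expectedTerm;
    }

    @Override
    public ClusterState execute(ClusterState currentState) {
        // assuming the current term is reachable via the coordination metadata
        if (currentState.metadata().coordinationMetadata().term() != expectedTerm) {
            return currentState; // returning the input state publishes no change
        }
        // ... perform the actual state recovery here ...
        return currentState;
    }

    @Override
    public void onFailure(Exception e) {
        // log and give up; a later term schedules a fresh recovery
    }
}
```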
That's a good idea. I added the term check.
I pushed 571db2d as an attempt to implement this idea. Please let me know whether it makes sense. Thanks!
I like the direction. Left a few more suggestions.
if (state.nodes().isLocalNodeElectedMaster() == false) {
if (nodes.getMasterNodeId() == null) {
I think this is already covered by the check on nodes.isLocalNodeElectedMaster().
Yep, I kept it because it has a separate logging message. I have now moved it inside the isLocalNodeElectedMaster check to retain the logging message.
if (state.nodes().getMasterNodeId() == null) {
    logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) {
Maybe move this condition into the per-term check too?
I moved the check inside the new PendingStateRecovery class, before scheduling the recovery. I assume this is what you mean by "per-term", i.e. it is not about checking it inside ClusterStateUpdateTask#execute. The former allows us to keep the same semantics as today: no action at all until the required number of data nodes is met.
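Concretely, the placement looks roughly like this; a sketch with hypothetical names, not the actual diff:

```java
// Sketch: the data-node guard lives in the per-term object and is checked
// before anything is run or scheduled, matching today's behaviour.
class PendingStateRecovery {
    final long term;
    private final int expectedDataNodes; // from gateway.recover_after_data_nodes, -1 if unset

    PendingStateRecovery(long term, int expectedDataNodes) {
        this.term = term;
        this.expectedDataNodes = expectedDataNodes;
    }

    void maybeStart(int currentDataNodeSize) {
        if (expectedDataNodes != -1 && currentDataNodeSize < expectedDataNodes) {
            return; // not enough data nodes yet: neither run nor schedule recovery
        }
        // ... otherwise run immediately or schedule after recover_after_time ...
    }
}
```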
void maybeStart(int dataNodeSize) {
    final SubscribableListener<Void> thisRecoveryPlanned;
On reflection, doing this with a SubscribableListener actually seems more awkward than just sticking with the original threadPool.schedule.
Sure, I changed it to use just threadPool.schedule. It might make SubscribableListener more suitable for this use case if it did not have to use an Exception to signal timeout.
I made a rather large change based on last round's feedback. The code no longer needs synchronization; it now uses an AtomicReference to hold the current pending recovery. The task still cleans itself up when it finishes, either successfully or exceptionally. I personally think the code looks simpler with the latest change.
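As an illustration of that bookkeeping, a sketch along these lines (names hypothetical; the update method is assumed to be invoked from the cluster applier thread):

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch: the current per-term recovery is held in an AtomicReference and
// replaced when a higher master term is observed. An older instance may
// still fire, but its term check stops it from changing cluster state.
class RecoveryCoordinator {
    private final AtomicReference<PendingStateRecovery> currentPendingStateRecovery = new AtomicReference<>();

    void onClusterChanged(long currentTerm, int dataNodeSize) {
        PendingStateRecovery pending = currentPendingStateRecovery.get();
        if (pending == null || pending.term() < currentTerm) {
            pending = new PendingStateRecovery(currentTerm);
            currentPendingStateRecovery.set(pending);
        }
        pending.maybeStart(dataNodeSize);
    }

    record PendingStateRecovery(long term) {
        void maybeStart(int dataNodeSize) {
            // run or schedule the recovery for this term
        }
    }
}
```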
Right yes this is looking even better. But we are still doing some kinda complex state management within each term via …
I pushed a sketch of an idea to main...DaveCTurner:elasticsearch:2023/10/03/GatewayService-ideas (not tested or even necessarily fully-formed, but maybe it is useful)
    runRecoveryImmediately();
} else if (recoverAfterTime == null) {
    logger.debug("performing state recovery of term [{}], no delay time is configured", expectedTerm);
    runRecoveryImmediately();
} else {
    if (scheduledRecovery == null) {
        logger.info(
            "delaying initial state recovery for [{}] of term [{}]. expecting [{}] data nodes, but only have [{}]",
            recoverAfterTime,
            expectedTerm,
            expectedDataNodes,
            currentDataNodeSize
        );
        scheduledRecovery = threadPool.schedule(getScheduleTask(), recoverAfterTime, threadPool.generic());
@DaveCTurner There are still some complexities with this block of code and other related areas.

1. This class has no state to remember that it has attempted to recover immediately. Therefore, if the cluster can recover immediately, it can potentially submit multiple cluster state update tasks. Is that a problem?
2. For the scheduled case, we can mostly avoid multiple schedules by checking whether scheduledRecovery is null. There can still be edge cases where we schedule more than once due to a race between checking scheduledRecovery and resetting it back to null. If submitting multiple update tasks isn't an issue, we may also choose not to check it at all and just always schedule.
3. Because we need to reset scheduledRecovery back to null in the scheduled runnable, it needs to be made volatile as well.

Do we need to address the 1st point? If so, it seems we need another state variable for it. I forgot to mention it during the sync, but this was one of the original complexities. Also, because the ClusterStateUpdateTask may not run inside execute due to dataNodeSize dropping again, the state needs to be reset from within the task, which brings back the need to pass a "runAfter" into the task. To simplify things, I think we don't want to check dataNodeSize again inside the task; it's an edge case anyway, and dropping it makes things simpler. But we will still need some other state management if we want to address the 1st point. What do you think?
1. Good point. I think we should only ever submit one cluster state update task per term, so we ought to track this with a flag within the per-term state.
2. I would not expect any races here; or rather, I think if we keep track of whether we've submitted the cluster state update task then that solves those races.
3. Good point, although since we only do that when actually submitting the task again, I think the solution is to make this submission a once-only thing.
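A minimal sketch of that once-only flag inside the per-term state, assuming hypothetical names (it matches the AtomicBoolean mentioned in the next comment):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: at most one recovery cluster state update task per term,
// no matter how many immediate or scheduled paths race to submit it.
class PendingStateRecovery {
    final long term;
    private final AtomicBoolean taskSubmitted = new AtomicBoolean();

    PendingStateRecovery(long term) {
        this.term = term;
    }

    void runRecoveryImmediately() {
        if (taskSubmitted.compareAndSet(false, true)) {
            // submit the RecoverStateUpdateTask for this term, exactly once
        }
        // otherwise a task for this term is already in flight; nothing to do
    }
}
```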
Pushed 25bbeba to add a new state variable (an AtomicBoolean), which solved multiple issues I had. Thanks!
Please let me know if the main code looks good to you. I'll proceed to add some more tests if you are happy with the main code changes. Please also let me know if you have any ideas for what kinds of tests we might need. Thanks!
I have now added multiple tests to cover different scenarios in bf8f21a. Now the whole thing looks ready to me.
@elasticmachine run elasticsearch-ci/part-1
The implementation looks good now, but I do not like the mocks-heavy tests you've added. Tests like this are very fragile and tend to obstruct other refactorings in the same area. There's no need for mocks here: we can create realistic versions of all the dependencies in ways that let us verify, through their stable APIs, that this component really does run its tasks at the right time and not otherwise.
I pushed a rough commit which shows how to do that. It doesn't cover everything and probably deserves some more abstraction to split it into multiple test cases, but hopefully it's a useful start.
This pattern of creating a ClusterService that uses a deterministic threadpool with a fake clock is used in a few other places too:

- org.elasticsearch.cluster.InternalClusterInfoServiceSchedulingTests#testScheduling
- org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocatorTests#testAllocate
- org.elasticsearch.snapshots.SnapshotResiliencyTests.TestClusterNodes.TestClusterNode#TestClusterNode

They're not all quite the same, but it feels like this should be something we can move into ClusterServiceUtils.
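For reference, the skeleton of the fake-clock pattern looks roughly like this; a sketch that relies only on the DeterministicTaskQueue methods appearing in the diffs below, with getCurrentTimeMillis assumed as the clock accessor:

```java
public void testDelayedRecoveryOnFakeClock() {
    final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue();
    final long startMillis = deterministicTaskQueue.getCurrentTimeMillis();

    // schedule an event at an absolute time on the fake clock,
    // e.g. simulating enough data nodes joining after five seconds
    deterministicTaskQueue.scheduleAt(startMillis + 5000, () -> {
        // drive the component under test here
    });

    deterministicTaskQueue.advanceTime();         // jump the fake clock forward deterministically
    deterministicTaskQueue.runAllRunnableTasks(); // drain everything now runnable
}
```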
* @return The resulting cluster state after executing all the tasks. If {@code batchExecutionContext.initialState()} is returned then
*         no update is published.
The real change here is just adding the missing @ sign. The other change is due to cascading line wrap.
deterministicTaskQueue.scheduleAt(
    initialTimeInMillis + elapsed.millis(),
    () -> setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes - 1), null)
);
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();

// The 2nd scheduled recovery when data nodes are above recoverAfterDataNodes again
deterministicTaskQueue.scheduleAt(
    initialTimeInMillis + elapsed.millis() * 2,
    () -> setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes), null)
);
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();
These advanceTime then runAllRunnableTasks() calls can be simplified if we have some method like runToTime(long) that runs all tasks up to the specified time. I can have a follow-up PR to add this method if you think it is useful.
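One possible shape for such a helper, sketched as a method on DeterministicTaskQueue; hasDeferredTasks and the peek of the next deferred execution time (getNextDeferredExecutionTime here) are assumed helper names, not confirmed API:

```java
// Sketch: run every task that becomes runnable at or before the given time.
public void runToTime(long timeInMillis) {
    runAllRunnableTasks();
    while (hasDeferredTasks() && getNextDeferredExecutionTime() <= timeInMillis) {
        advanceTime();           // jump the fake clock to the next deferred task
        runAllRunnableTasks();   // drain everything that just became runnable
    }
}
```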
Yep I can see value in that, please do.
Thanks Yang, much nicer IMO. I left one small request about the tests; it's not essential, but it would be nice if you can find a way to do it without masses of duplication.
// The 1st scheduled recovery may or may not run depend on what happens to the cluster next
final int caseNo = randomIntBetween(0, 2);
switch (caseNo) {
Could we test these cases as separate tests (so they all run every time)?
Yeah that's fair. I pushed 19f89ba.
No kidding, this issue is a great source for learning. I spent quite some time reading through the tests you proposed and the related production code. It felt productive. Thanks! The PR is now updated accordingly. I re-organized the tests for different test purposes and chose to advance the time manually in some cases for testing cancellation. But otherwise the tests are mostly what you have suggested.
One more small observation about the tests (I could be persuaded that it's not a blocker)
final var settings = settingsBuilder.build();
final var clusterSettings = createBuiltInClusterSettings(settings);

clusterService = new ClusterService(
Hmm, it seems a bit odd to set this field here. Could we avoid storing it in a field and instead create the ClusterService in its own method, and then create the GatewayService from that?
Sure, I pushed b5da2d1.
An easier alternative is to access the ClusterService from the GatewayService, but it requires making the GatewayService.clusterService field package-private (or adding a package-private accessor method). Though this pattern is used in many places for testing, I am not sure whether you'd like it, so I did not take this approach. Please let me know whether it would work for you, since it might be useful in future.
LGTM