-
Notifications
You must be signed in to change notification settings - Fork 24.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Zen2: Add leader-side join handling logic #33013
Conversation
Pinging @elastic/es-distributed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main questions at this point are around the split between JoinHelper
and Coordinator
, and the use of a proper threadpool in the tests.
|
||
private final Optional<Join> optionalJoin; | ||
|
||
public JoinRequest(DiscoveryNode sourceNode, Optional<Join> optionalJoin) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If optionalJoin
is present then the source node is duplicated. I think this is the neatest way to do this, but can we assert they're the same in these constructors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 2757c07
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
|
||
@TestLogging("org.elasticsearch.cluster.service:TRACE,org.elasticsearch.cluster.coordination:TRACE") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need verbose logs here as a matter of course?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's convenient to have those when tests with concurrency are failing. As the tests here are very contained (only invoking a few classes), enabling this will not lead to log flooding. It's useful to have this enabled by default, see e.g. other test classes in distributed land, RelocationIT, MasterDisruptionIT, ClusterDisruptionIT, ... or NodeJoinControllerTests, the Zen1 pendant to these tests.
@@ -972,7 +972,7 @@ private static String groupName(ThreadGroup threadGroup) { | |||
* Returns a random subset of values (including a potential empty list) | |||
*/ | |||
public static <T> List<T> randomSubsetOf(Collection<T> collection) { | |||
return randomSubsetOf(randomInt(Math.max(collection.size() - 1, 0)), collection); | |||
return randomSubsetOf(randomInt(collection.size()), collection); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to confirm, this is ok? We previously never returned the whole set (unless it was empty) and now we do. Nowhere expects this, right? I think the Javadoc should say it might return nothing and everything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this was an actual bug, where someone assumed randomInt to have an inclusive bound instead of an exclusive one (randomIntBetween for example uses inclusive bounds). I can add more javadoc here, but what this fix now actually implements is the definition of a subset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ignore my explanation about the bounds here. Still think it's a bug. I've enhanced javadocs in 1ce4fdc
public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join"; | ||
|
||
private final MasterService masterService; | ||
private final TransportService transportService; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only used in the constructor, doesn't need to be a field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, not yet. The follow-up PR will add a send method to this class, so it will be of direct use then. If you feel strongly about this, I can revert.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, that's fine.
import java.util.function.BiConsumer; | ||
import java.util.function.LongSupplier; | ||
|
||
public class JoinHelper extends AbstractComponent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure the separation of responsibilities between this and the Coordinator
is in the right place. E.g. the Coordinator
passes this::handleJoinRequest
straight through to the constructor here solely for this node to register it as a handler for internal:cluster/coordination/join
, but then handleJoinRequest
does a bit of stuff and then calls back into JoinHelper
. There's a lot of back-and-forth. Maybe it'd be better just to merge them - they're not tested separately, for instance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The separation I had in mind here was that JoinHelper would be responsible for all join-related transport actions (i.e. later also have a sendJoin method + the startjoin stuff) and all MasterService-related join tasks, similar to NodeJoinController
in Zen1. I would like to avoid cramming all of that into Coordinator. I'm sure that Coordinator will be bloated once we have implemented all the things we want to. The back and forth at the moment is only there for the handleJoinRequest method, which I find to be the better tradeoff if the other option is to put all of this into Coordinator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggested a slightly different split in
#33013 (comment)
}; | ||
|
||
transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new, | ||
(request, channel, task) -> joinRequestHandler.accept(request, new JoinCallback() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this JoinCallback implement toString()
too please?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added in 4676116
void onFailure(Exception e); | ||
} | ||
|
||
static class JoinTaskListener implements ClusterStateTaskListener { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this implement toString()
please?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added in 4676116
assertFalse(isLocalNodeElectedMaster()); | ||
} | ||
|
||
public void testConcurrentJoining() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way we could test this without introducing full multithreading (and therefore possibly nondeterminism)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also have more of these concurrent tests. I expect the non-concurrent functionality of this particular test to later be tested by "LegislatorTests".
} | ||
} | ||
|
||
if (prevElectionWon == false && coordState.electionWon()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can't happen as a LEADER
but can it happen as a FOLLOWER
? I think it can't, because we must have bumped our term too. Therefore, can this be reorganised as a switch on mode
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great observation. The scenario I had in mind while writing these conditions was as follows: Assume you want to do a leader handoff. You send a startJoin to all nodes, telling them to join the new prospective leader. Assume the corresponding join from one node arrives on the prospective leader (that is still a follower) before that one has received the start join. Handling of this join with higher term will trigger ensureTermAtLeast
, turn the node into a candidate, then handle the join, and then reach this point here. So yes, it sounds like we could fold this check into the CANDIDATE branch here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've simplified this in bd242ed
public void testJoinWithHigherTermElectsLeader() { | ||
DiscoveryNode node0 = newNode(0, true); | ||
DiscoveryNode node1 = newNode(1, true); | ||
setupFakeMasterServiceAndCoordinator(1, initialState(false, node0, 1, 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throughout the tests, could we pick more variable terms and versions? All these 1
s and 2
s are tricky to keep track of.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 7dcd656
} | ||
} | ||
|
||
switch (mode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great. I now wonder if this whole block can move into the JoinHelper
. If the code above returned its mode
and called becomeLeader()
then the JoinHelper
should be able to work out the right things to do with the join.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried, but that would make JoinHelper be aware of the mode, as well as need to call becomeLeader
, so it would be calling back into Coordinator (or CoordinationState), which also feels wrong. Finally, these lines here need to happen under the mutex, so even if I move part of this over to Coordinator, it can not just selectively call into Coordinator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've pushed my attempt at this. I'm not sure if it's an improvement over the existing code though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
attempt is here: 6f58a8c
assertFalse(isLocalNodeElectedMaster()); | ||
joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); | ||
assertTrue(isLocalNodeElectedMaster()); | ||
assertTrue(clusterStateHasNode(node1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure this test is strong enough - I think it should be showing that non-winning joins end up in the cluster state, but node1
's join was a winning one. Perhaps assert that node0
is there, although node0
is the thing collecting the joins, so maybe we need a node2
as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added a second node in 06ab9e1
new VotingConfiguration(Collections.singleton(node0.getId())))); | ||
long newTerm = initialTerm + randomLongBetween(1, 10); | ||
coordinator.coordinationState.get().handleStartJoin(new StartJoinRequest(node1, newTerm)); | ||
synchronized (coordinator.mutex) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems ugly, although necessary given the current infrastructure. //TODO
fix this in future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added comment in 5b390a8
|
||
final String stateUpdateSource = "elected-as-master ([" + pendingAsTasks.size() + "] nodes joined)"; | ||
|
||
pendingAsTasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source, e) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should really ask for toString()
s on these handlers too, although this adds noise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The handler is not logged anywhere, so adding toString adds little.
|
||
if (justBecameLeader) { | ||
joinAccumulator.submitPendingJoins(); | ||
joinAccumulator = new LeaderJoinAccumulator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see a risk here that we become leader, then become candidate, and then overwrite the accumulator here. It's ok if we becomeCandidate
on each new election, but this is at least worthy of a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I misunderstood you, but I think the scenario you've described is not possible. Both handleJoinRequest (where we would become leader) and this follow-up code is under the same mutex here in JoinHelper. If there was another concurrent becoming candidate in Coordinator (after it switched to leader), it would notify us, but that notification would acquire same mutex and wait for our leader transition to complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Coordinator
becomes leader in joinHandler.test()
not in handleJoinRequest
, and that's outside this mutex, so it's technically possible that it could become a candidate again before this synchronised block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, right. Should we extend the mutex?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this race here can also lead to another odd situation:
Assume you have two concurrent join requests, both are required to become leader. The first that enters handleJoin or Coordinator will return becomeLeader = false, the second one will return true. If the second one now gets to execute this section here first, it will be submitted to the masterservice whereas the second will only be submitted in a follow-up. This means its possible that only the second node will be part of the cluster state that is published as becoming leader.
private final Predicate<Join> joinHandler; | ||
private final JoinTaskExecutor joinTaskExecutor; | ||
private final Object mutex = new Object(); | ||
private JoinAccumulator joinAccumulator = new CandidateJoinAccumulator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm undecided about whether this is too much machinery, and a simple Mode
variable would be enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we talked about this and think it's fine
} | ||
} | ||
|
||
interface JoinAccumulator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This definitely feels like overkill now the JoinHelper
is mode-aware and its mode is in sync with the coordinator.
|
||
public void invariant() { | ||
synchronized (mutex) { | ||
if (mode == Mode.LEADER) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can restore those assertions about the state of the join helper here - i.e. no accumulated joins when leader or follower.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, LGTM
Adds the logic for handling joins by a prospective leader. Introduces the Coordinator class with the basic lifecycle modes (candidate, leader, follower) as well as a JoinHelper class that contains most of the plumbing for handling joins.