Zen2: Cluster state publication pipeline #32584
Pinging @elastic/es-distributed
I mostly asked for changes around the tests, but there are a couple of things in Publication too.
There are a handful of unnecessarily-public methods according to IntelliJ that I think can be more restricted. I haven't looked at the privacy levels in detail yet.
 * and an optional {@link Join}.
 */
public class PublishWithJoinResponse extends TransportResponse {
    private final PublishResponse publishResponse;
PublishResponse is also a TransportResponse (via TermVersionResponse) but I think because it's wrapped up like this it could just be a standalone object.
I've removed and inlined TermVersionResponse, see a21e8c7
    assertTrue(node1.coordinationState.electionWon());
}

public void testSimpleClusterStatePublishing() throws InterruptedException {
Could this make finer assertions about the state that the publication is in? More precise requests follow...
see comments below
publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
    PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest(
        publication.publishRequest);
    e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty()));
Could this assert that publication.applyCommit is null iff n1 hasn't been processed?
sure, fixed in f09659c
assertEquals(publication.applyCommit.getTerm(), publication.publishRequest.getAcceptedState().term());
assertEquals(publication.applyCommit.getVersion(), publication.publishRequest.getAcceptedState().version());
publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> {
    nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit);
Could we assert publication.completed == false and publication.committed == false at the top of this lambda?
sure, fixed in f09659c
assertTrue(publication.completed);
assertTrue(publication.committed);

assertThat(ackListener.await(0L, TimeUnit.SECONDS).size(), equalTo(3));
Could this be:
assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3));
Yup (f09659c)
}

@Override
protected void onCompletion(boolean committed) {
Can we assert that this is only ever called once?
sure, done in f09659c
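For readers following along, the once-only check requested above could look roughly like the sketch below. This is not the code from f09659c; the class CompletionRecorder and its accessors are hypothetical stand-ins for the test's Publication subclass. It throws AssertionError explicitly instead of using the assert keyword, so the check still fires when the JVM runs without -ea.

```java
// Hypothetical stand-in for the test's Publication subclass; all names are illustrative.
final class CompletionRecorder {
    private boolean completed = false;
    private boolean committed = false;

    // Records the outcome and fails loudly if completion is reported a second time.
    void onCompletion(boolean committed) {
        if (completed) {
            throw new AssertionError("onCompletion called more than once");
        }
        completed = true;
        this.committed = committed;
    }

    boolean isCompleted() {
        return completed;
    }

    boolean isCommitted() {
        return committed;
    }
}
```

A test can then call onCompletion once, check both flags, and expect any second call to fail.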
isCompleted = true;
onCompletion(true);
assert applyCommitRequest.isPresent();
logger.trace("onPossibleCompletion: [{}] was successful, applying new state locally", this);
not sure it makes sense to talk about applying the state here
fixed in 25d5835
protected abstract Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse);

protected abstract void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response);
Does this need the whole PublishWithJoinResponse or could it just take the Optional<Join>? Or maybe even the Join only if present?
I don't know yet. I've added a TODO in 43e3399
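To make the last option in the question concrete, here is a minimal sketch of a callback that receives the Join only when one is present. It is one possible shape, not what the PR does (that is still an open TODO). JoinDispatchSketch, its nested Join class, handleResponse and onJoin are all hypothetical names; the real Join class has a different definition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; none of these names come from the PR.
final class JoinDispatchSketch {
    // Minimal stand-in for the real Join class.
    static final class Join {
        final String sourceNodeId;
        final long term;

        Join(String sourceNodeId, long term) {
            this.sourceNodeId = sourceNodeId;
            this.term = term;
        }
    }

    final List<Join> received = new ArrayList<>();

    // The callback only ever sees an actual Join, never an empty Optional.
    void onJoin(Join join) {
        received.add(join);
    }

    // The caller unwraps the Optional, so the callback signature stays narrow.
    void handleResponse(Optional<Join> maybeJoin) {
        maybeJoin.ifPresent(this::onJoin);
    }
}
```

The trade-off is that the callback can no longer observe a response that carried no join; whether that matters depends on how the caller ends up using it.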
private class PublicationTarget {
    private final DiscoveryNode discoveryNode;
    private final PublicationTargetStateMachine publicationTargetStateMachine = new PublicationTargetStateMachine();
This was useful for development, but now that things are more stable, shall we merge this into PublicationTarget?
ok, fixed in b7b4e39
}
publicationTargetStateMachine.setState(PublicationTargetState.SENT_PUBLISH_REQUEST);
Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublicationTarget.PublishResponseHandler());
// TODO Can this ^ fail with an exception? Target should be failed if so.
Does this question have an answer? I think any failures are passed to the response handler.
I'm deferring this TODO until we integrate this with Legislator.
I find it a little strange that we don't have any tests with more than 2 nodes in the configuration, to demonstrate that we only need a majority of responses to commit. I don't think this is a gap in coverage so much as in documentation. I suggested a fix for this, and also found one private thing that I think shouldn't be.
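As an illustration of the majority rule mentioned above, the sketch below checks whether the responding nodes form a strict majority of a voting configuration. It is a simplified model under stated assumptions, not the VotingConfiguration API: QuorumSketch and hasQuorum are hypothetical names, and node ids are plain strings.

```java
import java.util.Set;

// Hypothetical helper, not the Elasticsearch VotingConfiguration API.
final class QuorumSketch {
    // True if strictly more than half of the voting nodes have responded.
    // Responses from nodes outside the voting configuration are ignored.
    static boolean hasQuorum(Set<String> votingNodeIds, Set<String> respondedNodeIds) {
        long votes = votingNodeIds.stream().filter(respondedNodeIds::contains).count();
        return votes * 2 > votingNodeIds.size();
    }

    public static void main(String[] args) {
        Set<String> config = Set.of("n1", "n2", "n3");
        System.out.println(hasQuorum(config, Set.of("n1")));       // one of three voters
        System.out.println(hasQuorum(config, Set.of("n1", "n3"))); // two of three voters
    }
}
```

With a two-node configuration a majority means both nodes, which is why a three-node configuration is needed to show a commit succeeding while one response is still outstanding.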
private boolean ackIsPending = true;
private PublicationTargetState state = PublicationTargetState.NOT_STARTED;

private PublicationTarget(DiscoveryNode discoveryNode) {
Not 100% sure of the rules here, but private seems too restricted for this.
fixed by 799157d
}

public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException {
    VotingConfiguration config = new VotingConfiguration(Sets.newHashSet(n1.getId(), n2.getId()));
Could we (randomly?) add n3 to the configuration here?
sure, fixed in 799157d
@elasticmachine retest this please
LGTM
Thanks @DaveCTurner
This PR implements the state machine on the master to publish a cluster state.
Relates to #32006
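For orientation, a per-target state machine of the kind discussed in this review might be sketched as follows. Only NOT_STARTED and SENT_PUBLISH_REQUEST appear in the snippets quoted above; the remaining state names, the transition table, and the class TargetStateSketch are assumptions made for illustration and may not match the merged code.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of a per-target publication state machine.
final class TargetStateSketch {
    enum State {
        // NOT_STARTED and SENT_PUBLISH_REQUEST are quoted in the review; the rest are assumed.
        NOT_STARTED, SENT_PUBLISH_REQUEST, WAITING_FOR_QUORUM, SENT_APPLY_COMMIT, APPLIED_COMMIT, FAILED;

        // Assumed transition table: each step moves forward or fails; end states allow nothing.
        Set<State> allowedNext() {
            switch (this) {
                case NOT_STARTED:
                    return EnumSet.of(SENT_PUBLISH_REQUEST, FAILED);
                case SENT_PUBLISH_REQUEST:
                    return EnumSet.of(WAITING_FOR_QUORUM, FAILED);
                case WAITING_FOR_QUORUM:
                    return EnumSet.of(SENT_APPLY_COMMIT, FAILED);
                case SENT_APPLY_COMMIT:
                    return EnumSet.of(APPLIED_COMMIT, FAILED);
                default:
                    return EnumSet.noneOf(State.class);
            }
        }
    }

    private State state = State.NOT_STARTED;

    State state() {
        return state;
    }

    // Rejects any transition that the table above does not allow.
    void setState(State next) {
        if (!state.allowedNext().contains(next)) {
            throw new IllegalStateException("illegal transition: " + state + " -> " + next);
        }
        state = next;
    }
}
```

Keeping the transition table next to the enum means an out-of-order message shows up as an immediate exception rather than as silently corrupted state.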