From ac721938c2276144ed4d2a301c898e8a9cabd705 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 11 Mar 2020 09:03:54 +0000 Subject: [PATCH] Allow joining node to trigger term bump (#53338) In rare circumstances it is possible for an isolated node to have a greater term than the currently-elected leader. Today such a node will attempt to join the cluster but will not offer a vote to the leader and will reject its cluster state publications due to their stale term. This situation persists since there is no mechanism for the joining node to inform the leader that its term is stale and a new election is required. This commit adds the current term of the joining node to the join request. Once the join has been validated, the leader will perform another election to increase its term far enough to allow the isolated node to join properly. Fixes #53271 --- .../cluster/coordination/Coordinator.java | 4 +- .../coordination/DiscoveryUpgradeService.java | 2 +- .../cluster/coordination/JoinHelper.java | 13 ++--- .../cluster/coordination/JoinRequest.java | 46 ++++++++++++++-- .../cluster/coordination/JoinHelperTests.java | 10 ++-- .../cluster/coordination/MessagesTests.java | 11 ++-- .../cluster/coordination/NodeJoinTests.java | 52 +++++++++++++------ .../cluster/coordination/ZenDiscoveryIT.java | 2 +- 8 files changed, 104 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 1282ee1e97df2..0b7da5ee3e5d1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -519,6 +519,8 @@ public void onFailure(Exception e) { private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) { final Optional optionalJoin = joinRequest.getOptionalJoin(); synchronized (mutex) { + updateMaxTermSeen(joinRequest.getTerm()); + final CoordinationState coordState = coordinationState.get(); final boolean prevElectionWon = coordState.electionWon(); @@ -1148,7 +1150,7 @@ private class CoordinatorPeerFinder extends PeerFinder { protected void onActiveMasterFound(DiscoveryNode masterNode, long term) { synchronized (mutex) { ensureTermAtLeast(masterNode, term); - joinHelper.sendJoinRequest(masterNode, joinWithDestination(lastJoin, masterNode, term)); + joinHelper.sendJoinRequest(masterNode, getCurrentTerm(), joinWithDestination(lastJoin, masterNode, term)); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java index 3bf5fa225a934..a549d0dde81b5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java @@ -263,7 +263,7 @@ public void onResponse(Void value) { if (isRunning()) { final MasterCandidate electedMaster = electMasterService.electMaster(masterCandidates); logger.debug("elected {}, sending join", electedMaster); - joinHelper.sendJoinRequest(electedMaster.getNode(), Optional.empty(), + joinHelper.sendJoinRequest(electedMaster.getNode(), 0L, Optional.empty(), JoiningRound.this::scheduleNextAttempt); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 095cfbaa8ee46..4dc629ad8c444 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -130,14 +130,15 @@ public ClusterTasksResult execute(ClusterState currentSta transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, MembershipAction.JoinRequest::new, - (request, channel, task) -> joinHandler.accept(new JoinRequest(request.getNode(), Optional.empty()), // treat as non-voting join + (request, channel, task) -> + joinHandler.accept(new JoinRequest(request.getNode(), 0L, Optional.empty()), // treat as non-voting join transportJoinCallback(request, channel))); transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false, StartJoinRequest::new, (request, channel, task) -> { final DiscoveryNode destination = request.getSourceNode(); - sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request))); + sendJoinRequest(destination, currentTermSupplier.getAsLong(), Optional.of(joinLeaderInTerm.apply(request))); channel.sendResponse(Empty.INSTANCE); }); @@ -212,8 +213,8 @@ boolean isJoinPending() { return pendingOutgoingJoins.isEmpty() == false; } - public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { - sendJoinRequest(destination, optionalJoin, () -> { + public void sendJoinRequest(DiscoveryNode destination, long term, Optional optionalJoin) { + sendJoinRequest(destination, term, optionalJoin, () -> { }); } @@ -265,9 +266,9 @@ void logLastFailedJoinAttempt() { } } - public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin, Runnable onCompletion) { + public void sendJoinRequest(DiscoveryNode destination, long term, Optional optionalJoin, Runnable onCompletion) { assert destination.isMasterNode() : "trying to join master-ineligible " + destination; - final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin); + final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin); final Tuple dedupKey = Tuple.tuple(destination, joinRequest); if (pendingOutgoingJoins.add(dedupKey)) { logger.debug("attempting to join {} with {}", destination, joinRequest); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java index 091a6809c84dc..2772599844d18 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java @@ -18,29 +18,53 @@ */ package org.elasticsearch.cluster.coordination; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Objects; import java.util.Optional; public class JoinRequest extends TransportRequest { + /** + * The sending (i.e. joining) node. + */ private final DiscoveryNode sourceNode; + /** + * The minimum term for which the joining node will accept any cluster state publications. If the joining node is in a strictly greater + * term than the master it wants to join then the master must enter a new term and hold another election. Doesn't necessarily match + * {@link JoinRequest#optionalJoin} and may be zero in join requests sent prior to {@link Version#V_7_7_0}. + */ + private final long minimumTerm; + + /** + * A vote for the receiving node. This vote is optional since the sending node may have voted for a different master in this term. + * That's ok, the sender likely discovered that the master we voted for lost the election and now we're trying to join the winner. Once + * the sender has successfully joined the master, the lack of a vote in its term causes another election (see + * {@link Publication#onMissingJoin(DiscoveryNode)}). + */ private final Optional optionalJoin; - public JoinRequest(DiscoveryNode sourceNode, Optional optionalJoin) { + public JoinRequest(DiscoveryNode sourceNode, long minimumTerm, Optional optionalJoin) { assert optionalJoin.isPresent() == false || optionalJoin.get().getSourceNode().equals(sourceNode); this.sourceNode = sourceNode; + this.minimumTerm = minimumTerm; this.optionalJoin = optionalJoin; } public JoinRequest(StreamInput in) throws IOException { super(in); sourceNode = new DiscoveryNode(in); + if (in.getVersion().onOrAfter(Version.V_7_7_0)) { + minimumTerm = in.readLong(); + } else { + minimumTerm = 0L; + } optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new)); } @@ -48,6 +72,9 @@ public JoinRequest(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); sourceNode.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + out.writeLong(minimumTerm); + } out.writeOptionalWriteable(optionalJoin.orElse(null)); } @@ -55,6 +82,17 @@ public DiscoveryNode getSourceNode() { return sourceNode; } + public long getMinimumTerm() { + return minimumTerm; + } + + public long getTerm() { + // If the join is also present then its term will normally equal the corresponding term, but we do not require callers to + // obtain the term and the join in a synchronized fashion so it's possible that they disagree. Also older nodes do not share the + // minimum term, so for BWC we can take it from the join if present. + return Math.max(minimumTerm, optionalJoin.map(Join::getTerm).orElse(0L)); + } + public Optional getOptionalJoin() { return optionalJoin; } @@ -66,21 +104,21 @@ public boolean equals(Object o) { JoinRequest that = (JoinRequest) o; + if (minimumTerm != that.minimumTerm) return false; if (!sourceNode.equals(that.sourceNode)) return false; return optionalJoin.equals(that.optionalJoin); } @Override public int hashCode() { - int result = sourceNode.hashCode(); - result = 31 * result + optionalJoin.hashCode(); - return result; + return Objects.hash(sourceNode, minimumTerm, optionalJoin); } @Override public String toString() { return "JoinRequest{" + "sourceNode=" + sourceNode + + ", minimumTerm=" + minimumTerm + ", optionalJoin=" + optionalJoin + '}'; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index a276ad83fe0d3..66da0fcda243a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -69,7 +69,7 @@ public void testJoinDeduplication() { // check that sending a join to node1 works Optional optionalJoin1 = randomBoolean() ? Optional.empty() : Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); - joinHelper.sendJoinRequest(node1, optionalJoin1); + joinHelper.sendJoinRequest(node1, 0L, optionalJoin1); CapturedRequest[] capturedRequests1 = capturingTransport.getCapturedRequestsAndClear(); assertThat(capturedRequests1.length, equalTo(1)); CapturedRequest capturedRequest1 = capturedRequests1[0]; @@ -80,14 +80,14 @@ public void testJoinDeduplication() { // check that sending a join to node2 works Optional optionalJoin2 = randomBoolean() ? Optional.empty() : Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); - joinHelper.sendJoinRequest(node2, optionalJoin2); + joinHelper.sendJoinRequest(node2, 0L, optionalJoin2); CapturedRequest[] capturedRequests2 = capturingTransport.getCapturedRequestsAndClear(); assertThat(capturedRequests2.length, equalTo(1)); CapturedRequest capturedRequest2 = capturedRequests2[0]; assertEquals(node2, capturedRequest2.node); // check that sending another join to node1 is a noop as the previous join is still in progress - joinHelper.sendJoinRequest(node1, optionalJoin1); + joinHelper.sendJoinRequest(node1, 0L, optionalJoin1); assertThat(capturingTransport.getCapturedRequestsAndClear().length, equalTo(0)); // complete the previous join to node1 @@ -98,7 +98,7 @@ public void testJoinDeduplication() { } // check that sending another join to node1 now works again - joinHelper.sendJoinRequest(node1, optionalJoin1); + joinHelper.sendJoinRequest(node1, 0L, optionalJoin1); CapturedRequest[] capturedRequests1a = capturingTransport.getCapturedRequestsAndClear(); assertThat(capturedRequests1a.length, equalTo(1)); CapturedRequest capturedRequest1a = capturedRequests1a[0]; @@ -107,7 +107,7 @@ public void testJoinDeduplication() { // check that sending another join to node2 works if the optionalJoin is different Optional optionalJoin2a = optionalJoin2.isPresent() && randomBoolean() ? Optional.empty() : Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); - joinHelper.sendJoinRequest(node2, optionalJoin2a); + joinHelper.sendJoinRequest(node2, 0L, optionalJoin2a); CapturedRequest[] capturedRequests2a = capturingTransport.getCapturedRequestsAndClear(); assertThat(capturedRequests2a.length, equalTo(1)); CapturedRequest capturedRequest2a = capturedRequests2a[0]; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java index cb89c7c0fea33..fb99649892ae2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -179,13 +179,18 @@ public void testJoinRequestEqualsHashCodeSerialization() { Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); JoinRequest initialJoinRequest = new JoinRequest(initialJoin.getSourceNode(), - randomBoolean() ? Optional.empty() : Optional.of(initialJoin)); + randomNonNegativeLong(), randomBoolean() ? Optional.empty() : Optional.of(initialJoin)); // Note: the explicit cast of the CopyFunction is needed for some IDE (specifically Eclipse 4.8.0) to infer the right type EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialJoinRequest, (CopyFunction) joinRequest -> copyWriteable(joinRequest, writableRegistry(), JoinRequest::new), joinRequest -> { if (randomBoolean() && joinRequest.getOptionalJoin().isPresent() == false) { - return new JoinRequest(createNode(randomAlphaOfLength(20)), joinRequest.getOptionalJoin()); + return new JoinRequest(createNode(randomAlphaOfLength(10)), + joinRequest.getMinimumTerm(), joinRequest.getOptionalJoin()); + } else if (randomBoolean()) { + return new JoinRequest(joinRequest.getSourceNode(), + randomValueOtherThan(joinRequest.getMinimumTerm(), ESTestCase::randomNonNegativeLong), + joinRequest.getOptionalJoin()); } else { // change OptionalJoin final Optional newOptionalJoin; @@ -195,7 +200,7 @@ public void testJoinRequestEqualsHashCodeSerialization() { newOptionalJoin = Optional.of(new Join(joinRequest.getSourceNode(), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); } - return new JoinRequest(joinRequest.getSourceNode(), newOptionalJoin); + return new JoinRequest(joinRequest.getSourceNode(), joinRequest.getMinimumTerm(), newOptionalJoin); } }); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 1e1bf2d058798..1ea352fe4e3a1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -74,6 +74,7 @@ import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class NodeJoinTests extends ESTestCase { @@ -278,7 +279,8 @@ public void testJoinWithHigherTermElectsLeader() { assertFalse(isLocalNodeElectedMaster()); assertNull(coordinator.getStateForMasterService().nodes().getMasterNodeId()); long newTerm = initialTerm + randomLongBetween(1, 10); - SimpleFuture fut = joinNodeAsync(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); + SimpleFuture fut = joinNodeAsync(new JoinRequest(node1, newTerm, + Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); assertEquals(Coordinator.Mode.LEADER, coordinator.getMode()); assertNull(coordinator.getStateForMasterService().nodes().getMasterNodeId()); deterministicTaskQueue.runAllRunnableTasks(); @@ -298,7 +300,8 @@ public void testJoinWithHigherTermButBetterStateGetsRejected() { long newTerm = initialTerm + randomLongBetween(1, 10); long higherVersion = initialVersion + randomLongBetween(1, 10); expectThrows(CoordinationStateRejectedException.class, - () -> joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion))))); + () -> joinNodeAndRun(new JoinRequest(node1, newTerm, + Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion))))); assertFalse(isLocalNodeElectedMaster()); } @@ -312,7 +315,7 @@ public void testJoinWithHigherTermButBetterStateStillElectsMasterThroughSelfJoin assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); long higherVersion = initialVersion + randomLongBetween(1, 10); - joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion)))); + joinNodeAndRun(new JoinRequest(node1, newTerm, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion)))); assertTrue(isLocalNodeElectedMaster()); } @@ -325,14 +328,32 @@ public void testJoinElectedLeader() { new VotingConfiguration(Collections.singleton(node0.getId())))); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); - joinNodeAndRun(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); + joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); assertTrue(isLocalNodeElectedMaster()); assertFalse(clusterStateHasNode(node1)); - joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); + joinNodeAndRun(new JoinRequest(node1, newTerm, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); assertTrue(isLocalNodeElectedMaster()); assertTrue(clusterStateHasNode(node1)); } + public void testJoinElectedLeaderWithHigherTerm() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, + new VotingConfiguration(Collections.singleton(node0.getId())))); + long newTerm = initialTerm + randomLongBetween(1, 10); + + joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); + assertTrue(isLocalNodeElectedMaster()); + + long newerTerm = newTerm + randomLongBetween(1, 10); + joinNodeAndRun(new JoinRequest(node1, newerTerm, Optional.empty())); + assertThat(coordinator.getCurrentTerm(), greaterThanOrEqualTo(newerTerm)); + assertTrue(isLocalNodeElectedMaster()); + } + public void testJoinAccumulation() { DiscoveryNode node0 = newNode(0, true); DiscoveryNode node1 = newNode(1, true); @@ -343,17 +364,17 @@ public void testJoinAccumulation() { new VotingConfiguration(Collections.singleton(node2.getId())))); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); - SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, Optional.of( + SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, newTerm, Optional.of( new Join(node0, node0, newTerm, initialTerm, initialVersion)))); deterministicTaskQueue.runAllRunnableTasks(); assertFalse(futNode0.isDone()); assertFalse(isLocalNodeElectedMaster()); - SimpleFuture futNode1 = joinNodeAsync(new JoinRequest(node1, Optional.of( + SimpleFuture futNode1 = joinNodeAsync(new JoinRequest(node1, newTerm, Optional.of( new Join(node1, node0, newTerm, initialTerm, initialVersion)))); deterministicTaskQueue.runAllRunnableTasks(); assertFalse(futNode1.isDone()); assertFalse(isLocalNodeElectedMaster()); - joinNodeAndRun(new JoinRequest(node2, Optional.of(new Join(node2, node0, newTerm, initialTerm, initialVersion)))); + joinNodeAndRun(new JoinRequest(node2, newTerm, Optional.of(new Join(node2, node0, newTerm, initialTerm, initialVersion)))); assertTrue(isLocalNodeElectedMaster()); assertTrue(clusterStateHasNode(node1)); assertTrue(clusterStateHasNode(node2)); @@ -372,7 +393,7 @@ public void testJoinFollowerWithHigherTerm() throws Exception { handleStartJoinFrom(node1, newTerm); handleFollowerCheckFrom(node1, newTerm); long newerTerm = newTerm + randomLongBetween(1, 10); - joinNodeAndRun(new JoinRequest(node1, + joinNodeAndRun(new JoinRequest(node1, newerTerm, Optional.of(new Join(node1, node0, newerTerm, initialTerm, initialVersion)))); assertTrue(isLocalNodeElectedMaster()); } @@ -447,7 +468,7 @@ public void testJoinFollowerFails() throws Exception { handleStartJoinFrom(node1, newTerm); handleFollowerCheckFrom(node1, newTerm); assertThat(expectThrows(CoordinationStateRejectedException.class, - () -> joinNodeAndRun(new JoinRequest(node1, Optional.empty()))).getMessage(), + () -> joinNodeAndRun(new JoinRequest(node1, newTerm, Optional.empty()))).getMessage(), containsString("join target is a follower")); assertFalse(isLocalNodeElectedMaster()); } @@ -460,7 +481,8 @@ public void testBecomeFollowerFailsPendingJoin() throws Exception { setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node1.getId())))); long newTerm = initialTerm + randomLongBetween(1, 10); - SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); + SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, newTerm, + Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); deterministicTaskQueue.runAllRunnableTasks(); assertFalse(fut.isDone()); assertFalse(isLocalNodeElectedMaster()); @@ -501,7 +523,7 @@ public void testConcurrentJoining() { logger.info("Successful voting nodes: {}", successfulNodes); List correctJoinRequests = successfulNodes.stream().map( - node -> new JoinRequest(node, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion)))) + node -> new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion)))) .collect(Collectors.toList()); List possiblyUnsuccessfulNodes = new ArrayList<>(allNodes); @@ -512,15 +534,15 @@ public void testConcurrentJoining() { List possiblyFailingJoinRequests = possiblyUnsuccessfulNodes.stream().map(node -> { if (randomBoolean()) { // a correct request - return new JoinRequest(node, Optional.of(new Join(node, localNode, + return new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion))); } else if (randomBoolean()) { // term too low - return new JoinRequest(node, Optional.of(new Join(node, localNode, + return new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode, randomLongBetween(0, initialTerm), initialTerm, initialVersion))); } else { // better state - return new JoinRequest(node, Optional.of(new Join(node, localNode, + return new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion + randomLongBetween(1, 10)))); } }).collect(Collectors.toList()); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java index 1889b5292a160..4dde5241f8532 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java @@ -113,7 +113,7 @@ public void testHandleNodeJoin_incompatibleClusterState() final CompletableFuture future = new CompletableFuture<>(); DiscoveryNode node = state.nodes().getLocalNode(); - coordinator.sendValidateJoinRequest(stateWithCustomMetaData, new JoinRequest(node, Optional.empty()), + coordinator.sendValidateJoinRequest(stateWithCustomMetaData, new JoinRequest(node, 0L, Optional.empty()), new JoinHelper.JoinCallback() { @Override public void onSuccess() {