From 90f41c44570e46a257bd13f09e747ad3a8aabb46 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 24 May 2019 20:44:23 +0200 Subject: [PATCH 1/3] Reset state recovery after successful recovery --- .../elasticsearch/cluster/ClusterState.java | 3 +- .../cluster/coordination/Coordinator.java | 1 - .../elasticsearch/gateway/GatewayService.java | 3 + .../coordination/CoordinatorTests.java | 60 ++++++++++++++++++- 4 files changed, 62 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index 6a5e2a324965f..6cde7d5b3bb10 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -297,7 +297,8 @@ public RoutingNodes getRoutingNodes() { public String toString() { StringBuilder sb = new StringBuilder(); final String TAB = " "; - sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n"); + sb.append("cluster uuid: ").append(metaData.clusterUUID()) + .append(" [committed: ").append(metaData.clusterUUIDCommitted()).append("]").append("\n"); sb.append("version: ").append(version).append("\n"); sb.append("state uuid: ").append(stateUUID).append("\n"); sb.append("from_diff: ").append(wasReadFromDiff).append("\n"); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 1e7b38e50d1e9..4b703cd177f72 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -699,7 +699,6 @@ public void invariant() { assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); 
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID); - assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted(); assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector + " vs " + getPreVoteResponse(); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index b7b7d0759980e..989bfafdf5bb8 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -256,6 +256,9 @@ public ClusterState execute(final ClusterState currentState) { @Override public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size()); + // reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a + // not-recovered state, we perform state recovery again. 
+ resetRecoveredFlags(); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 5daa863402b2a..beb201df1a13b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode.Role; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -69,6 +70,8 @@ import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.ClusterStateUpdaters; +import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.gateway.MockGatewayMetaState; import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; @@ -130,6 +133,7 @@ import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION; import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING; +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.containsString; @@ -190,6 +194,45 @@ public void testRepeatableTests() throws Exception { 
assertEquals(result1, result2); } + /** + * This test was added to verify that state recovery is properly reset on a node afte it has become master and successfully + * recovered a state (see {@link GatewayService}). The situation which triggers this with a decent likelihood is as follows: + * 3 master-eligible nodes (leader, follower1, follower2), the followers are shut down (leader remains), when followers come back + * one of them becomes leader and publishes first state (with STATE_NOT_RECOVERED_BLOCK) to old leader, which accepts it. + * Old leader is initiating an election at the same time, and wins election. It becomes leader again, but as it previously + * successfully completed state recovery, is never reset to a state where state recovery can be retried. + */ + public void testStateRecoveryResetAfterPreviousLeadership() { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower1 = cluster.getAnyNodeExcept(leader); + final ClusterNode follower2 = cluster.getAnyNodeExcept(leader, follower1); + + // restart follower1 and follower2 + for (ClusterNode clusterNode : Arrays.asList(follower1, follower2)) { + clusterNode.close(); + cluster.clusterNodes.forEach( + cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode( + new Runnable() { + @Override + public void run() { + cn.transportService.disconnectFromNode(clusterNode.getLocalNode()); + } + + @Override + public String toString() { + return "disconnect from " + clusterNode.getLocalNode() + " after shutdown"; + } + }))); + cluster.clusterNodes.replaceAll(cn -> cn == clusterNode ? 
cn.restartedNode() : cn); + } + + cluster.stabilise(); + } + public void testCanUpdateClusterStateAfterStabilisation() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); @@ -1524,6 +1567,10 @@ void stabilise(long stabilisationDurationMillis) { assertTrue(leaderId + " has been bootstrapped", leader.coordinator.isInitialConfigurationSet()); assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId)); + assertThat(leaderId + " has no NO_MASTER_BLOCK", + leader.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false)); + assertThat(leaderId + " has no STATE_NOT_RECOVERED_BLOCK", + leader.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false)); assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); for (final ClusterNode clusterNode : clusterNodes) { @@ -1555,6 +1602,8 @@ void stabilise(long stabilisationDurationMillis) { equalTo(leader.getLocalNode())); assertThat(nodeId + " has no NO_MASTER_BLOCK", clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false)); + assertThat(nodeId + " has no STATE_NOT_RECOVERED_BLOCK", + clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false)); } else { assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE)); assertThat(nodeId + " has no master", clusterNode.getLastAppliedClusterState().nodes().getMasterNode(), nullValue()); @@ -1724,7 +1773,8 @@ class MockPersistedState implements PersistedState { } else { nodeEnvironment = null; delegate = new InMemoryPersistedState(0L, - clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); + ClusterStateUpdaters.addStateNotRecoveredBlock( + clusterState(0L, 
0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L))); } } catch (IOException e) { throw new UncheckedIOException("Unable to create MockPersistedState", e); @@ -1765,7 +1815,7 @@ class MockPersistedState implements PersistedState { StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(), new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()), - ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance + ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode))); // adapts it to new localNode instance } } catch (IOException e) { throw new UncheckedIOException("Unable to create MockPersistedState", e); @@ -1869,15 +1919,19 @@ protected Optional getDisruptableMockTransport(Transpo transportService)); final Collection> onJoinValidators = Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs))); + final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY); coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(), - ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState, + allocationService, masterService, this::getPersistedState, Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get()); masterService.setClusterStatePublisher(coordinator); + final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService, + deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator); logger.trace("starting up [{}]", localNode); transportService.start(); transportService.acceptIncomingRequests(); coordinator.start(); + gatewayService.start(); clusterService.start(); 
coordinator.startInitialJoin(); } From 8312bfc30cc60bbe86c859c0c3f0d4d9c9aabd7b Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 27 May 2019 10:52:32 +0200 Subject: [PATCH 2/3] Fix checkstyle --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index beb201df1a13b..1d248a9742e29 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -1814,8 +1814,9 @@ class MockPersistedState implements PersistedState { clusterState.writeTo(outStream); StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(), new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); + // adapt cluster state to new localNode instance and add blocks delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()), - ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode))); // adapts it to new localNode instance + ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode))); } } catch (IOException e) { throw new UncheckedIOException("Unable to create MockPersistedState", e); From 669d321e5e78216b2a45b5bfada1976994af3986 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 28 May 2019 10:05:34 +0200 Subject: [PATCH 3/3] Armin's review comments --- .../java/org/elasticsearch/gateway/GatewayService.java | 8 ++++---- .../cluster/coordination/CoordinatorTests.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index 989bfafdf5bb8..3e9c25847f6a7 
100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -85,7 +85,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste private final Runnable recoveryRunnable; - private final AtomicBoolean recovered = new AtomicBoolean(); + private final AtomicBoolean recoveryInProgress = new AtomicBoolean(); private final AtomicBoolean scheduledRecovery = new AtomicBoolean(); @Inject @@ -211,7 +211,7 @@ public void onFailure(Exception e) { @Override protected void doRun() { - if (recovered.compareAndSet(false, true)) { + if (recoveryInProgress.compareAndSet(false, true)) { logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime); recoveryRunnable.run(); } @@ -219,7 +219,7 @@ protected void doRun() { }, recoverAfterTime, ThreadPool.Names.GENERIC); } } else { - if (recovered.compareAndSet(false, true)) { + if (recoveryInProgress.compareAndSet(false, true)) { threadPool.generic().execute(new AbstractRunnable() { @Override public void onFailure(final Exception e) { @@ -237,7 +237,7 @@ protected void doRun() { } private void resetRecoveredFlags() { - recovered.set(false); + recoveryInProgress.set(false); scheduledRecovery.set(false); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 1d248a9742e29..4f1937efc9c74 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -195,7 +195,7 @@ public void testRepeatableTests() throws Exception { } /** - * This test was added to verify that state recovery is properly reset on a node afte it has become master and successfully + * This test was added to verify that state recovery is properly reset on a node after it 
has become master and successfully * recovered a state (see {@link GatewayService}). The situation which triggers this with a decent likelihood is as follows: * 3 master-eligible nodes (leader, follower1, follower2), the followers are shut down (leader remains), when followers come back * one of them becomes leader and publishes first state (with STATE_NOT_RECOVERED_BLOCK) to old leader, which accepts it.