diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index c77d3f01e5f38..398dd908945a9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -325,7 +325,8 @@ public RoutingNodes getRoutingNodes() { public String toString() { StringBuilder sb = new StringBuilder(); final String TAB = " "; - sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n"); + sb.append("cluster uuid: ").append(metaData.clusterUUID()) + .append(" [committed: ").append(metaData.clusterUUIDCommitted()).append("]").append("\n"); sb.append("version: ").append(version).append("\n"); sb.append("state uuid: ").append(stateUUID).append("\n"); sb.append("from_diff: ").append(wasReadFromDiff).append("\n"); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index af9a38bec49ed..bf6ed67f874b3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -729,7 +729,6 @@ public void invariant() { assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID); - assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted(); assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector + " vs " + getPreVoteResponse(); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java 
b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index 85a9c44899128..fc682f1906b48 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -86,7 +86,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste private final Runnable recoveryRunnable; - private final AtomicBoolean recovered = new AtomicBoolean(); + private final AtomicBoolean recoveryInProgress = new AtomicBoolean(); private final AtomicBoolean scheduledRecovery = new AtomicBoolean(); @Inject @@ -214,7 +214,7 @@ public void onFailure(Exception e) { @Override protected void doRun() { - if (recovered.compareAndSet(false, true)) { + if (recoveryInProgress.compareAndSet(false, true)) { logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime); recoveryRunnable.run(); } @@ -222,7 +222,7 @@ protected void doRun() { }, recoverAfterTime, ThreadPool.Names.GENERIC); } } else { - if (recovered.compareAndSet(false, true)) { + if (recoveryInProgress.compareAndSet(false, true)) { threadPool.generic().execute(new AbstractRunnable() { @Override public void onFailure(final Exception e) { @@ -240,7 +240,7 @@ protected void doRun() { } private void resetRecoveredFlags() { - recovered.set(false); + recoveryInProgress.set(false); scheduledRecovery.set(false); } @@ -259,6 +259,9 @@ public ClusterState execute(final ClusterState currentState) { @Override public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size()); + // reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a + // not-recovered state, we perform state recovery again. 
+ resetRecoveredFlags(); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 62fe362298631..0094a8d722b8b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode.Role; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -70,6 +71,8 @@ import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver; import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.ClusterStateUpdaters; +import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.gateway.MockGatewayMetaState; import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; @@ -131,6 +134,7 @@ import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION; import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING; +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.containsString; @@ -191,6 +195,45 @@ public void testRepeatableTests() throws 
Exception { assertEquals(result1, result2); } + /** + * This test was added to verify that state recovery is properly reset on a node after it has become master and successfully + * recovered a state (see {@link GatewayService}). The situation which triggers this with a decent likelihood is as follows: + * 3 master-eligible nodes (leader, follower1, follower2): the followers are shut down (leader remains); when followers come back + * one of them becomes leader and publishes first state (with STATE_NOT_RECOVERED_BLOCK) to old leader, which accepts it. + * Old leader is initiating an election at the same time, and wins election. It becomes leader again but, as it previously + * successfully completed state recovery, it is never reset to a state where state recovery can be retried. + */ + public void testStateRecoveryResetAfterPreviousLeadership() { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower1 = cluster.getAnyNodeExcept(leader); + final ClusterNode follower2 = cluster.getAnyNodeExcept(leader, follower1); + + // restart follower1 and follower2 + for (ClusterNode clusterNode : Arrays.asList(follower1, follower2)) { + clusterNode.close(); + cluster.clusterNodes.forEach( + cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode( + new Runnable() { + @Override + public void run() { + cn.transportService.disconnectFromNode(clusterNode.getLocalNode()); + } + + @Override + public String toString() { + return "disconnect from " + clusterNode.getLocalNode() + " after shutdown"; + } + }))); + cluster.clusterNodes.replaceAll(cn -> cn == clusterNode ? 
cn.restartedNode() : cn); + } + + cluster.stabilise(); + } + public void testCanUpdateClusterStateAfterStabilisation() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); @@ -1525,6 +1568,10 @@ void stabilise(long stabilisationDurationMillis) { assertTrue(leaderId + " has been bootstrapped", leader.coordinator.isInitialConfigurationSet()); assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId)); + assertThat(leaderId + " has no NO_MASTER_BLOCK", + leader.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false)); + assertThat(leaderId + " has no STATE_NOT_RECOVERED_BLOCK", + leader.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false)); assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion); for (final ClusterNode clusterNode : clusterNodes) { @@ -1556,6 +1603,8 @@ void stabilise(long stabilisationDurationMillis) { equalTo(leader.getLocalNode())); assertThat(nodeId + " has no NO_MASTER_BLOCK", clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false)); + assertThat(nodeId + " has no STATE_NOT_RECOVERED_BLOCK", + clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false)); } else { assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE)); assertThat(nodeId + " has no master", clusterNode.getLastAppliedClusterState().nodes().getMasterNode(), nullValue()); @@ -1725,7 +1774,8 @@ class MockPersistedState implements PersistedState { } else { nodeEnvironment = null; delegate = new InMemoryPersistedState(0L, - clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); + ClusterStateUpdaters.addStateNotRecoveredBlock( + clusterState(0L, 
0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L))); } } catch (IOException e) { throw new UncheckedIOException("Unable to create MockPersistedState", e); @@ -1765,8 +1815,9 @@ class MockPersistedState implements PersistedState { clusterState.writeTo(outStream); StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(), new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); + // adapt cluster state to new localNode instance and add blocks delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()), - ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance + ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode))); } } catch (IOException e) { throw new UncheckedIOException("Unable to create MockPersistedState", e); @@ -1870,15 +1921,19 @@ protected Optional getDisruptableMockTransport(Transpo transportService)); final Collection> onJoinValidators = Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs))); + final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY); coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(), - ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState, + allocationService, masterService, this::getPersistedState, Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get()); masterService.setClusterStatePublisher(coordinator); + final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService, + deterministicTaskQueue.getThreadPool(this::onNode), null, null, coordinator); logger.trace("starting up [{}]", localNode); transportService.start(); transportService.acceptIncomingRequests(); coordinator.start(); + 
gatewayService.start(); clusterService.start(); coordinator.startInitialJoin(); }