Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset state recovery after successful recovery #42576

Merged
merged 3 commits into from
May 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ public RoutingNodes getRoutingNodes() {
public String toString() {
StringBuilder sb = new StringBuilder();
final String TAB = " ";
sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n");
sb.append("cluster uuid: ").append(metaData.clusterUUID())
.append(" [committed: ").append(metaData.clusterUUIDCommitted()).append("]").append("\n");
sb.append("version: ").append(version).append("\n");
sb.append("state uuid: ").append(stateUUID).append("\n");
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,6 @@ public void invariant() {
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
: preVoteCollector + " vs " + getPreVoteResponse();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste

private final Runnable recoveryRunnable;

private final AtomicBoolean recovered = new AtomicBoolean();
private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
private final AtomicBoolean scheduledRecovery = new AtomicBoolean();

@Inject
Expand Down Expand Up @@ -211,15 +211,15 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
if (recovered.compareAndSet(false, true)) {
if (recoveryInProgress.compareAndSet(false, true)) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
recoveryRunnable.run();
}
}
}, recoverAfterTime, ThreadPool.Names.GENERIC);
}
} else {
if (recovered.compareAndSet(false, true)) {
if (recoveryInProgress.compareAndSet(false, true)) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
Expand All @@ -237,7 +237,7 @@ protected void doRun() {
}

// Clears both recovery-related flags so that a later cluster-state change can schedule and run state recovery again.
private void resetRecoveredFlags() {
// NOTE(review): the next two lines are pre/post-rename residue of the SAME field in this diff view
// (`recovered` was renamed to `recoveryInProgress` by this PR); only one of them exists in the merged source.
recovered.set(false);
recoveryInProgress.set(false);
// also clear the "a delayed recovery has been scheduled" flag so recover_after_time can be re-armed
scheduledRecovery.set(false);
}

Expand All @@ -256,6 +256,9 @@ public ClusterState execute(final ClusterState currentState) {
@Override
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size());
// reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
// not-recovered state, we perform state recovery again.
resetRecoveredFlags();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to rename the recovered field to recoveryAllowed (or something along those lines) since it's not really a one time thing to run recovery now?

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
Expand All @@ -69,6 +70,8 @@
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.ClusterStateUpdaters;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.gateway.MockGatewayMetaState;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
Expand Down Expand Up @@ -130,6 +133,7 @@
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES;
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -190,6 +194,45 @@ public void testRepeatableTests() throws Exception {
assertEquals(result1, result2);
}

/**
* This test was added to verify that state recovery is properly reset on a node after it has become master and successfully
* recovered a state (see {@link GatewayService}). The situation which triggers this with a decent likelihood is as follows:
* 3 master-eligible nodes (leader, follower1, follower2); the followers are shut down while the leader remains. When the
* followers come back, one of them becomes leader and publishes its first state (with STATE_NOT_RECOVERED_BLOCK) to the old
* leader, which accepts it. The old leader initiates an election at the same time and wins it. It becomes leader again, but
* because it previously completed state recovery successfully, its recovery flag is never reset and recovery cannot be retried.
*/
// Verifies that a node which was leader and completed state recovery can recover state AGAIN after losing and
// re-winning leadership; see the javadoc above this method for the full race scenario this reproduces.
public void testStateRecoveryResetAfterPreviousLeadership() {
final Cluster cluster = new Cluster(3);
cluster.runRandomly();
// after stabilisation a leader has been elected and (per the assertions in stabilise()) state recovery has completed
cluster.stabilise();

final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower1 = cluster.getAnyNodeExcept(leader);
final ClusterNode follower2 = cluster.getAnyNodeExcept(leader, follower1);

// restart follower1 and follower2
for (ClusterNode clusterNode : Arrays.asList(follower1, follower2)) {
clusterNode.close();
// schedule an explicit disconnect on every remaining node, since close() alone does not tear down the mock transport links
cluster.clusterNodes.forEach(
cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode(
new Runnable() {
@Override
public void run() {
cn.transportService.disconnectFromNode(clusterNode.getLocalNode());
}

// toString() labels the scheduled task in the deterministic task queue's logs
@Override
public String toString() {
return "disconnect from " + clusterNode.getLocalNode() + " after shutdown";
}
})));
// swap the closed node for a freshly restarted instance in place, keeping the cluster at 3 nodes
cluster.clusterNodes.replaceAll(cn -> cn == clusterNode ? cn.restartedNode() : cn);
}

// stabilise() asserts (among other things) that no STATE_NOT_RECOVERED_BLOCK remains, i.e. recovery ran again
cluster.stabilise();
}

public void testCanUpdateClusterStateAfterStabilisation() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly();
Expand Down Expand Up @@ -1524,6 +1567,10 @@ void stabilise(long stabilisationDurationMillis) {

assertTrue(leaderId + " has been bootstrapped", leader.coordinator.isInitialConfigurationSet());
assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId));
assertThat(leaderId + " has no NO_MASTER_BLOCK",
leader.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
assertThat(leaderId + " has no STATE_NOT_RECOVERED_BLOCK",
leader.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion);

for (final ClusterNode clusterNode : clusterNodes) {
Expand Down Expand Up @@ -1555,6 +1602,8 @@ void stabilise(long stabilisationDurationMillis) {
equalTo(leader.getLocalNode()));
assertThat(nodeId + " has no NO_MASTER_BLOCK",
clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
assertThat(nodeId + " has no STATE_NOT_RECOVERED_BLOCK",
clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
} else {
assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE));
assertThat(nodeId + " has no master", clusterNode.getLastAppliedClusterState().nodes().getMasterNode(), nullValue());
Expand Down Expand Up @@ -1724,7 +1773,8 @@ class MockPersistedState implements PersistedState {
} else {
nodeEnvironment = null;
delegate = new InMemoryPersistedState(0L,
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
ClusterStateUpdaters.addStateNotRecoveredBlock(
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)));
}
} catch (IOException e) {
throw new UncheckedIOException("Unable to create MockPersistedState", e);
Expand Down Expand Up @@ -1764,8 +1814,9 @@ class MockPersistedState implements PersistedState {
clusterState.writeTo(outStream);
StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
// adapt cluster state to new localNode instance and add blocks
delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()),
ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance
ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode)));
}
} catch (IOException e) {
throw new UncheckedIOException("Unable to create MockPersistedState", e);
Expand Down Expand Up @@ -1869,15 +1920,19 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
transportService));
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
allocationService, masterService, this::getPersistedState,
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get());
masterService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);

logger.trace("starting up [{}]", localNode);
transportService.start();
transportService.acceptIncomingRequests();
coordinator.start();
gatewayService.start();
clusterService.start();
coordinator.startInitialJoin();
}
Expand Down