Skip to content

Commit

Permalink
Fix testDataOnlyNodePersistence (#56893)
Browse files Browse the repository at this point in the history
This test failed if all 1000 top-level `rarely()` calls in the loop returned
`false`, because then we would never set the term of the persisted state. This
commit fixes this by adding an earlier call to `persistedState#setCurrentTerm`
and also randomises the number of iterations to catch other potential issues
like this. It also changes the test to clean up the thread pools it starts
whether it passes or fails.
  • Loading branch information
DaveCTurner authored May 18, 2020
1 parent 76e45d9 commit 703dad7
Showing 1 changed file with 92 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOError;
import java.io.IOException;
import java.nio.file.Path;
Expand Down Expand Up @@ -328,95 +329,103 @@ public void testStatePersistedOnLoad() throws IOException {
}

/**
 * Exercises the persisted state of a gateway started for a node that only has the data role:
 * it checks that the persisted state is an AsyncLucenePersistedState, writes a last-accepted
 * state, verifies what is loaded back from disk, marks the state as committed, applies a long
 * series of term bumps and cluster state updates, and finally closes the gateway and reloads
 * the persisted state to compare term and cluster state.
 *
 * NOTE(review): this span is a diff rendering whose +/- markers and indentation were lost, so
 * the pre-change and post-change versions of the body are interleaved. The "removed" / "added"
 * labels in the comments below are reconstructed from the structure of the two versions; the
 * reconstruction matches the "92 additions and 83 deletions" summary of the commit. Lines not
 * labelled either way (a few blank lines and closing braces) are unchanged context.
 */
public void testDataOnlyNodePersistence() throws Exception {
// ---- removed (pre-change): setup performed directly in the method body, outside any try ----
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
Settings settings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()).put(
Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_NAME_SETTING.getKey(), "test").build();
final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
final TransportService transportService = mock(TransportService.class);
TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode");
when(transportService.getThreadPool()).thenReturn(threadPool);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
final PersistedClusterStateService persistedClusterStateService =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
gateway.start(settings, transportService, clusterService,
new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService);
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class));
// ---- added (post-change): resources are collected in `cleanup` and closed in the finally block ----
final List<Closeable> cleanup = new ArrayList<>(2);

try {
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
Settings settings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()).put(
Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_NAME_SETTING.getKey(), "test").build();
final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
// registered so the gateway is closed even if an assertion below fails
cleanup.add(gateway);
final TransportService transportService = mock(TransportService.class);
TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode");
// registered so the thread pool is terminated by the finally block whether the test passes or fails
cleanup.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
when(transportService.getThreadPool()).thenReturn(threadPool);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
final PersistedClusterStateService persistedClusterStateService =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
gateway.start(settings, transportService, clusterService,
new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService);
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class));

//generate random coordinationMetadata with different lastAcceptedConfiguration and lastCommittedConfiguration
CoordinationMetadata coordinationMetadata;
do {
coordinationMetadata = createCoordinationMetadata(randomNonNegativeLong());
} while (coordinationMetadata.getLastAcceptedConfiguration().equals(coordinationMetadata.getLastCommittedConfiguration()));

// ---- removed (pre-change): same do/while block at the old indentation level ----
//generate random coordinationMetadata with different lastAcceptedConfiguration and lastCommittedConfiguration
CoordinationMetadata coordinationMetadata;
do {
coordinationMetadata = createCoordinationMetadata(randomNonNegativeLong());
} while (coordinationMetadata.getLastAcceptedConfiguration().equals(coordinationMetadata.getLastCommittedConfiguration()));
// ---- added (post-change): initial state write, now preceded by an explicit setCurrentTerm ----
ClusterState state = createClusterState(randomNonNegativeLong(),
Metadata.builder().coordinationMetadata(coordinationMetadata)
.clusterUUID(randomAlphaOfLength(10)).build());
// the fix described in the commit message: set the term up front, so it is set even if the
// update loop further down never takes its "bump term" branch
persistedState.setCurrentTerm(state.term());
persistedState.setLastAcceptedState(state);
// wait until the gateway reports that all pending async states have been written
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

// in memory the accepted and committed configurations still differ ...
assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
// ... while the state loaded from disk is expected to carry staleStateConfiguration for both
CoordinationMetadata persistedCoordinationMetadata =
persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));

persistedState.markLastAcceptedStateAsCommitted();
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

// ---- removed (pre-change): initial state write without setCurrentTerm, the follow-up ----
// ---- assertions, and the old copy of the update loop (through the old loop body)      ----
ClusterState state = createClusterState(randomNonNegativeLong(),
Metadata.builder().coordinationMetadata(coordinationMetadata)
.clusterUUID(randomAlphaOfLength(10)).build());
persistedState.setLastAcceptedState(state);
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
CoordinationMetadata persistedCoordinationMetadata =
persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));

persistedState.markLastAcceptedStateAsCommitted();
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

CoordinationMetadata expectedCoordinationMetadata = CoordinationMetadata.builder(coordinationMetadata)
.lastCommittedConfiguration(coordinationMetadata.getLastAcceptedConfiguration()).build();
ClusterState expectedClusterState =
ClusterState.builder(state).metadata(Metadata.builder().coordinationMetadata(expectedCoordinationMetadata)
.clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();

assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertTrue(persistedClusterStateService.loadBestOnDiskState().metadata.clusterUUIDCommitted());

// generate a series of updates and check if batching works
final String indexName = randomAlphaOfLength(10);
long currentTerm = state.term();
for (int i = 0; i < 1000; i++) {
if (rarely()) {
// bump term
currentTerm = currentTerm + (rarely() ? randomIntBetween(1, 5) : 0L);
persistedState.setCurrentTerm(currentTerm);
} else {
// update cluster state
final int numberOfShards = randomIntBetween(1, 5);
final long term = Math.min(state.term() + (rarely() ? randomIntBetween(1, 5) : 0L), currentTerm);
final IndexMetadata indexMetadata = createIndexMetadata(indexName, numberOfShards, i);
state = createClusterState(state.version() + 1,
Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build());
persistedState.setLastAcceptedState(state);
// ---- added (post-change): expected state after the commit marker, follow-up assertions, ----
// ---- and the new copy of the update loop                                               ----
// expected in-memory state: committed configuration replaced by the accepted one and
// clusterUUIDCommitted set to true
CoordinationMetadata expectedCoordinationMetadata = CoordinationMetadata.builder(coordinationMetadata)
.lastCommittedConfiguration(coordinationMetadata.getLastAcceptedConfiguration()).build();
ClusterState expectedClusterState =
ClusterState.builder(state).metadata(Metadata.builder().coordinationMetadata(expectedCoordinationMetadata)
.clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();

assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertTrue(persistedClusterStateService.loadBestOnDiskState().metadata.clusterUUIDCommitted());

// generate a series of updates and check if batching works
final String indexName = randomAlphaOfLength(10);
long currentTerm = state.term();
// NOTE(review): the commit message mentions randomising the number of iterations, but both
// renderings of this loop visible here use a fixed bound of 1000 - confirm against the repository
for (int i = 0; i < 1000; i++) {
if (rarely()) {
// bump term
currentTerm = currentTerm + (rarely() ? randomIntBetween(1, 5) : 0L);
persistedState.setCurrentTerm(currentTerm);
} else {
// update cluster state
final int numberOfShards = randomIntBetween(1, 5);
// the term of the new state is capped at currentTerm
final long term = Math.min(state.term() + (rarely() ? randomIntBetween(1, 5) : 0L), currentTerm);
final IndexMetadata indexMetadata = createIndexMetadata(indexName, numberOfShards, i);
state = createClusterState(state.version() + 1,
Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build());
persistedState.setLastAcceptedState(state);
}
}
}
// ---- removed (pre-change): final in-memory checks at the old indentation level ----
assertEquals(currentTerm, persistedState.getCurrentTerm());
assertClusterStateEqual(state, persistedState.getLastAcceptedState());
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));
// ---- added (post-change): the same three checks inside the try block ----
assertEquals(currentTerm, persistedState.getCurrentTerm());
assertClusterStateEqual(state, persistedState.getLastAcceptedState());
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

// the first close() is the removed line, the second is its re-indented replacement
gateway.close();
gateway.close();
// added: the gateway was closed explicitly just above, so drop it from the cleanup list;
// the assertTrue also checks that it was still registered there
assertTrue(cleanup.remove(gateway));

// ---- removed (pre-change): reload of the persisted state and comparison with the last written one ----
try (CoordinationState.PersistedState reloadedPersistedState = newGatewayPersistedState()) {
assertEquals(currentTerm, reloadedPersistedState.getCurrentTerm());
assertClusterStateEqual(GatewayMetaState.AsyncLucenePersistedState.resetVotingConfiguration(state),
reloadedPersistedState.getLastAcceptedState());
assertNotNull(reloadedPersistedState.getLastAcceptedState().metadata().index(indexName));
// ---- added (post-change): the same reload check, followed by the new finally block ----
try (CoordinationState.PersistedState reloadedPersistedState = newGatewayPersistedState()) {
assertEquals(currentTerm, reloadedPersistedState.getCurrentTerm());
assertClusterStateEqual(GatewayMetaState.AsyncLucenePersistedState.resetVotingConfiguration(state),
reloadedPersistedState.getLastAcceptedState());
assertNotNull(reloadedPersistedState.getLastAcceptedState().metadata().index(indexName));
}
} finally {
// closes whatever is still registered in `cleanup` (the thread pool terminator, plus the
// gateway if the test failed before closing it explicitly)
IOUtils.close(cleanup);
}

// removed (pre-change): thread pool termination that only ran when every assertion above had passed
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}

public void testStatePersistenceWithIOIssues() throws IOException {
Expand Down

0 comments on commit 703dad7

Please sign in to comment.