Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch more tests to zen2 #36367

Merged
merged 8 commits into from
Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(AbstractDisruptionTestCase.DEFAULT_SETTINGS)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // requires more work
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s")
.build();
}
Expand Down Expand Up @@ -133,7 +132,7 @@ public void clusterChanged(ClusterChangedEvent event) {

logger.info("--> wait until the snapshot is done");
assertBusy(() -> {
SnapshotsInProgress snapshots = dataNodeClient().admin().cluster().prepareState().setLocal(true).get().getState()
SnapshotsInProgress snapshots = dataNodeClient().admin().cluster().prepareState().setLocal(false).get().getState()
.custom(SnapshotsInProgress.TYPE);
if (snapshots != null && snapshots.entries().size() > 0) {
logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state());
Expand All @@ -146,15 +145,9 @@ public void clusterChanged(ClusterChangedEvent event) {
logger.info("--> verify that snapshot was successful or no longer exist");
assertBusy(() -> {
try {
GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots("test-repo")
.setSnapshots("test-snap-2").get();
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
assertEquals(0, snapshotInfo.failedShards());
logger.info("--> done verifying");
assertSnapshotExists("test-repo", "test-snap-2");
} catch (SnapshotMissingException exception) {
logger.info("--> snapshot doesn't exist");
logger.info("--> done verifying, snapshot doesn't exist");
}
}, 1, TimeUnit.MINUTES);

Expand All @@ -172,6 +165,21 @@ public void clusterChanged(ClusterChangedEvent event) {
cause = cause.getCause();
assertThat(cause, instanceOf(FailedToCommitClusterStateException.class));
}

logger.info("--> verify that snapshot eventually will be created due to retries");
assertBusy(() -> {
assertSnapshotExists("test-repo", "test-snap-2");
}, 1, TimeUnit.MINUTES);
}

private void assertSnapshotExists(String repository, String snapshot) {
GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots(repository)
.setSnapshots(snapshot).get();
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
assertEquals(0, snapshotInfo.failedShards());
logger.info("--> done verifying, snapshot exists");
}

private void createRandomIndex(String idxName) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,14 @@ public void testTwoNodesSingleDoc() throws Exception {
}

public void testDanglingIndices() throws Exception {
/*TODO This test does not work with Zen2, because once the master node loses its cluster state during restart
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the analysis on this one. It still has Zen2 disabled though? Can you just add the TODO comment, but leave the test unchanged until we have a proper fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will start with term = 1, which is the same as the term data node has. Data node won't accept cluster state from master
after the restart, because the term is the same, but version of the cluster state is greater on the data node.
Consider adding term to JoinRequest, so that master node can bump its term if its current term is less than JoinRequest#term.
*/
logger.info("--> starting two nodes");

final String node_1 = internalCluster().startNodes(2,
//TODO fails with Zen2
Settings.builder().put(TestZenDiscovery.USE_ZEN2.getKey(), false).build()).get(0);

logger.info("--> indexing a simple document");
Expand Down Expand Up @@ -333,9 +337,7 @@ public void testIndexDeletionWhenNodeRejoins() throws Exception {
final List<String> nodes;
logger.info("--> starting a cluster with " + numNodes + " nodes");
nodes = internalCluster().startNodes(numNodes,
Settings.builder().put(IndexGraveyard.SETTING_MAX_TOMBSTONES.getKey(), randomIntBetween(10, 100))
//TODO fails with Zen2
.put(TestZenDiscovery.USE_ZEN2.getKey(), false).build());
Settings.builder().put(IndexGraveyard.SETTING_MAX_TOMBSTONES.getKey(), randomIntBetween(10, 100)).build());
logger.info("--> create an index");
createIndex(indexName);

Expand All @@ -355,6 +357,7 @@ public Settings onNodeStopped(final String nodeName) throws Exception {
final Client client = client(otherNode);
client.admin().indices().prepareDelete(indexName).execute().actionGet();
assertFalse(client.admin().indices().prepareExists(indexName).execute().actionGet().isExists());
logger.info("--> index deleted");
return super.onNodeStopped(nodeName);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.engine.MockEngineSupport;
import org.elasticsearch.test.transport.MockTransportService;

Expand Down Expand Up @@ -99,16 +98,9 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // no state persistence yet
.build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class);
Expand Down Expand Up @@ -260,7 +252,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
}

public void testCorruptTranslogTruncation() throws Exception {
internalCluster().startNodes(2, Settings.EMPTY);
internalCluster().startNodes(2);

final String node1 = internalCluster().getNodeNames()[0];
final String node2 = internalCluster().getNodeNames()[1];
Expand Down Expand Up @@ -436,10 +428,10 @@ public Settings onNodeStopped(String nodeName) throws Exception {
}

public void testCorruptTranslogTruncationOfReplica() throws Exception {
internalCluster().startNodes(2, Settings.EMPTY);
internalCluster().startMasterOnlyNode();

final String node1 = internalCluster().getNodeNames()[0];
final String node2 = internalCluster().getNodeNames()[1];
final String node1 = internalCluster().startDataOnlyNode();
final String node2 = internalCluster().startDataOnlyNode();
logger.info("--> nodes name: {}, {}", node1, node2);

final String indexName = "test";
Expand Down Expand Up @@ -481,12 +473,11 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception {
final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
final Set<Path> translogDirs = getDirs(node2, shardId, ShardPath.TRANSLOG_FOLDER_NAME);

// stop the cluster nodes. we don't use full restart so the node start up order will be the same
// and shard roles will be maintained
// stop data nodes. After the restart the 1st node will be primary and the 2nd node will be replica
internalCluster().stopRandomDataNode();
internalCluster().stopRandomDataNode();

// Corrupt the translog file(s)
// Corrupt the translog file(s) on the replica
logger.info("--> corrupting translog");
TestTranslog.corruptRandomTranslogFile(logger, random(), translogDirs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1641,35 +1641,7 @@ private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws
}

private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndClients) throws IOException {
final Set<String> excludedNodeIds = new HashSet<>();

if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {

final long currentMasters = nodes.values().stream().filter(NodeAndClient::isMasterEligible).count();
final long stoppingMasters = nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();

assert stoppingMasters <= currentMasters : currentMasters + " < " + stoppingMasters;
if (stoppingMasters != currentMasters && stoppingMasters > 0) {
// If stopping few enough master-nodes that there's still a majority left, there is no need to withdraw their votes first.
// However, we do not yet have a way to be sure there's a majority left, because the voting configuration may not yet have
// been updated when the previous nodes shut down, so we must always explicitly withdraw votes.
// TODO add cluster health API to check that voting configuration is optimal so this isn't always needed
nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(excludedNodeIds::add);
assert excludedNodeIds.size() == stoppingMasters;

logger.info("adding voting config exclusions {} prior to shutdown", excludedNodeIds);
try {
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(new String[0]))).get();
} catch (InterruptedException | ExecutionException e) {
throw new AssertionError("unexpected", e);
}
}

if (stoppingMasters > 0) {
updateMinMasterNodes(getMasterNodesCount() - Math.toIntExact(stoppingMasters));
}
}
final Set<String> excludedNodeIds = excludeMasters(nodeAndClients);

for (NodeAndClient nodeAndClient: nodeAndClients) {
removeDisruptionSchemeFromNode(nodeAndClient);
Expand All @@ -1678,14 +1650,7 @@ private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndC
nodeAndClient.close();
}

if (excludedNodeIds.isEmpty() == false) {
logger.info("removing voting config exclusions for {} after shutdown", excludedNodeIds);
try {
client().execute(ClearVotingConfigExclusionsAction.INSTANCE, new ClearVotingConfigExclusionsRequest()).get();
} catch (InterruptedException | ExecutionException e) {
throw new AssertionError("unexpected", e);
}
}
removeExclusions(excludedNodeIds);
}

/**
Expand Down Expand Up @@ -1751,31 +1716,78 @@ public synchronized void rollingRestart(RestartCallback callback) throws Excepti

private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception {
logger.info("Restarting node [{}] ", nodeAndClient.name);

if (activeDisruptionScheme != null) {
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
}
final int masterNodesCount = getMasterNodesCount();
// special case to allow stopping one node in a two node cluster and keep it functional
final boolean updateMinMaster = nodeAndClient.isMasterEligible() && masterNodesCount == 2 && autoManageMinMasterNodes;
if (updateMinMaster) {
updateMinMasterNodes(masterNodesCount - 1);
}

Set<String> excludedNodeIds = excludeMasters(Collections.singleton(nodeAndClient));

final Settings newSettings = nodeAndClient.closeForRestart(callback,
autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1);
autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1);

removeExclusions(excludedNodeIds);

nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(emptyList()));
nodeAndClient.startNode();
if (activeDisruptionScheme != null) {
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
}
if (callback.validateClusterForming() || updateMinMaster) {

if (callback.validateClusterForming() || excludedNodeIds.isEmpty() == false) {
// we have to validate cluster size if updateMinMaster == true, because we need the
// second node to join in order to increment min_master_nodes back to 2.
// we also have to do via the node that was just restarted as it may be that the master didn't yet process
// the fact it left
validateClusterFormed(nodeAndClient.name);
}
if (updateMinMaster) {
updateMinMasterNodes(masterNodesCount);

if (excludedNodeIds.isEmpty() == false) {
updateMinMasterNodes(getMasterNodesCount());
}
}

private Set<String> excludeMasters(Collection<NodeAndClient> nodeAndClients) {
final Set<String> excludedNodeIds = new HashSet<>();
if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {

final long currentMasters = nodes.values().stream().filter(NodeAndClient::isMasterEligible).count();
final long stoppingMasters = nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();

assert stoppingMasters <= currentMasters : currentMasters + " < " + stoppingMasters;
if (stoppingMasters != currentMasters && stoppingMasters > 0) {
// If stopping few enough master-nodes that there's still a majority left, there is no need to withdraw their votes first.
// However, we do not yet have a way to be sure there's a majority left, because the voting configuration may not yet have
// been updated when the previous nodes shut down, so we must always explicitly withdraw votes.
// TODO add cluster health API to check that voting configuration is optimal so this isn't always needed
nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(excludedNodeIds::add);
assert excludedNodeIds.size() == stoppingMasters;

logger.info("adding voting config exclusions {} prior to restart/shutdown", excludedNodeIds);
try {
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(new String[0]))).get();
} catch (InterruptedException | ExecutionException e) {
throw new AssertionError("unexpected", e);
}
}

if (stoppingMasters > 0) {
updateMinMasterNodes(getMasterNodesCount() - Math.toIntExact(stoppingMasters));
}
}
return excludedNodeIds;
}

private void removeExclusions(Set<String> excludedNodeIds) {
if (excludedNodeIds.isEmpty() == false) {
logger.info("removing voting config exclusions for {} after restart/shutdown", excludedNodeIds);
try {
Client client = getRandomNodeAndClient(node -> excludedNodeIds.contains(node.name) == false).client(random);
client.execute(ClearVotingConfigExclusionsAction.INSTANCE, new ClearVotingConfigExclusionsRequest()).get();
} catch (InterruptedException | ExecutionException e) {
throw new AssertionError("unexpected", e);
}
}
}

Expand Down Expand Up @@ -1833,7 +1845,6 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
}
}


/**
* Returns the name of the current master node in the cluster.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ private NodeConfigurationSource createNodeConfigurationSource() {
builder.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS));
builder.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()); // empty list disables a port scan for other nodes
builder.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file");
builder.put(TestZenDiscovery.USE_ZEN2.getKey(), false); // some tests do full cluster restarts
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
builder.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
builder.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
Expand Down