Skip to content

Commit

Permalink
Dry up Master Disconnect Disruption Tests (#58953) (#59050)
Browse files Browse the repository at this point in the history
Dry up tests that use a disruption that isolates the master from all other nodes.
Also, turn disruption types that have neither parameters nor state into constants
to make things a little clearer.
  • Loading branch information
original-brownbear authored Jul 6, 2020
1 parent 56136b7 commit 49857cc
Show file tree
Hide file tree
Showing 14 changed files with 54 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,10 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

Expand Down Expand Up @@ -103,13 +98,7 @@ public void run() {
barrier.await();

// interrupt communication between master and other nodes in cluster
String master = internalCluster().getMasterName();
Set<String> otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
otherNodes.remove(master);

NetworkDisruption partition = new NetworkDisruption(
new TwoPartitions(Collections.singleton(master), otherNodes),
new NetworkDisconnect());
NetworkDisruption partition = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(partition);

logger.info("--> disrupting network");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.ArrayList;
Expand Down Expand Up @@ -296,10 +295,7 @@ public void testCannotCommitStateThreeNodes() throws Exception {

final String master = internalCluster().getMasterName();
Set<String> otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
otherNodes.remove(master);
NetworkDisruption partition = new NetworkDisruption(
new TwoPartitions(Collections.singleton(master), otherNodes),
new NetworkDisruption.NetworkDisconnect());
NetworkDisruption partition = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(partition);

final CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.IsolateAllNodes;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.Collection;
Expand Down Expand Up @@ -84,7 +83,7 @@ public void testNoMasterActions() throws Exception {
client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet();

final NetworkDisruption disruptionScheme
= new NetworkDisruption(new IsolateAllNodes(new HashSet<>(nodes)), new NetworkDisconnect());
= new NetworkDisruption(new IsolateAllNodes(new HashSet<>(nodes)), NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(disruptionScheme);
disruptionScheme.startDisrupting();

Expand Down Expand Up @@ -213,7 +212,7 @@ public void testNoMasterActionsWriteMasterBlock() throws Exception {
logger.info("Cluster state:\n{}", clusterState.getState());

final NetworkDisruption disruptionScheme
= new NetworkDisruption(new IsolateAllNodes(new HashSet<>(nodes)), new NetworkDisconnect());
= new NetworkDisruption(new IsolateAllNodes(new HashSet<>(nodes)), NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(disruptionScheme);
disruptionScheme.startDisrupting();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.transport.MockTransportService;

Expand Down Expand Up @@ -148,7 +147,7 @@ private Settings createStaleReplicaScenario(String master) throws Exception {

NetworkDisruption partition = new NetworkDisruption(
new TwoPartitions(Sets.newHashSet(master, replicaNode), Collections.singleton(primaryNode)),
new NetworkDisconnect());
NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(partition);
logger.info("--> partitioning node with primary shard from rest of cluster");
partition.startDisrupting();
Expand Down Expand Up @@ -528,7 +527,7 @@ public void testPrimaryReplicaResyncFailed() throws Exception {
}
final Set<String> replicasSide1 = Sets.newHashSet(randomSubsetOf(between(1, numberOfReplicas - 1), replicaNodes));
final Set<String> replicasSide2 = Sets.difference(replicaNodes, replicasSide1);
NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(replicasSide1, replicasSide2), new NetworkDisconnect());
NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(replicasSide1, replicasSide2), NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(partition);
logger.info("--> isolating some replicas during primary-replica resync");
partition.startDisrupting();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
Expand Down Expand Up @@ -345,8 +343,7 @@ public void testSendingShardFailure() throws Exception {
TwoPartitions partitions = isolateNode(isolatedNode);
// we cannot use the NetworkUnresponsive disruption type here as it will swallow the "shard failed" request, calling neither
// onSuccess nor onFailure on the provided listener.
NetworkLinkDisruptionType disruptionType = new NetworkDisconnect();
NetworkDisruption networkDisruption = new NetworkDisruption(partitions, disruptionType);
NetworkDisruption networkDisruption = new NetworkDisruption(partitions, NetworkDisruption.DISCONNECT);
setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();

Expand Down Expand Up @@ -443,7 +440,7 @@ public void testIndicesDeleted() throws Exception {

final String masterNode1 = internalCluster().getMasterName();
NetworkDisruption networkDisruption =
new NetworkDisruption(new TwoPartitions(masterNode1, dataNode), new NetworkDisruption.NetworkUnresponsive());
new NetworkDisruption(new TwoPartitions(masterNode1, dataNode), NetworkDisruption.UNRESPONSIVE);
internalCluster().setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();
// We know this will time out due to the partition, we check manually below to not proceed until
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -129,7 +127,7 @@ public void testElectMasterWithLatestVersion() throws Exception {
final Set<String> nodes = new HashSet<>(internalCluster().startNodes(3));
ensureStableCluster(3);
ServiceDisruptionScheme isolateAllNodes =
new NetworkDisruption(new NetworkDisruption.IsolateAllNodes(nodes), new NetworkDisconnect());
new NetworkDisruption(new NetworkDisruption.IsolateAllNodes(nodes), NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(isolateAllNodes);

logger.info("--> forcing a complete election to make sure \"preferred\" master is elected");
Expand All @@ -153,11 +151,7 @@ public void testElectMasterWithLatestVersion() throws Exception {
logger.info("--> preferred master is {}", preferredMaster);
final Set<String> nonPreferredNodes = new HashSet<>(nodes);
nonPreferredNodes.remove(preferredMasterName);
final ServiceDisruptionScheme isolatePreferredMaster =
new NetworkDisruption(
new NetworkDisruption.TwoPartitions(
Collections.singleton(preferredMasterName), nonPreferredNodes),
new NetworkDisconnect());
final ServiceDisruptionScheme isolatePreferredMaster = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(isolatePreferredMaster);
isolatePreferredMaster.startDisrupting();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
Expand Down Expand Up @@ -110,7 +109,7 @@ public void testDisruptionOnSnapshotInitialization() throws Exception {

NetworkDisruption networkDisruption =
new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes),
new NetworkDisruption.NetworkUnresponsive());
NetworkDisruption.UNRESPONSIVE);
internalCluster().setDisruptionScheme(networkDisruption);

ClusterService clusterService = internalCluster().clusterService(masterNode1);
Expand Down Expand Up @@ -160,8 +159,8 @@ public void clusterChanged(ClusterChangedEvent event) {

public void testDisruptionAfterFinalization() throws Exception {
final String idxName = "test";
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
internalCluster().startMasterOnlyNodes(3);
internalCluster().startDataOnlyNode();
ensureStableCluster(4);

createRandomIndex(idxName);
Expand All @@ -172,13 +171,8 @@ public void testDisruptionAfterFinalization() throws Exception {
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));

final String masterNode1 = internalCluster().getMasterName();
Set<String> otherNodes = new HashSet<>(allMasterEligibleNodes);
otherNodes.remove(masterNode1);
otherNodes.add(dataNode);

NetworkDisruption networkDisruption =
new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes),
new NetworkDisruption.NetworkUnresponsive());
NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.UNRESPONSIVE);
internalCluster().setDisruptionScheme(networkDisruption);

ClusterService clusterService = internalCluster().clusterService(masterNode1);
Expand Down Expand Up @@ -248,7 +242,7 @@ public void clusterChanged(ClusterChangedEvent event) {
public void testDisruptionAfterShardFinalization() throws Exception {
final String idxName = "test";
internalCluster().startMasterOnlyNodes(1);
final String dataNode = internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
ensureStableCluster(2);
createIndex(idxName);
index(idxName, "type", JsonXContent.contentBuilder().startObject().field("foo", "bar").endObject());
Expand All @@ -267,9 +261,7 @@ public void testDisruptionAfterShardFinalization() throws Exception {

waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueSeconds(10L));

NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode), Collections.singleton(dataNode)),
new NetworkDisruption.NetworkDisconnect());
NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();

Expand Down Expand Up @@ -326,12 +318,7 @@ public void testMasterFailOverDuringShardSnapshots() throws Exception {

waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));

final String masterNode = internalCluster().getMasterName();
final NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode),
Arrays.stream(internalCluster().getNodeNames()).filter(name -> masterNode.equals(name) == false)
.collect(Collectors.toSet())),
new NetworkDisruption.NetworkDisconnect());
final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();
ensureStableCluster(3, dataNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.LongGCDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -91,9 +89,8 @@ public void testFailWithMinimumMasterNodesConfigured() throws Exception {

// Simulate a network issue between the unlucky node and elected master node in both directions.

NetworkDisruption networkDisconnect = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(masterNode, unluckyNode),
new NetworkDisruption.NetworkDisconnect());
NetworkDisruption networkDisconnect =
new NetworkDisruption(new NetworkDisruption.TwoPartitions(masterNode, unluckyNode), NetworkDisruption.DISCONNECT);
setDisruptionScheme(networkDisconnect);
networkDisconnect.startDisrupting();

Expand Down Expand Up @@ -123,14 +120,14 @@ private void ensureNoMaster(String node) throws Exception {
* Verify that nodes fault detection detects a disconnected node after master reelection
*/
public void testFollowerCheckerDetectsDisconnectedNodeAfterMasterReelection() throws Exception {
testFollowerCheckerAfterMasterReelection(new NetworkDisconnect(), Settings.EMPTY);
testFollowerCheckerAfterMasterReelection(NetworkDisruption.DISCONNECT, Settings.EMPTY);
}

/**
* Verify that nodes fault detection detects an unresponsive node after master reelection
*/
public void testFollowerCheckerDetectsUnresponsiveNodeAfterMasterReelection() throws Exception {
testFollowerCheckerAfterMasterReelection(new NetworkUnresponsive(), Settings.builder()
testFollowerCheckerAfterMasterReelection(NetworkDisruption.UNRESPONSIVE, Settings.builder()
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), "4")
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testRetryPostingSnapshotStatusMessages() throws Exception {
String masterNode = internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();

createRepository("test-repo", "mock", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
Expand All @@ -77,8 +77,7 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception {
.get().getSnapshots().get(0).snapshotId();

logger.info("--> start disrupting cluster");
final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(masterNode, dataNode),
NetworkDisruption.NetworkDelay.random(random()));
final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.NetworkDelay.random(random()));
internalCluster().setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
import org.elasticsearch.test.disruption.NetworkDisruption.DisruptedLinks;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
Expand Down Expand Up @@ -96,7 +95,7 @@ public void setUp() throws Exception {
@Override
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
if (scheme instanceof NetworkDisruption &&
((NetworkDisruption) scheme).getNetworkLinkDisruptionType() instanceof NetworkDisruption.NetworkUnresponsive) {
((NetworkDisruption) scheme).getNetworkLinkDisruptionType() == NetworkDisruption.UNRESPONSIVE) {
// the network unresponsive disruption may leave operations in flight
// this is because this disruption scheme swallows requests by design
// as such, these operations will never be marked as finished
Expand Down Expand Up @@ -202,10 +201,10 @@ public ServiceDisruptionScheme addRandomDisruptionScheme() {
final NetworkLinkDisruptionType disruptionType;
switch (randomInt(2)) {
case 0:
disruptionType = new NetworkDisruption.NetworkUnresponsive();
disruptionType = NetworkDisruption.UNRESPONSIVE;
break;
case 1:
disruptionType = new NetworkDisconnect();
disruptionType = NetworkDisruption.DISCONNECT;
break;
case 2:
disruptionType = NetworkDisruption.NetworkDelay.random(random());
Expand All @@ -226,9 +225,9 @@ public ServiceDisruptionScheme addRandomDisruptionScheme() {
NetworkDisruption addRandomDisruptionType(TwoPartitions partitions) {
final NetworkLinkDisruptionType disruptionType;
if (randomBoolean()) {
disruptionType = new NetworkDisruption.NetworkUnresponsive();
disruptionType = NetworkDisruption.UNRESPONSIVE;
} else {
disruptionType = new NetworkDisconnect();
disruptionType = NetworkDisruption.DISCONNECT;
}
NetworkDisruption partition = new NetworkDisruption(partitions, disruptionType);

Expand Down
Loading

0 comments on commit 49857cc

Please sign in to comment.