Discovery: Move AbstractDisruptionTestCase to file-based discovery (#34461)
* Discovery: Move AbstractDisruptionTestCase to file-based discovery.
* Relates #33675
* Simplify away ClusterDiscoveryConfiguration
original-brownbear committed Nov 16, 2018
1 parent 3fb2095 commit fc2c5cf
Showing 11 changed files with 63 additions and 286 deletions.
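
For context on what "file-based discovery" means here: instead of precomputing fixed unicast host lists per node (as ClusterDiscoveryConfiguration did), the test cluster now selects the file unicast hosts provider and maintains the unicast hosts file itself. The snippet below is an illustrative sketch only, not part of this commit; it assumes the 6.x Settings API and shows the kind of node setting the changed test base class builds in the diff further down.

import org.elasticsearch.common.settings.Settings;

// Illustrative sketch: enable the file-based unicast hosts provider.
// "discovery.zen.hosts_provider" is the key behind
// DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING used in the diff below.
public class FileBasedDiscoveryExample {
    public static void main(String[] args) {
        Settings nodeSettings = Settings.builder()
                .putList("discovery.zen.hosts_provider", "file")
                .build();
        System.out.println(nodeSettings.getAsList("discovery.zen.hosts_provider"));
    }
}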
@@ -19,6 +19,7 @@

package org.elasticsearch.discovery;

import java.nio.file.Path;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -33,7 +34,8 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
@@ -52,17 +54,17 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {

static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.

private ClusterDiscoveryConfiguration discoveryConfig;
private NodeConfigurationSource discoveryConfig;

@Override
protected Settings nodeSettings(int nodeOrdinal) {
@@ -115,18 +117,14 @@ protected void beforeIndexDeletion() throws Exception {
}
}

List<String> startCluster(int numberOfNodes) throws ExecutionException, InterruptedException {
List<String> startCluster(int numberOfNodes) {
return startCluster(numberOfNodes, -1);
}

List<String> startCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException {
return startCluster(numberOfNodes, minimumMasterNode, null);
}

List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws
ExecutionException, InterruptedException {
configureCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
List<String> nodes = internalCluster().startNodes(numberOfNodes);
List<String> startCluster(int numberOfNodes, int minimumMasterNode) {
configureCluster(numberOfNodes, minimumMasterNode);
InternalTestCluster internalCluster = internalCluster();
List<String> nodes = internalCluster.startNodes(numberOfNodes);
ensureStableCluster(numberOfNodes);

// TODO: this is a temporary solution so that nodes do not base their reaction to a partition on previous successful results
@@ -153,20 +151,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

void configureCluster(
int numberOfNodes,
@Nullable int[] unicastHostsOrdinals,
int minimumMasterNode
) throws ExecutionException, InterruptedException {
configureCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
void configureCluster(int numberOfNodes, int minimumMasterNode) {
configureCluster(DEFAULT_SETTINGS, numberOfNodes, minimumMasterNode);
}

void configureCluster(
Settings settings,
int numberOfNodes,
@Nullable int[] unicastHostsOrdinals,
int minimumMasterNode
) throws ExecutionException, InterruptedException {
void configureCluster(Settings settings, int numberOfNodes, int minimumMasterNode) {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
@@ -176,14 +165,21 @@ void configureCluster(
.put(settings)
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file")
.build();

if (discoveryConfig == null) {
if (unicastHostsOrdinals == null) {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings);
} else {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals);
}
discoveryConfig = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(final int nodeOrdinal) {
return nodeSettings;
}

@Override
public Path nodeConfigPath(final int nodeOrdinal) {
return null;
}
};
}
}
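
As a usage illustration (hypothetical test, not part of the commit): a disruption test built on this base class no longer passes unicast host ordinals or handles checked exceptions from startCluster; it simply starts the nodes and relies on the file-based hosts provider. The class and test names below are invented, only members visible in the diff are used, and the class sits in org.elasticsearch.discovery because startCluster is package-private; the ClusterScope annotation mirrors the one ClusterDisruptionIT uses after this change.

package org.elasticsearch.discovery;

import java.util.List;
import org.elasticsearch.test.ESIntegTestCase;

// Hypothetical example, not part of this commit.
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class ExampleDisruptionIT extends AbstractDisruptionTestCase {

    public void testClusterFormsWithFileBasedDiscovery() {
        // minimumMasterNode defaults to numberOfNodes / 2 + 1 when startCluster(int) is used
        List<String> nodes = startCluster(3);
        String masterNode = internalCluster().getMasterName();
        assertTrue(nodes.contains(masterNode));
    }
}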

@@ -25,7 +25,6 @@
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
@@ -37,9 +36,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
@@ -70,8 +67,8 @@
/**
* Tests various cluster operations (e.g., indexing) during disruptions.
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {

/**
@@ -287,7 +284,7 @@ public void testRejoinDocumentExistsInAllShardCopies() throws Exception {

// simulate handling of sending shard failure during an isolation
public void testSendingShardFailure() throws Exception {
List<String> nodes = startCluster(3, 2);
List<String> nodes = startCluster(3);
String masterNode = internalCluster().getMasterName();
List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
String nonMasterNode = randomFrom(nonMasterNodes);
@@ -355,43 +352,10 @@ public void onFailure(Exception e) {
}
}

/**
* This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
* node but already deleted on the source node. Search request should still work.
*/
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
// don't use DEFAULT settings (which can cause node disconnects on a slow CI machine)
configureCluster(Settings.EMPTY, 3, null, 1);
final String masterNode = internalCluster().startMasterOnlyNode();
final String node_1 = internalCluster().startDataOnlyNode();

logger.info("--> creating index [test] with one shard and on replica");
assertAcked(prepareCreate("test").setSettings(
Settings.builder().put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
);
ensureGreen("test");

final String node_2 = internalCluster().startDataOnlyNode();
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("_doc")
.setSource("{\"int_field\":1}", XContentType.JSON));
}
indexRandom(true, indexRequestBuilderList);

IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2);
// now search for the documents and see if we get a reply
assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits(), equalTo(100L));
}

public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception {
// test for https://github.com/elastic/elasticsearch/issues/8823
configureCluster(2, null, 1);
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNode(Settings.EMPTY);

ensureStableCluster(2);
assertAcked(prepareCreate("index").setSettings(Settings.builder().put("index.number_of_replicas", 0)));
index("index", "_doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
@@ -414,14 +378,12 @@ public boolean clearData(String nodeName) {
*/
public void testIndicesDeleted() throws Exception {
final Settings settings = Settings.builder()
.put(DEFAULT_SETTINGS)
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
.build();
final String idxName = "test";
configureCluster(settings, 3, null, 2);
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2);
final String dataNode = internalCluster().startDataOnlyNode();
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2, settings);
final String dataNode = internalCluster().startDataOnlyNode(settings);
ensureStableCluster(3);
assertAcked(prepareCreate("test"));

@@ -59,7 +59,8 @@
public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {

public void testIsolatedUnicastNodes() throws Exception {
List<String> nodes = startCluster(4, -1, new int[]{0});
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4, -1);
// Figure out which node is the elected master
final String unicastTarget = nodes.get(0);

@@ -98,7 +99,8 @@ public void testIsolatedUnicastNodes() throws Exception {
* The rejoining node should take this master node and connect.
*/
public void testUnicastSinglePingResponseContainsMaster() throws Exception {
List<String> nodes = startCluster(4, -1, new int[]{0});
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4, -1);
// Figure out which node is the elected master
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node={}", masterNode);
@@ -194,7 +196,7 @@ public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
}

public void testClusterFormingWithASlowNode() throws Exception {
configureCluster(3, null, 2);
configureCluster(3, 2);

SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);

@@ -210,7 +212,7 @@ public void testClusterFormingWithASlowNode() throws Exception {
}

public void testElectMasterWithLatestVersion() throws Exception {
configureCluster(3, null, 2);
configureCluster(3, 2);
final Set<String> nodes = new HashSet<>(internalCluster().startNodes(3));
ensureStableCluster(3);
ServiceDisruptionScheme isolateAllNodes =
@@ -59,7 +59,7 @@ public void testDisruptionOnSnapshotInitialization() throws Exception {
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
.build();
final String idxName = "test";
configureCluster(settings, 4, null, 2);
configureCluster(settings, 4, 2);
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(4);
@@ -237,6 +237,9 @@ public final class InternalTestCluster extends TestCluster {
private ServiceDisruptionScheme activeDisruptionScheme;
private Function<Client, Client> clientWrapper;

// If set to true, only the first node in the cluster will be listed as a unicast host
private boolean hostsListContainsOnlyFirstNode;

public InternalTestCluster(
final long clusterSeed,
final Path baseDir,
@@ -1596,12 +1599,17 @@ private synchronized void startAndPublishNodesAndClients(List<NodeAndClient> nod

private final Object discoveryFileMutex = new Object();

private void rebuildUnicastHostFiles(Collection<NodeAndClient> newNodes) {
private void rebuildUnicastHostFiles(List<NodeAndClient> newNodes) {
// cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients()
synchronized (discoveryFileMutex) {
try {
List<String> discoveryFileContents = Stream.concat(nodes.values().stream(), newNodes.stream())
.map(nac -> nac.node.injector().getInstance(TransportService.class)).filter(Objects::nonNull)
Stream<NodeAndClient> unicastHosts = Stream.concat(nodes.values().stream(), newNodes.stream());
if (hostsListContainsOnlyFirstNode) {
unicastHosts = unicastHosts.limit(1L);
}
List<String> discoveryFileContents = unicastHosts.map(
nac -> nac.node.injector().getInstance(TransportService.class)
).filter(Objects::nonNull)
.map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode)
.map(n -> n.getAddress().toString())
.distinct().collect(Collectors.toList());
@@ -2031,6 +2039,9 @@ public synchronized int numMasterNodes() {
return filterNodes(nodes, NodeAndClient::isMasterEligible).size();
}

public void setHostsListContainsOnlyFirstNode(boolean hostsListContainsOnlyFirstNode) {
this.hostsListContainsOnlyFirstNode = hostsListContainsOnlyFirstNode;
}

public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
assert activeDisruptionScheme == null :