Skip to content

Commit

Permalink
DISCOVERY: Cleanup AbstractDisruptionTestCase (elastic#34808)
Browse files Browse the repository at this point in the history
* DISCOVERY: Cleanup AbstractDisruptionTestCase

* Make the internal test cluster manage minimum master nodes where we used the default of (nodes / 2 + 1) before
* Remove use of the `NodeConfigurationSource` indirection
* Relates elastic#33675
  • Loading branch information
original-brownbear committed Nov 16, 2018
1 parent 41e0da6 commit 069c3a1
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,19 @@

package org.elasticsearch.discovery;

import java.nio.file.Path;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
Expand All @@ -56,27 +52,20 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {

static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.

private NodeConfigurationSource discoveryConfig;

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(discoveryConfig.nodeSettings(nodeOrdinal))
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(DEFAULT_SETTINGS)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}

@Before
public void clearConfig() {
discoveryConfig = null;
}

@Override
protected int numberOfShards() {
return 3;
Expand Down Expand Up @@ -118,11 +107,6 @@ protected void beforeIndexDeletion() throws Exception {
}

List<String> startCluster(int numberOfNodes) {
return startCluster(numberOfNodes, -1);
}

List<String> startCluster(int numberOfNodes, int minimumMasterNode) {
configureCluster(numberOfNodes, minimumMasterNode);
InternalTestCluster internalCluster = internalCluster();
List<String> nodes = internalCluster.startNodes(numberOfNodes);
ensureStableCluster(numberOfNodes);
Expand Down Expand Up @@ -151,38 +135,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

void configureCluster(int numberOfNodes, int minimumMasterNode) {
configureCluster(DEFAULT_SETTINGS, numberOfNodes, minimumMasterNode);
}

void configureCluster(Settings settings, int numberOfNodes, int minimumMasterNode) {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
logger.info("---> configured unicast");
// TODO: Rarely use default settings form some of these
Settings nodeSettings = Settings.builder()
.put(settings)
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file")
.build();

if (discoveryConfig == null) {
discoveryConfig = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(final int nodeOrdinal) {
return nodeSettings;
}

@Override
public Path nodeConfigPath(final int nodeOrdinal) {
return null;
}
};
}
}

ClusterState getNodeClusterState(String node) {
return client(node).admin().cluster().prepareState().setLocal(true).get().getState();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class ClusterDisruptionCleanSettingsIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

/**
* This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
* node but already deleted on the source node. Search request should still work.
*/
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
// Don't use AbstractDisruptionTestCase.DEFAULT_SETTINGS as settings
// (which can cause node disconnects on a slow CI machine)
internalCluster().startMasterOnlyNode();
final String node_1 = internalCluster().startDataOnlyNode();

logger.info("--> creating index [test] with one shard and on replica");
assertAcked(prepareCreate("test").setSettings(
Settings.builder().put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
);
ensureGreen("test");

final String node_2 = internalCluster().startDataOnlyNode();
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("_doc")
.setSource("{\"int_field\":1}", XContentType.JSON));
}
indexRandom(true, indexRequestBuilderList);

IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2);
// now search for the documents and see if we get a reply
assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits(), equalTo(100L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@
/**
* Tests for discovery during disruptions.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {

public void testIsolatedUnicastNodes() throws Exception {
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4, -1);
List<String> nodes = startCluster(4);
// Figure out what is the elected master node
final String unicastTarget = nodes.get(0);

Expand Down Expand Up @@ -100,7 +100,7 @@ public void testIsolatedUnicastNodes() throws Exception {
*/
public void testUnicastSinglePingResponseContainsMaster() throws Exception {
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4, -1);
List<String> nodes = startCluster(4);
// Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node={}", masterNode);
Expand Down Expand Up @@ -138,15 +138,8 @@ public void testUnicastSinglePingResponseContainsMaster() throws Exception {
* Test cluster join with issues in cluster state publishing *
*/
public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
List<String> nodes = startCluster(2, 1);

String masterNode = internalCluster().getMasterName();
String nonMasterNode;
if (masterNode.equals(nodes.get(0))) {
nonMasterNode = nodes.get(1);
} else {
nonMasterNode = nodes.get(0);
}
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
String nonMasterNode = internalCluster().startDataOnlyNode(Settings.EMPTY);

DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();

Expand Down Expand Up @@ -196,7 +189,6 @@ public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
}

public void testClusterFormingWithASlowNode() throws Exception {
configureCluster(3, 2);

SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);

Expand All @@ -212,7 +204,6 @@ public void testClusterFormingWithASlowNode() throws Exception {
}

public void testElectMasterWithLatestVersion() throws Exception {
configureCluster(3, 2);
final Set<String> nodes = new HashSet<>(internalCluster().startNodes(3));
ensureStableCluster(3);
ServiceDisruptionScheme isolateAllNodes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
/**
* Tests relating to the loss of the master.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class MasterDisruptionIT extends AbstractDisruptionTestCase {

/**
Expand Down Expand Up @@ -153,8 +153,8 @@ public void testNodesFDAfterMasterReelection() throws Exception {
*/
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.test.disruption:TRACE")
public void testStaleMasterNotHijackingMajority() throws Exception {
// 3 node cluster with unicast discovery and minimum_master_nodes set to 2:
final List<String> nodes = startCluster(3, 2);
// 3 node cluster with unicast discovery and minimum_master_nodes set to the default of 2:
final List<String> nodes = startCluster(3);

// Save the current master node as old master node, because that node will get frozen
final String oldMasterNode = internalCluster().getMasterName();
Expand Down Expand Up @@ -267,7 +267,7 @@ public void onFailure(String source, Exception e) {
* Test that cluster recovers from a long GC on master that causes other nodes to elect a new one
*/
public void testMasterNodeGCs() throws Exception {
List<String> nodes = startCluster(3, -1);
List<String> nodes = startCluster(3);

String oldMasterNode = internalCluster().getMasterName();
// a very long GC, but it's OK as we remove the disruption when it has had an effect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.elasticsearch.discovery;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
Expand All @@ -28,10 +31,12 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.junit.annotations.TestLogging;

Expand All @@ -40,26 +45,35 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.test.transport.MockTransportService;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.instanceOf;

/**
* Tests snapshot operations during disruptions.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("org.elasticsearch.snapshot:TRACE")
public class SnapshotDisruptionIT extends AbstractDisruptionTestCase {
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class SnapshotDisruptionIT extends ESIntegTestCase {

public void testDisruptionOnSnapshotInitialization() throws Exception {
final Settings settings = Settings.builder()
.put(DEFAULT_SETTINGS)
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(AbstractDisruptionTestCase.DEFAULT_SETTINGS)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s")
.build();
}

public void testDisruptionOnSnapshotInitialization() throws Exception {
final String idxName = "test";
configureCluster(settings, 4, 2);
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(4);
Expand Down Expand Up @@ -159,7 +173,7 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

private void createRandomIndex(String idxName) throws ExecutionException, InterruptedException {
private void createRandomIndex(String idxName) throws InterruptedException, ExecutionException {
assertAcked(prepareCreate(idxName, 0, Settings.builder().put("number_of_shards", between(1, 20))
.put("number_of_replicas", 0)));
logger.info("--> indexing some data");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void setUp() throws Exception {
super.setUp();
if (cluster2 == null) {
SecuritySettingsSource cluster2SettingsSource =
new SecuritySettingsSource(defaultMaxNumberOfNodes(), useSSL, createTempDir(), Scope.SUITE) {
new SecuritySettingsSource(useSSL, createTempDir(), Scope.SUITE) {
@Override
public Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder()
Expand Down Expand Up @@ -239,7 +239,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

private void setupTribeNode(Settings settings) throws Exception {
SecuritySettingsSource cluster2SettingsSource =
new SecuritySettingsSource(1, useSSL, createTempDir(), Scope.TEST) {
new SecuritySettingsSource(useSSL, createTempDir(), Scope.TEST) {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
Expand Down

0 comments on commit 069c3a1

Please sign in to comment.