Skip to content

Commit

Permalink
move assertBusy to use CheckException (#25246)
Browse files Browse the repository at this point in the history
We use assertBusy in many places where the underlying code throw exceptions. Currently we need to wrap those exceptions in a RuntimeException which is ugly.
  • Loading branch information
bleskes committed Jun 15, 2017
1 parent b839a14 commit b03c8a4
Show file tree
Hide file tree
Showing 27 changed files with 263 additions and 416 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,10 @@ public void testBulkIndexCreatesMapping() throws Exception {
BulkRequestBuilder bulkBuilder = client().prepareBulk();
bulkBuilder.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON);
bulkBuilder.get();
assertBusy(new Runnable() {
@Override
public void run() {
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30"));
assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"));
}
assertBusy(() -> {
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30"));
assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,7 @@ public void testRetryOfAnAlreadyTimedOutRequest() throws Exception {
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));

// wait until the timeout was triggered and we actually tried to send for the second time
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(transport.capturedRequests().length, equalTo(1));
}
});
assertBusy(() -> assertThat(transport.capturedRequests().length, equalTo(1)));

// let it fail the second time too
requestId = transport.capturedRequests()[0].requestId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ElectMasterService;
Expand All @@ -37,7 +36,6 @@
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDelay;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -161,12 +159,9 @@ public void testSimpleMinimumMasterNodes() throws Exception {
}

internalCluster().stopRandomNonMasterNode();
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
}
assertBusy(() -> {
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state1.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
});

logger.info("--> starting the previous master node again...");
Expand Down Expand Up @@ -408,12 +403,7 @@ public void onFailure(String source, Exception e) {
latch.await();

assertThat(failure.get(), instanceOf(Discovery.FailedToCommitClusterStateException.class));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue());
}
});
assertBusy(() -> assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue()));

partition.stopDisrupting();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,7 @@ public void testDelayedAllocationNodeLeavesAndComesBack() throws Exception {
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
}
});
assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
internalCluster().startNode(); // this will use the same data location as the stopped node
ensureGreen("test");
Expand Down Expand Up @@ -116,12 +111,7 @@ public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception {
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
}
});
assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(100))).get());
ensureGreen("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,7 @@ private void createStaleReplicaScenario() throws Exception {
logger.info("--> check that old primary shard does not get promoted to primary again");
// kick reroute and wait for all shard states to be fetched
client(master).admin().cluster().prepareReroute().get();
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0));
}
});
assertBusy(() -> assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)));
// kick reroute a second time and check that all shards are unassigned
assertThat(client(master).admin().cluster().prepareReroute().get().getState().getRoutingNodes().unassigned().size(), equalTo(2));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -50,7 +51,7 @@ public void testPrimaryFailureIncreasesTerm() throws Exception {
logger.info("--> waiting for a yellow index");
// JDK 9 type inference gets confused, so we have to help the
// type inference
assertBusy(((Runnable) () -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(),
assertBusy(((CheckedRunnable<Exception>) () -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(),
equalTo(ClusterHealthStatus.YELLOW))));

final long term0 = shard == 0 ? 2 : 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,9 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
List<String> nodes = internalCluster().startNodes(3);

// Wait for all 3 nodes to be up
assertBusy(new Runnable() {
@Override
public void run() {
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
assertThat(resp.getNodes().size(), equalTo(3));
}
assertBusy(() -> {
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
assertThat(resp.getNodes().size(), equalTo(3));
});

// Start with all nodes at 50% usage
Expand All @@ -86,13 +83,10 @@ public void run() {
ensureGreen("test");

// Block until the "fake" cluster info is retrieved at least once
assertBusy(new Runnable() {
@Override
public void run() {
ClusterInfo info = cis.getClusterInfo();
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
}
assertBusy(() -> {
ClusterInfo info = cis.getClusterInfo();
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
});

final List<String> realNodeNames = new ArrayList<>();
Expand All @@ -113,21 +107,18 @@ public void run() {
// Retrieve the count of shards on each node
final Map<String, Integer> nodesToShardCount = new HashMap<>();

assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
while (iter.hasNext()) {
RoutingNode node = iter.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
assertBusy(() -> {
ClusterStateResponse resp12 = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter12 = resp12.getState().getRoutingNodes().iterator();
while (iter12.hasNext()) {
RoutingNode node = iter12.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
});

// Update the disk usages so one node is now back under the high watermark
Expand All @@ -138,21 +129,18 @@ public void run() {
// Retrieve the count of shards on each node
nodesToShardCount.clear();

assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
while (iter.hasNext()) {
RoutingNode node = iter.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3));
assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
assertBusy(() -> {
ClusterStateResponse resp1 = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter1 = resp1.getState().getRoutingNodes().iterator();
while (iter1.hasNext()) {
RoutingNode node = iter1.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3));
assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,9 @@ public void run() {
assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
barrier.await();
assertBusy(new Runnable() {
@Override
public void run() {
assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
}
assertBusy(() -> {
assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
});
terminate(pool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,7 @@ public void run() {

// the timeout handler is added post execution (and quickly cancelled). We have allow for this
// and use assert busy
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(timer.getQueue().size(), equalTo(0));
}
}, 5, TimeUnit.SECONDS);
assertBusy(() -> assertThat(timer.getQueue().size(), equalTo(0)), 5, TimeUnit.SECONDS);
assertThat(timeoutCalled.get(), equalTo(false));
assertTrue(terminate(executor));
assertTrue(terminate(threadPool));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1324,34 +1324,28 @@ private void assertNoMaster(final String node, TimeValue maxWaitTime) throws Exc
}

private void assertNoMaster(final String node, @Nullable final ClusterBlock expectedBlocks, TimeValue maxWaitTime) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = getNodeClusterState(node);
assertNull("node [" + node + "] still has [" + state.nodes().getMasterNode() + "] as master", state.nodes().getMasterNode());
if (expectedBlocks != null) {
for (ClusterBlockLevel level : expectedBlocks.levels()) {
assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
(level));
}
assertBusy(() -> {
ClusterState state = getNodeClusterState(node);
assertNull("node [" + node + "] still has [" + state.nodes().getMasterNode() + "] as master", state.nodes().getMasterNode());
if (expectedBlocks != null) {
for (ClusterBlockLevel level : expectedBlocks.levels()) {
assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
(level));
}
}
}, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS);
}

private void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = getNodeClusterState(node);
String masterNode = null;
if (state.nodes().getMasterNode() != null) {
masterNode = state.nodes().getMasterNode().getName();
}
logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
oldMasterNode, not(equalTo(masterNode)));
assertBusy(() -> {
ClusterState state = getNodeClusterState(node);
String masterNode = null;
if (state.nodes().getMasterNode() != null) {
masterNode = state.nodes().getMasterNode().getName();
}
logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
oldMasterNode, not(equalTo(masterNode)));
}, 10, TimeUnit.SECONDS);
}

Expand Down
33 changes: 15 additions & 18 deletions core/src/test/java/org/elasticsearch/document/ShardInfoIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,24 +130,21 @@ private void assertShardInfo(ReplicationResponse response, int expectedTotal, in
}

private void ensureActiveShardCopies(final int shardId, final int copyCount) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertThat(state.routingTable().index("idx"), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount));

ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx")
.setWaitForNoRelocatingShards(true)
.get();
assertThat(healthResponse.isTimedOut(), equalTo(false));

RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx")
.setActiveOnly(true)
.get();
assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0));
}
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertThat(state.routingTable().index("idx"), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount));

ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx")
.setWaitForNoRelocatingShards(true)
.get();
assertThat(healthResponse.isTimedOut(), equalTo(false));

RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx")
.setActiveOnly(true)
.get();
assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0));
});
}
}
Loading

0 comments on commit b03c8a4

Please sign in to comment.