Skip to content

Commit

Permalink
move assertBusy to use CheckException (#25246)
Browse files Browse the repository at this point in the history
We use assertBusy in many places where the underlying code throw exceptions. Currently we need to wrap those exceptions in a RuntimeException which is ugly.
  • Loading branch information
bleskes authored Jun 15, 2017
1 parent 27f1206 commit 648b471
Show file tree
Hide file tree
Showing 23 changed files with 213 additions and 340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,10 @@ public void testBulkIndexCreatesMapping() throws Exception {
BulkRequestBuilder bulkBuilder = client().prepareBulk();
bulkBuilder.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON);
bulkBuilder.get();
assertBusy(new Runnable() {
@Override
public void run() {
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30"));
assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"));
}
assertBusy(() -> {
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30"));
assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,7 @@ public void testRetryOfAnAlreadyTimedOutRequest() throws Exception {
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));

// wait until the timeout was triggered and we actually tried to send for the second time
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(transport.capturedRequests().length, equalTo(1));
}
});
assertBusy(() -> assertThat(transport.capturedRequests().length, equalTo(1)));

// let it fail the second time too
requestId = transport.capturedRequests()[0].requestId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,9 @@ public void testSimpleMinimumMasterNodes() throws Exception {
}

internalCluster().stopRandomNonMasterNode();
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
}
assertBusy(() -> {
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state1.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
});

logger.info("--> starting the previous master node again...");
Expand Down Expand Up @@ -405,12 +402,7 @@ public void onFailure(String source, Exception e) {
latch.await();

assertThat(failure.get(), instanceOf(Discovery.FailedToCommitClusterStateException.class));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue());
}
});
assertBusy(() -> assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue()));

partition.stopDisrupting();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,7 @@ public void testDelayedAllocationNodeLeavesAndComesBack() throws Exception {
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
}
});
assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
internalCluster().startNode(); // this will use the same data location as the stopped node
ensureGreen("test");
Expand Down Expand Up @@ -114,12 +109,7 @@ public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception {
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
}
});
assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(100))).get());
ensureGreen("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,9 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
List<String> nodes = internalCluster().startNodes(3);

// Wait for all 3 nodes to be up
assertBusy(new Runnable() {
@Override
public void run() {
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
assertThat(resp.getNodes().size(), equalTo(3));
}
assertBusy(() -> {
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
assertThat(resp.getNodes().size(), equalTo(3));
});

// Start with all nodes at 50% usage
Expand All @@ -86,13 +83,10 @@ public void run() {
ensureGreen("test");

// Block until the "fake" cluster info is retrieved at least once
assertBusy(new Runnable() {
@Override
public void run() {
ClusterInfo info = cis.getClusterInfo();
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
}
assertBusy(() -> {
ClusterInfo info = cis.getClusterInfo();
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
});

final List<String> realNodeNames = new ArrayList<>();
Expand All @@ -113,21 +107,18 @@ public void run() {
// Retrieve the count of shards on each node
final Map<String, Integer> nodesToShardCount = new HashMap<>();

assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
while (iter.hasNext()) {
RoutingNode node = iter.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
assertBusy(() -> {
ClusterStateResponse resp12 = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter12 = resp12.getState().getRoutingNodes().iterator();
while (iter12.hasNext()) {
RoutingNode node = iter12.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
});

// Update the disk usages so one node is now back under the high watermark
Expand All @@ -138,21 +129,18 @@ public void run() {
// Retrieve the count of shards on each node
nodesToShardCount.clear();

assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
while (iter.hasNext()) {
RoutingNode node = iter.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3));
assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
assertBusy(() -> {
ClusterStateResponse resp1 = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter1 = resp1.getState().getRoutingNodes().iterator();
while (iter1.hasNext()) {
RoutingNode node = iter1.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3));
assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,9 @@ public void run() {
assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
barrier.await();
assertBusy(new Runnable() {
@Override
public void run() {
assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
}
assertBusy(() -> {
assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
});
terminate(pool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,7 @@ public void run() {

// the timeout handler is added post execution (and quickly cancelled). We have allow for this
// and use assert busy
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(timer.getQueue().size(), equalTo(0));
}
}, 5, TimeUnit.SECONDS);
assertBusy(() -> assertThat(timer.getQueue().size(), equalTo(0)), 5, TimeUnit.SECONDS);
assertThat(timeoutCalled.get(), equalTo(false));
assertTrue(terminate(executor));
assertTrue(terminate(threadPool));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,35 +197,29 @@ void assertNoMaster(final String node, TimeValue maxWaitTime) throws Exception {
}

void assertNoMaster(final String node, @Nullable final ClusterBlock expectedBlocks, TimeValue maxWaitTime) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = getNodeClusterState(node);
final DiscoveryNodes nodes = state.nodes();
assertNull("node [" + node + "] still has [" + nodes.getMasterNode() + "] as master", nodes.getMasterNode());
if (expectedBlocks != null) {
for (ClusterBlockLevel level : expectedBlocks.levels()) {
assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
(level));
}
assertBusy(() -> {
ClusterState state = getNodeClusterState(node);
final DiscoveryNodes nodes = state.nodes();
assertNull("node [" + node + "] still has [" + nodes.getMasterNode() + "] as master", nodes.getMasterNode());
if (expectedBlocks != null) {
for (ClusterBlockLevel level : expectedBlocks.levels()) {
assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
(level));
}
}
}, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS);
}

void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = getNodeClusterState(node);
String masterNode = null;
if (state.nodes().getMasterNode() != null) {
masterNode = state.nodes().getMasterNode().getName();
}
logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
oldMasterNode, not(equalTo(masterNode)));
assertBusy(() -> {
ClusterState state = getNodeClusterState(node);
String masterNode = null;
if (state.nodes().getMasterNode() != null) {
masterNode = state.nodes().getMasterNode().getName();
}
logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
oldMasterNode, not(equalTo(masterNode)));
}, 10, TimeUnit.SECONDS);
}

Expand Down
33 changes: 15 additions & 18 deletions core/src/test/java/org/elasticsearch/document/ShardInfoIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,24 +128,21 @@ private void assertShardInfo(ReplicationResponse response, int expectedTotal, in
}

private void ensureActiveShardCopies(final int shardId, final int copyCount) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertThat(state.routingTable().index("idx"), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount));

ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx")
.setWaitForNoRelocatingShards(true)
.get();
assertThat(healthResponse.isTimedOut(), equalTo(false));

RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx")
.setActiveOnly(true)
.get();
assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0));
}
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertThat(state.routingTable().index("idx"), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount));

ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx")
.setWaitForNoRelocatingShards(true)
.get();
assertThat(healthResponse.isTimedOut(), equalTo(false));

RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx")
.setActiveOnly(true)
.get();
assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -83,7 +84,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -458,7 +458,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception {
threads[i].start();
}
barrier.await();
final Runnable check;
final CheckedRunnable<Exception> check;
if (flush) {
final FlushStats flushStats = shard.flushStats();
final long total = flushStats.getTotal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,14 +405,11 @@ protected Cancellable scheduleTask(ThreadPool threadPool) {
imc.forceCheck();

// We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool:
assertBusy(new Runnable() {
@Override
public void run() {
try (Engine.Searcher s2 = shard.acquireSearcher("index")) {
// 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write:
final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed();
assertTrue(indexingBufferBytes2 < indexingBufferBytes1);
}
assertBusy(() -> {
try (Engine.Searcher s2 = shard.acquireSearcher("index")) {
// 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write:
final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed();
assertTrue(indexingBufferBytes2 < indexingBufferBytes1);
}
});
}
Expand Down
Loading

0 comments on commit 648b471

Please sign in to comment.