Move assertBusy to use CheckedRunnable #25246

Merged: 1 commit, merged on Jun 15, 2017
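
This PR converts the anonymous Runnable instances passed to assertBusy throughout the test suite into lambdas. Judging from the import of org.elasticsearch.common.CheckedRunnable and the change of `final Runnable check;` to `final CheckedRunnable<Exception> check;` near the end of the diff, assertBusy now appears to accept a CheckedRunnable<Exception>, so the assertion blocks can throw checked exceptions without wrapping. As a rough, self-contained sketch of what such a helper could look like (an illustration only, not the actual ESTestCase implementation):

import java.util.concurrent.TimeUnit;

// Sketch of a CheckedRunnable-style functional interface; the real
// org.elasticsearch.common.CheckedRunnable may differ in details.
@FunctionalInterface
interface CheckedRunnable<E extends Exception> {
    void run() throws E;
}

public class AssertBusySketch {

    // Illustrative retry loop: run the block until its assertions stop throwing,
    // or rethrow the last failure once the wait budget is exhausted.
    static void assertBusy(CheckedRunnable<Exception> block, long maxWait, TimeUnit unit) throws Exception {
        final long deadline = System.nanoTime() + unit.toNanos(maxWait);
        long sleepMillis = 1;
        while (true) {
            try {
                block.run();
                return; // the assertions passed
            } catch (Exception | AssertionError e) {
                if (System.nanoTime() > deadline) {
                    throw e; // give up and surface the last failure
                }
            }
            Thread.sleep(sleepMillis);
            sleepMillis = Math.min(sleepMillis * 2, 500); // simple backoff between retries
        }
    }

    public static void main(String[] args) throws Exception {
        final long start = System.currentTimeMillis();
        // Because the parameter is a CheckedRunnable, the lambda may throw checked
        // exceptions directly -- no anonymous Runnable and no try/catch wrapper.
        assertBusy(() -> {
            if (System.currentTimeMillis() - start < 100) {
                throw new Exception("condition not met yet");
            }
        }, 5, TimeUnit.SECONDS);
        System.out.println("condition eventually held");
    }
}

In the diff below the same call shape appears either as assertBusy(() -> ...) or, with an explicit wait, as assertBusy(() -> ..., 5, TimeUnit.SECONDS).
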
@@ -34,13 +34,10 @@ public void testBulkIndexCreatesMapping() throws Exception {
BulkRequestBuilder bulkBuilder = client().prepareBulk();
bulkBuilder.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON);
bulkBuilder.get();
assertBusy(new Runnable() {
@Override
public void run() {
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30"));
assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"));
}
assertBusy(() -> {
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
assertTrue(mappingsResponse.getMappings().containsKey("logstash-2014.03.30"));
assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"));
});
}
}
@@ -275,12 +275,7 @@ public void testRetryOfAnAlreadyTimedOutRequest() throws Exception {
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));

// wait until the timeout was triggered and we actually tried to send for the second time
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(transport.capturedRequests().length, equalTo(1));
}
});
assertBusy(() -> assertThat(transport.capturedRequests().length, equalTo(1)));

// let it fail the second time too
requestId = transport.capturedRequests()[0].requestId;
@@ -158,12 +158,9 @@ public void testSimpleMinimumMasterNodes() throws Exception {
}

internalCluster().stopRandomNonMasterNode();
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
}
assertBusy(() -> {
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state1.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
});

logger.info("--> starting the previous master node again...");
@@ -405,12 +402,7 @@ public void onFailure(String source, Exception e) {
latch.await();

assertThat(failure.get(), instanceOf(Discovery.FailedToCommitClusterStateException.class));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue());
}
});
assertBusy(() -> assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue()));

partition.stopDisrupting();

@@ -68,12 +68,7 @@ public void testDelayedAllocationNodeLeavesAndComesBack() throws Exception {
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
}
});
assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
internalCluster().startNode(); // this will use the same data location as the stopped node
ensureGreen("test");
@@ -114,12 +109,7 @@ public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception {
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
}
});
assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(100))).get());
ensureGreen("test");
@@ -57,12 +57,9 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
List<String> nodes = internalCluster().startNodes(3);

// Wait for all 3 nodes to be up
assertBusy(new Runnable() {
@Override
public void run() {
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
assertThat(resp.getNodes().size(), equalTo(3));
}
assertBusy(() -> {
NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
assertThat(resp.getNodes().size(), equalTo(3));
});

// Start with all nodes at 50% usage
@@ -86,13 +83,10 @@ public void run() {
ensureGreen("test");

// Block until the "fake" cluster info is retrieved at least once
assertBusy(new Runnable() {
@Override
public void run() {
ClusterInfo info = cis.getClusterInfo();
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
}
assertBusy(() -> {
ClusterInfo info = cis.getClusterInfo();
logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
});

final List<String> realNodeNames = new ArrayList<>();
@@ -113,21 +107,18 @@ public void run() {
// Retrieve the count of shards on each node
final Map<String, Integer> nodesToShardCount = new HashMap<>();

assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
while (iter.hasNext()) {
RoutingNode node = iter.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
assertBusy(() -> {
ClusterStateResponse resp12 = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter12 = resp12.getState().getRoutingNodes().iterator();
while (iter12.hasNext()) {
RoutingNode node = iter12.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp12.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has 5 shards", nodesToShardCount.get(realNodeNames.get(0)), equalTo(5));
assertThat("node2 has 5 shards", nodesToShardCount.get(realNodeNames.get(1)), equalTo(5));
assertThat("node3 has 0 shards", nodesToShardCount.get(realNodeNames.get(2)), equalTo(0));
});

// Update the disk usages so one node is now back under the high watermark
@@ -138,21 +129,18 @@ public void run() {
// Retrieve the count of shards on each node
nodesToShardCount.clear();

assertBusy(new Runnable() {
@Override
public void run() {
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
while (iter.hasNext()) {
RoutingNode node = iter.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3));
assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
assertBusy(() -> {
ClusterStateResponse resp1 = client().admin().cluster().prepareState().get();
Iterator<RoutingNode> iter1 = resp1.getState().getRoutingNodes().iterator();
while (iter1.hasNext()) {
RoutingNode node = iter1.next();
logger.info("--> node {} has {} shards",
node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
nodesToShardCount.put(node.nodeId(), resp1.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
assertThat("node1 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(0)), greaterThanOrEqualTo(3));
assertThat("node2 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(1)), greaterThanOrEqualTo(3));
assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
});
}
}
@@ -229,12 +229,9 @@ public void run() {
assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
barrier.await();
assertBusy(new Runnable() {
@Override
public void run() {
assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
}
assertBusy(() -> {
assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
});
terminate(pool);
}
@@ -264,12 +264,7 @@ public void run() {

// the timeout handler is added post execution (and quickly cancelled). We have to allow for this
// and use assertBusy
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(timer.getQueue().size(), equalTo(0));
}
}, 5, TimeUnit.SECONDS);
assertBusy(() -> assertThat(timer.getQueue().size(), equalTo(0)), 5, TimeUnit.SECONDS);
assertThat(timeoutCalled.get(), equalTo(false));
assertTrue(terminate(executor));
assertTrue(terminate(threadPool));
@@ -197,35 +197,29 @@ void assertNoMaster(final String node, TimeValue maxWaitTime) throws Exception {
}

void assertNoMaster(final String node, @Nullable final ClusterBlock expectedBlocks, TimeValue maxWaitTime) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = getNodeClusterState(node);
final DiscoveryNodes nodes = state.nodes();
assertNull("node [" + node + "] still has [" + nodes.getMasterNode() + "] as master", nodes.getMasterNode());
if (expectedBlocks != null) {
for (ClusterBlockLevel level : expectedBlocks.levels()) {
assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
(level));
}
assertBusy(() -> {
ClusterState state = getNodeClusterState(node);
final DiscoveryNodes nodes = state.nodes();
assertNull("node [" + node + "] still has [" + nodes.getMasterNode() + "] as master", nodes.getMasterNode());
if (expectedBlocks != null) {
for (ClusterBlockLevel level : expectedBlocks.levels()) {
assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
(level));
}
}
}, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS);
}

void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = getNodeClusterState(node);
String masterNode = null;
if (state.nodes().getMasterNode() != null) {
masterNode = state.nodes().getMasterNode().getName();
}
logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
oldMasterNode, not(equalTo(masterNode)));
assertBusy(() -> {
ClusterState state = getNodeClusterState(node);
String masterNode = null;
if (state.nodes().getMasterNode() != null) {
masterNode = state.nodes().getMasterNode().getName();
}
logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
oldMasterNode, not(equalTo(masterNode)));
}, 10, TimeUnit.SECONDS);
}

33 changes: 15 additions & 18 deletions core/src/test/java/org/elasticsearch/document/ShardInfoIT.java
@@ -128,24 +128,21 @@ private void assertShardInfo(ReplicationResponse response, int expectedTotal, in
}

private void ensureActiveShardCopies(final int shardId, final int copyCount) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertThat(state.routingTable().index("idx"), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount));

ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx")
.setWaitForNoRelocatingShards(true)
.get();
assertThat(healthResponse.isTimedOut(), equalTo(false));

RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx")
.setActiveOnly(true)
.get();
assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0));
}
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertThat(state.routingTable().index("idx"), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId), not(nullValue()));
assertThat(state.routingTable().index("idx").shard(shardId).activeShards().size(), equalTo(copyCount));

ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth("idx")
.setWaitForNoRelocatingShards(true)
.get();
assertThat(healthResponse.isTimedOut(), equalTo(false));

RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx")
.setActiveOnly(true)
.get();
assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0));
});
}
}
@@ -39,6 +39,7 @@
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@@ -83,7 +84,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@@ -458,7 +458,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception {
threads[i].start();
}
barrier.await();
final Runnable check;
final CheckedRunnable<Exception> check;
if (flush) {
final FlushStats flushStats = shard.flushStats();
final long total = flushStats.getTotal();
@@ -405,14 +405,11 @@ protected Cancellable scheduleTask(ThreadPool threadPool) {
imc.forceCheck();

// We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool:
assertBusy(new Runnable() {
@Override
public void run() {
try (Engine.Searcher s2 = shard.acquireSearcher("index")) {
// 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write:
final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed();
assertTrue(indexingBufferBytes2 < indexingBufferBytes1);
}
assertBusy(() -> {
try (Engine.Searcher s2 = shard.acquireSearcher("index")) {
// 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write:
final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed();
assertTrue(indexingBufferBytes2 < indexingBufferBytes1);
}
});
}
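
To close, one more illustrative, self-contained example of why the CheckedRunnable parameter matters for this conversion: java.lang.Runnable.run() declares no checked exceptions, so the old anonymous-class form had to wrap them, while the new lambda form can throw them directly. The checkClusterState method below is a made-up stand-in, not an Elasticsearch API:

import java.io.IOException;

public class CheckedRunnableDemo {

    // Minimal stand-in for org.elasticsearch.common.CheckedRunnable.
    @FunctionalInterface
    interface CheckedRunnable<E extends Exception> {
        void run() throws E;
    }

    // Hypothetical check that declares a checked exception.
    static void checkClusterState() throws IOException {
        // pretend this parses a response and may fail with an IOException
    }

    // Simplified assertBusy stand-in; a real helper would retry with a timeout.
    static void assertBusy(CheckedRunnable<Exception> block) throws Exception {
        block.run();
    }

    public static void main(String[] args) throws Exception {
        // New style: the lambda can throw the checked IOException directly,
        // because CheckedRunnable.run() declares `throws E`.
        assertBusy(() -> checkClusterState());

        // Old style: a plain Runnable cannot throw checked exceptions from run(),
        // so the body had to wrap them by hand.
        Runnable wrapped = new Runnable() {
            @Override
            public void run() {
                try {
                    checkClusterState();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        wrapped.run();
    }
}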